configvault_sdk/
watcher.rs1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures_util::StreamExt;
5use reqwest::header::{HeaderMap, HeaderValue};
6use tokio::sync::broadcast;
7
8use crate::models::ConfigChangedEvent;
9
10const DEFAULT_RECONNECT_DELAY: Duration = Duration::from_secs(5);
11const CHANNEL_CAPACITY: usize = 64;
12
13pub struct ConfigWatcher {
18 base_url: String,
19 api_key: String,
20 filter: Option<String>,
21 reconnect_delay: Duration,
22 sender: broadcast::Sender<ConfigChangedEvent>,
23}
24
25impl ConfigWatcher {
26 pub(crate) fn new(
28 base_url: &str,
29 api_key: &str,
30 filter: Option<&str>,
31 _timeout: Duration,
32 ) -> Self {
33 let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
34 Self {
35 base_url: base_url.trim_end_matches('/').to_string(),
36 api_key: api_key.to_string(),
37 filter: filter.map(str::to_string),
38 reconnect_delay: DEFAULT_RECONNECT_DELAY,
39 sender,
40 }
41 }
42
43 pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangedEvent> {
47 self.sender.subscribe()
48 }
49
50 pub fn start(&self) {
56 let base_url = self.base_url.clone();
57 let api_key = self.api_key.clone();
58 let filter = self.filter.clone();
59 let reconnect_delay = self.reconnect_delay;
60 let sender = self.sender.clone();
61
62 tokio::spawn(async move {
63 loop {
64 let result = stream_events(&base_url, &api_key, &filter, &sender).await;
65 if let Err(e) = result {
66 tracing_or_eprintln(format!("SSE connection error: {e}"));
67 }
68 if sender.receiver_count() == 0 {
70 break;
71 }
72 tokio::time::sleep(reconnect_delay).await;
73 }
74 });
75 }
76
77 pub fn stop(&self) {
79 }
82
83 pub fn with_reconnect_delay(mut self, delay: Duration) -> Self {
85 self.reconnect_delay = delay;
86 self
87 }
88}
89
90async fn stream_events(
92 base_url: &str,
93 api_key: &str,
94 filter: &Option<String>,
95 sender: &broadcast::Sender<ConfigChangedEvent>,
96) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
97 let mut url = format!("{base_url}/events");
98 if let Some(f) = filter {
99 url = format!("{url}?filter={}", urlencoding_encode(f));
100 }
101
102 let mut headers = HeaderMap::new();
103 headers.insert(
104 "X-Api-Key",
105 HeaderValue::from_str(api_key)?,
106 );
107 headers.insert(
108 reqwest::header::ACCEPT,
109 HeaderValue::from_static("text/event-stream"),
110 );
111
112 let client = reqwest::Client::builder()
113 .use_rustls_tls()
114 .default_headers(headers)
115 .build()?;
116
117 let response = client.get(&url).send().await?;
118
119 let stream = response.bytes_stream().eventsource();
120 futures_util::pin_mut!(stream);
121
122 while let Some(result) = stream.next().await {
123 let event = result?;
124 if event.event == "config-changed" {
125 if let Ok(parsed) = serde_json::from_str::<ConfigChangedEvent>(&event.data) {
126 let _ = sender.send(parsed);
128 }
129 }
130 }
131
132 Ok(())
133}
134
135fn urlencoding_encode(s: &str) -> String {
137 let mut encoded = String::with_capacity(s.len());
138 for byte in s.bytes() {
139 match byte {
140 b'A'..=b'Z'
141 | b'a'..=b'z'
142 | b'0'..=b'9'
143 | b'-'
144 | b'_'
145 | b'.'
146 | b'~'
147 | b'/'
148 | b'*' => encoded.push(byte as char),
149 b => encoded.push_str(&format!("%{b:02X}")),
150 }
151 }
152 encoded
153}
154
155#[allow(dead_code)]
158fn tracing_or_eprintln(msg: String) {
159 eprintln!("[configvault-sdk] {msg}");
160}