distributed_config/
watcher.rs1use crate::value::ConfigValue;
4use serde::{Deserialize, Serialize};
5use std::time::SystemTime;
6use tokio::sync::broadcast;
7use tracing::debug;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ConfigChange {
12 pub key: String,
14 pub old_value: Option<ConfigValue>,
16 pub new_value: ConfigValue,
18 pub timestamp: SystemTime,
20 pub changed_by: String,
22}
23
24pub struct ConfigWatcher {
26 key_pattern: String,
28 receiver: broadcast::Receiver<ConfigChange>,
30}
31
32impl ConfigWatcher {
33 pub fn new(key_pattern: String, receiver: broadcast::Receiver<ConfigChange>) -> Self {
35 Self {
36 key_pattern,
37 receiver,
38 }
39 }
40
41 pub async fn next(&mut self) -> Option<ConfigChange> {
43 loop {
44 match self.receiver.recv().await {
45 Ok(change) => {
46 if self.matches_pattern(&change.key) {
47 debug!(
48 "Configuration change matched pattern '{}': {}",
49 self.key_pattern, change.key
50 );
51 return Some(change);
52 }
53 }
55 Err(broadcast::error::RecvError::Closed) => {
56 debug!("Configuration change channel closed");
57 return None;
58 }
59 Err(broadcast::error::RecvError::Lagged(_)) => {
60 debug!("Configuration watcher lagged, continuing...");
62 continue;
63 }
64 }
65 }
66 }
67
68 pub fn pattern(&self) -> &str {
70 &self.key_pattern
71 }
72
73 fn matches_pattern(&self, key: &str) -> bool {
75 matches_pattern(&self.key_pattern, key)
76 }
77}
78
79fn matches_pattern(pattern: &str, key: &str) -> bool {
81 if pattern == key {
83 return true;
84 }
85
86 if pattern.is_empty() {
88 return true;
89 }
90
91 if pattern.contains('*') {
93 return matches_wildcard_pattern(pattern, key);
94 }
95
96 if pattern.ends_with('.') {
98 return key.starts_with(pattern);
99 }
100
101 let pattern_with_dot = format!("{pattern}.");
103 if key.starts_with(&pattern_with_dot) {
104 return true;
105 }
106
107 false
108}
109
110fn matches_wildcard_pattern(pattern: &str, key: &str) -> bool {
112 let pattern_parts: Vec<&str> = pattern.split('*').collect();
114
115 if pattern_parts.len() == 1 {
116 return pattern == key;
118 }
119
120 let mut key_pos = 0;
121
122 for (i, part) in pattern_parts.iter().enumerate() {
123 if part.is_empty() {
124 continue;
125 }
126
127 if i == 0 {
128 if !key[key_pos..].starts_with(part) {
130 return false;
131 }
132 key_pos += part.len();
133 } else if i == pattern_parts.len() - 1 {
134 return key[key_pos..].ends_with(part);
136 } else {
137 if let Some(pos) = key[key_pos..].find(part) {
139 key_pos += pos + part.len();
140 } else {
141 return false;
142 }
143 }
144 }
145
146 true
147}
148
149pub struct WatcherBuilder {
151 patterns: Vec<String>,
152}
153
154impl WatcherBuilder {
155 pub fn new() -> Self {
157 Self {
158 patterns: Vec::new(),
159 }
160 }
161
162 pub fn watch<S: Into<String>>(mut self, pattern: S) -> Self {
164 self.patterns.push(pattern.into());
165 self
166 }
167
168 pub fn build(self, receiver: broadcast::Receiver<ConfigChange>) -> Vec<ConfigWatcher> {
170 self.patterns
171 .into_iter()
172 .map(|pattern| ConfigWatcher::new(pattern, receiver.resubscribe()))
173 .collect()
174 }
175}
176
177impl Default for WatcherBuilder {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183pub struct ChangeFilter {
185 include_patterns: Vec<String>,
187 exclude_patterns: Vec<String>,
189 debounce_duration: Option<std::time::Duration>,
191 last_notification: Option<SystemTime>,
193}
194
195impl ChangeFilter {
196 pub fn new() -> Self {
198 Self {
199 include_patterns: Vec::new(),
200 exclude_patterns: Vec::new(),
201 debounce_duration: None,
202 last_notification: None,
203 }
204 }
205
206 pub fn include<S: Into<String>>(mut self, pattern: S) -> Self {
208 self.include_patterns.push(pattern.into());
209 self
210 }
211
212 pub fn exclude<S: Into<String>>(mut self, pattern: S) -> Self {
214 self.exclude_patterns.push(pattern.into());
215 self
216 }
217
218 pub fn debounce(mut self, duration: std::time::Duration) -> Self {
220 self.debounce_duration = Some(duration);
221 self
222 }
223
224 pub fn should_process(&mut self, change: &ConfigChange) -> bool {
226 if let Some(debounce_duration) = self.debounce_duration {
228 if let Some(last_time) = self.last_notification {
229 if change
230 .timestamp
231 .duration_since(last_time)
232 .unwrap_or_default()
233 < debounce_duration
234 {
235 return false;
236 }
237 }
238 }
239
240 for exclude_pattern in &self.exclude_patterns {
242 if matches_pattern(exclude_pattern, &change.key) {
243 return false;
244 }
245 }
246
247 if self.include_patterns.is_empty() {
249 self.last_notification = Some(change.timestamp);
250 return true;
251 }
252
253 for include_pattern in &self.include_patterns {
255 if matches_pattern(include_pattern, &change.key) {
256 self.last_notification = Some(change.timestamp);
257 return true;
258 }
259 }
260
261 false
262 }
263}
264
265impl Default for ChangeFilter {
266 fn default() -> Self {
267 Self::new()
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274 use std::time::Duration;
275 use tokio::sync::broadcast;
276
277 #[test]
278 fn test_pattern_matching() {
279 assert!(matches_pattern("app.database", "app.database"));
280 assert!(matches_pattern("app.database", "app.database.host"));
281 assert!(matches_pattern("app.", "app.database.host"));
282 assert!(matches_pattern("", "anything"));
283 assert!(matches_pattern("app.*", "app.database"));
284 assert!(matches_pattern("app.*", "app.cache"));
285 assert!(matches_pattern("*.host", "database.host"));
286 assert!(matches_pattern("*.host", "cache.host"));
287
288 assert!(!matches_pattern("app.database", "app.cache"));
289 assert!(!matches_pattern("app.database.host", "app.database"));
290 assert!(!matches_pattern("database", "app.database"));
291 }
292
293 #[tokio::test]
294 async fn test_config_watcher() {
295 let (tx, rx) = broadcast::channel(10);
296 let mut watcher = ConfigWatcher::new("app.database".to_string(), rx);
297
298 let change = ConfigChange {
300 key: "app.database.host".to_string(),
301 old_value: None,
302 new_value: ConfigValue::String("localhost".to_string()),
303 timestamp: SystemTime::now(),
304 changed_by: "test".to_string(),
305 };
306
307 tx.send(change.clone()).unwrap();
308
309 let received = watcher.next().await.unwrap();
311 assert_eq!(received.key, "app.database.host");
312 }
313
314 #[tokio::test]
315 async fn test_change_filter() {
316 let mut filter = ChangeFilter::new()
317 .include("app.*")
318 .exclude("app.secret.*")
319 .debounce(Duration::from_millis(100));
320
321 let change1 = ConfigChange {
322 key: "app.database.host".to_string(),
323 old_value: None,
324 new_value: ConfigValue::String("localhost".to_string()),
325 timestamp: SystemTime::now(),
326 changed_by: "test".to_string(),
327 };
328
329 let change2 = ConfigChange {
330 key: "app.secret.key".to_string(),
331 old_value: None,
332 new_value: ConfigValue::String("secret".to_string()),
333 timestamp: SystemTime::now(),
334 changed_by: "test".to_string(),
335 };
336
337 assert!(filter.should_process(&change1));
338 assert!(!filter.should_process(&change2)); let change3 = ConfigChange {
342 key: "app.database.port".to_string(),
343 old_value: None,
344 new_value: ConfigValue::Integer(5432),
345 timestamp: SystemTime::now(),
346 changed_by: "test".to_string(),
347 };
348
349 assert!(!filter.should_process(&change3)); }
351
352 #[test]
353 fn test_wildcard_matching() {
354 assert!(matches_wildcard_pattern("app.*", "app.database"));
355 assert!(matches_wildcard_pattern("app.*", "app.cache"));
356 assert!(matches_wildcard_pattern("*.host", "database.host"));
357 assert!(matches_wildcard_pattern("app.*.host", "app.database.host"));
358 assert!(matches_wildcard_pattern("*", "anything"));
359
360 assert!(!matches_wildcard_pattern("app.*", "database.host"));
361 assert!(!matches_wildcard_pattern("*.host", "database.port"));
362 assert!(!matches_wildcard_pattern("app.*.host", "app.database.port"));
363 }
364}