distributed_config/
watcher.rs

1//! Configuration change watching and notification system
2
3use crate::value::ConfigValue;
4use serde::{Deserialize, Serialize};
5use std::time::SystemTime;
6use tokio::sync::broadcast;
7use tracing::debug;
8
9/// Represents a configuration change event
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ConfigChange {
12    /// The configuration key that changed
13    pub key: String,
14    /// The previous value (None if the key was newly created)
15    pub old_value: Option<ConfigValue>,
16    /// The new value
17    pub new_value: ConfigValue,
18    /// When the change occurred
19    pub timestamp: SystemTime,
20    /// Who or what caused the change
21    pub changed_by: String,
22}
23
24/// Configuration watcher that can be used to monitor changes to specific keys
25pub struct ConfigWatcher {
26    /// Pattern to match against configuration keys
27    key_pattern: String,
28    /// Receiver for configuration change events
29    receiver: broadcast::Receiver<ConfigChange>,
30}
31
32impl ConfigWatcher {
33    /// Create a new configuration watcher
34    pub fn new(key_pattern: String, receiver: broadcast::Receiver<ConfigChange>) -> Self {
35        Self {
36            key_pattern,
37            receiver,
38        }
39    }
40
41    /// Get the next configuration change that matches this watcher's pattern
42    pub async fn next(&mut self) -> Option<ConfigChange> {
43        loop {
44            match self.receiver.recv().await {
45                Ok(change) => {
46                    if self.matches_pattern(&change.key) {
47                        debug!(
48                            "Configuration change matched pattern '{}': {}",
49                            self.key_pattern, change.key
50                        );
51                        return Some(change);
52                    }
53                    // Continue waiting if the change doesn't match our pattern
54                }
55                Err(broadcast::error::RecvError::Closed) => {
56                    debug!("Configuration change channel closed");
57                    return None;
58                }
59                Err(broadcast::error::RecvError::Lagged(_)) => {
60                    // We missed some messages, but continue listening
61                    debug!("Configuration watcher lagged, continuing...");
62                    continue;
63                }
64            }
65        }
66    }
67
68    /// Get the pattern this watcher is monitoring
69    pub fn pattern(&self) -> &str {
70        &self.key_pattern
71    }
72
73    /// Check if a key matches this watcher's pattern
74    fn matches_pattern(&self, key: &str) -> bool {
75        matches_pattern(&self.key_pattern, key)
76    }
77}
78
79/// Check if a configuration key matches a pattern
80fn matches_pattern(pattern: &str, key: &str) -> bool {
81    // Handle exact match
82    if pattern == key {
83        return true;
84    }
85
86    // Handle empty pattern (matches everything)
87    if pattern.is_empty() {
88        return true;
89    }
90
91    // Handle wildcard patterns
92    if pattern.contains('*') {
93        return matches_wildcard_pattern(pattern, key);
94    }
95
96    // Handle prefix match (pattern is a parent of the key)
97    if pattern.ends_with('.') {
98        return key.starts_with(pattern);
99    }
100
101    // Handle parent key match (key starts with pattern + dot)
102    let pattern_with_dot = format!("{pattern}.");
103    if key.starts_with(&pattern_with_dot) {
104        return true;
105    }
106
107    false
108}
109
110/// Check if a key matches a wildcard pattern
111fn matches_wildcard_pattern(pattern: &str, key: &str) -> bool {
112    // Simple glob-style matching
113    let pattern_parts: Vec<&str> = pattern.split('*').collect();
114
115    if pattern_parts.len() == 1 {
116        // No wildcards, just do exact match
117        return pattern == key;
118    }
119
120    let mut key_pos = 0;
121
122    for (i, part) in pattern_parts.iter().enumerate() {
123        if part.is_empty() {
124            continue;
125        }
126
127        if i == 0 {
128            // First part must match the beginning
129            if !key[key_pos..].starts_with(part) {
130                return false;
131            }
132            key_pos += part.len();
133        } else if i == pattern_parts.len() - 1 {
134            // Last part must match the end
135            return key[key_pos..].ends_with(part);
136        } else {
137            // Middle parts
138            if let Some(pos) = key[key_pos..].find(part) {
139                key_pos += pos + part.len();
140            } else {
141                return false;
142            }
143        }
144    }
145
146    true
147}
148
149/// A builder for creating multiple watchers with different patterns
150pub struct WatcherBuilder {
151    patterns: Vec<String>,
152}
153
154impl WatcherBuilder {
155    /// Create a new watcher builder
156    pub fn new() -> Self {
157        Self {
158            patterns: Vec::new(),
159        }
160    }
161
162    /// Add a pattern to watch
163    pub fn watch<S: Into<String>>(mut self, pattern: S) -> Self {
164        self.patterns.push(pattern.into());
165        self
166    }
167
168    /// Build watchers from a broadcast receiver
169    pub fn build(self, receiver: broadcast::Receiver<ConfigChange>) -> Vec<ConfigWatcher> {
170        self.patterns
171            .into_iter()
172            .map(|pattern| ConfigWatcher::new(pattern, receiver.resubscribe()))
173            .collect()
174    }
175}
176
177impl Default for WatcherBuilder {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183/// Configuration change filter for more complex matching logic
184pub struct ChangeFilter {
185    /// Key patterns to include
186    include_patterns: Vec<String>,
187    /// Key patterns to exclude
188    exclude_patterns: Vec<String>,
189    /// Minimum time between notifications (debouncing)
190    debounce_duration: Option<std::time::Duration>,
191    /// Last notification time for debouncing
192    last_notification: Option<SystemTime>,
193}
194
195impl ChangeFilter {
196    /// Create a new change filter
197    pub fn new() -> Self {
198        Self {
199            include_patterns: Vec::new(),
200            exclude_patterns: Vec::new(),
201            debounce_duration: None,
202            last_notification: None,
203        }
204    }
205
206    /// Add an include pattern
207    pub fn include<S: Into<String>>(mut self, pattern: S) -> Self {
208        self.include_patterns.push(pattern.into());
209        self
210    }
211
212    /// Add an exclude pattern
213    pub fn exclude<S: Into<String>>(mut self, pattern: S) -> Self {
214        self.exclude_patterns.push(pattern.into());
215        self
216    }
217
218    /// Set debounce duration
219    pub fn debounce(mut self, duration: std::time::Duration) -> Self {
220        self.debounce_duration = Some(duration);
221        self
222    }
223
224    /// Check if a change should be processed based on this filter
225    pub fn should_process(&mut self, change: &ConfigChange) -> bool {
226        // Check debouncing
227        if let Some(debounce_duration) = self.debounce_duration {
228            if let Some(last_time) = self.last_notification {
229                if change
230                    .timestamp
231                    .duration_since(last_time)
232                    .unwrap_or_default()
233                    < debounce_duration
234                {
235                    return false;
236                }
237            }
238        }
239
240        // Check exclude patterns first
241        for exclude_pattern in &self.exclude_patterns {
242            if matches_pattern(exclude_pattern, &change.key) {
243                return false;
244            }
245        }
246
247        // If no include patterns specified, include everything (that wasn't excluded)
248        if self.include_patterns.is_empty() {
249            self.last_notification = Some(change.timestamp);
250            return true;
251        }
252
253        // Check include patterns
254        for include_pattern in &self.include_patterns {
255            if matches_pattern(include_pattern, &change.key) {
256                self.last_notification = Some(change.timestamp);
257                return true;
258            }
259        }
260
261        false
262    }
263}
264
265impl Default for ChangeFilter {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274    use std::time::Duration;
275    use tokio::sync::broadcast;
276
277    #[test]
278    fn test_pattern_matching() {
279        assert!(matches_pattern("app.database", "app.database"));
280        assert!(matches_pattern("app.database", "app.database.host"));
281        assert!(matches_pattern("app.", "app.database.host"));
282        assert!(matches_pattern("", "anything"));
283        assert!(matches_pattern("app.*", "app.database"));
284        assert!(matches_pattern("app.*", "app.cache"));
285        assert!(matches_pattern("*.host", "database.host"));
286        assert!(matches_pattern("*.host", "cache.host"));
287
288        assert!(!matches_pattern("app.database", "app.cache"));
289        assert!(!matches_pattern("app.database.host", "app.database"));
290        assert!(!matches_pattern("database", "app.database"));
291    }
292
293    #[tokio::test]
294    async fn test_config_watcher() {
295        let (tx, rx) = broadcast::channel(10);
296        let mut watcher = ConfigWatcher::new("app.database".to_string(), rx);
297
298        // Send a matching change
299        let change = ConfigChange {
300            key: "app.database.host".to_string(),
301            old_value: None,
302            new_value: ConfigValue::String("localhost".to_string()),
303            timestamp: SystemTime::now(),
304            changed_by: "test".to_string(),
305        };
306
307        tx.send(change.clone()).unwrap();
308
309        // Should receive the change
310        let received = watcher.next().await.unwrap();
311        assert_eq!(received.key, "app.database.host");
312    }
313
314    #[tokio::test]
315    async fn test_change_filter() {
316        let mut filter = ChangeFilter::new()
317            .include("app.*")
318            .exclude("app.secret.*")
319            .debounce(Duration::from_millis(100));
320
321        let change1 = ConfigChange {
322            key: "app.database.host".to_string(),
323            old_value: None,
324            new_value: ConfigValue::String("localhost".to_string()),
325            timestamp: SystemTime::now(),
326            changed_by: "test".to_string(),
327        };
328
329        let change2 = ConfigChange {
330            key: "app.secret.key".to_string(),
331            old_value: None,
332            new_value: ConfigValue::String("secret".to_string()),
333            timestamp: SystemTime::now(),
334            changed_by: "test".to_string(),
335        };
336
337        assert!(filter.should_process(&change1));
338        assert!(!filter.should_process(&change2)); // Excluded
339
340        // Test debouncing
341        let change3 = ConfigChange {
342            key: "app.database.port".to_string(),
343            old_value: None,
344            new_value: ConfigValue::Integer(5432),
345            timestamp: SystemTime::now(),
346            changed_by: "test".to_string(),
347        };
348
349        assert!(!filter.should_process(&change3)); // Debounced
350    }
351
352    #[test]
353    fn test_wildcard_matching() {
354        assert!(matches_wildcard_pattern("app.*", "app.database"));
355        assert!(matches_wildcard_pattern("app.*", "app.cache"));
356        assert!(matches_wildcard_pattern("*.host", "database.host"));
357        assert!(matches_wildcard_pattern("app.*.host", "app.database.host"));
358        assert!(matches_wildcard_pattern("*", "anything"));
359
360        assert!(!matches_wildcard_pattern("app.*", "database.host"));
361        assert!(!matches_wildcard_pattern("*.host", "database.port"));
362        assert!(!matches_wildcard_pattern("app.*.host", "app.database.port"));
363    }
364}