revoke_config/
consul.rs

1use async_trait::async_trait;
2use revoke_core::{ConfigProvider, Result, RevokeError};
3use crate::types::{ConfigValue, ConfigMetadata, ConfigCache, ConfigChange, ChangeType};
4use crate::utils::UnpinStream;
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use parking_lot::RwLock;
9use futures::Stream;
10use tokio::sync::mpsc;
11use tokio::time::{interval, Duration};
12use tracing::{info, error, debug};
13use base64::{Engine as _, engine::general_purpose};
14
15const DEFAULT_CLIENT_TIMEOUT: Duration = Duration::from_secs(30);
16const DEFAULT_WATCH_INTERVAL: Duration = Duration::from_secs(5);
17
18#[derive(Debug)]
19pub struct ConsulConfigProvider {
20    client: Client,
21    address: String,
22    namespace: Option<String>,
23    cache: ConfigCache,
24    watchers: Arc<RwLock<Vec<mpsc::UnboundedSender<ConfigChange>>>>,
25    watch_interval: Duration,
26}
27
28#[derive(Debug)]
29pub struct ConsulConfigOptions {
30    pub address: String,
31    pub namespace: Option<String>,
32    pub client_timeout: Duration,
33    pub watch_interval: Duration,
34    pub token: Option<String>,
35}
36
37impl Default for ConsulConfigOptions {
38    fn default() -> Self {
39        Self {
40            address: "http://localhost:8500".to_string(),
41            namespace: None,
42            client_timeout: DEFAULT_CLIENT_TIMEOUT,
43            watch_interval: DEFAULT_WATCH_INTERVAL,
44            token: None,
45        }
46    }
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50struct ConsulKVPair {
51    #[serde(rename = "Key")]
52    key: String,
53    #[serde(rename = "Value")]
54    value: Option<String>,
55    #[serde(rename = "CreateIndex")]
56    create_index: u64,
57    #[serde(rename = "ModifyIndex")]
58    modify_index: u64,
59    #[serde(rename = "Flags")]
60    flags: u64,
61}
62
63impl ConsulConfigProvider {
64    pub fn new(options: ConsulConfigOptions) -> Self {
65        let client = Client::builder()
66            .timeout(options.client_timeout)
67            .build()
68            .unwrap_or_else(|_| Client::new());
69
70        Self {
71            client,
72            address: options.address,
73            namespace: options.namespace,
74            cache: ConfigCache::new(),
75            watchers: Arc::new(RwLock::new(Vec::new())),
76            watch_interval: options.watch_interval,
77        }
78    }
79
80    fn build_key(&self, key: &str) -> String {
81        match &self.namespace {
82            Some(ns) => format!("{}/{}", ns, key),
83            None => key.to_string(),
84        }
85    }
86
87    async fn get_from_consul(&self, key: &str) -> Result<Option<ConsulKVPair>> {
88        let url = format!("{}/v1/kv/{}", self.address, self.build_key(key));
89        
90        let response = self.client
91            .get(&url)
92            .send()
93            .await
94            .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
95
96        if response.status() == 404 {
97            return Ok(None);
98        }
99
100        if !response.status().is_success() {
101            return Err(RevokeError::ConfigError(
102                format!("Consul returned status: {}", response.status())
103            ));
104        }
105
106        let pairs: Vec<ConsulKVPair> = response
107            .json()
108            .await
109            .map_err(|e| RevokeError::Unknown(format!("Failed to parse Consul response: {}", e)))?;
110
111        Ok(pairs.into_iter().next())
112    }
113
114    async fn put_to_consul(&self, key: &str, value: &str) -> Result<()> {
115        let url = format!("{}/v1/kv/{}", self.address, self.build_key(key));
116        
117        let response = self.client
118            .put(&url)
119            .body(value.to_string())
120            .send()
121            .await
122            .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
123
124        if !response.status().is_success() {
125            return Err(RevokeError::ConfigError(
126                format!("Failed to put key: {}", response.status())
127            ));
128        }
129
130        Ok(())
131    }
132
133    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>> {
134        let url = format!("{}/v1/kv/{}?keys", self.address, self.build_key(prefix));
135        
136        let response = self.client
137            .get(&url)
138            .send()
139            .await
140            .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
141
142        if response.status() == 404 {
143            return Ok(Vec::new());
144        }
145
146        if !response.status().is_success() {
147            return Err(RevokeError::ConfigError(
148                format!("Failed to list keys: {}", response.status())
149            ));
150        }
151
152        let keys: Vec<String> = response
153            .json()
154            .await
155            .map_err(|e| RevokeError::Unknown(format!("Failed to parse Consul response: {}", e)))?;
156
157        Ok(keys)
158    }
159
160    fn notify_watchers(&self, change: ConfigChange) {
161        let mut watchers = self.watchers.write();
162        watchers.retain(|tx| tx.send(change.clone()).is_ok());
163    }
164
165    pub async fn start_watch_loop(&self) {
166        let provider = self.clone();
167        tokio::spawn(async move {
168            let mut ticker = interval(provider.watch_interval);
169            let mut known_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
170            
171            loop {
172                ticker.tick().await;
173                
174                // List all keys in namespace
175                let prefix = provider.namespace.as_deref().unwrap_or("");
176                match provider.list_keys(prefix).await {
177                    Ok(keys) => {
178                        for key in &keys {
179                            let short_key = if let Some(ns) = &provider.namespace {
180                                key.strip_prefix(&format!("{}/", ns)).unwrap_or(key)
181                            } else {
182                                key
183                            };
184
185                            match provider.get_from_consul(short_key).await {
186                                Ok(Some(kv_pair)) => {
187                                    if let Some(encoded_value) = kv_pair.value {
188                                        if let Ok(decoded) = general_purpose::STANDARD.decode(&encoded_value) {
189                                            if let Ok(value_str) = String::from_utf8(decoded) {
190                                                let json_value: serde_json::Value = serde_json::from_str(&value_str)
191                                                    .unwrap_or_else(|_| serde_json::Value::String(value_str.clone()));
192
193                                                let old_value = provider.cache.get(short_key);
194                                                let is_new = !known_keys.contains(short_key);
195                                                known_keys.insert(short_key.to_string());
196
197                                                if is_new || old_value.as_ref().map(|v| &v.value) != Some(&json_value) {
198                                                    let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
199                                                    
200                                                    let config_value = ConfigValue {
201                                                        key: short_key.to_string(),
202                                                        value: json_value.clone(),
203                                                        version,
204                                                        metadata: ConfigMetadata::new(),
205                                                    };
206
207                                                    provider.cache.set(short_key.to_string(), config_value);
208
209                                                    let change = ConfigChange {
210                                                        key: short_key.to_string(),
211                                                        old_value: old_value.as_ref().map(|v| v.value.clone()),
212                                                        new_value: Some(json_value),
213                                                        change_type: if is_new {
214                                                            ChangeType::Created
215                                                        } else {
216                                                            ChangeType::Updated
217                                                        },
218                                                    };
219
220                                                    provider.notify_watchers(change);
221                                                    debug!("Config change detected for key: {}", short_key);
222                                                }
223                                            }
224                                        }
225                                    }
226                                }
227                                Ok(None) => {
228                                    // Key was deleted
229                                    if known_keys.remove(short_key) {
230                                        if let Some(old_value) = provider.cache.remove(short_key) {
231                                            let change = ConfigChange {
232                                                key: short_key.to_string(),
233                                                old_value: Some(old_value.value),
234                                                new_value: None,
235                                                change_type: ChangeType::Deleted,
236                                            };
237                                            provider.notify_watchers(change);
238                                            debug!("Config key deleted: {}", short_key);
239                                        }
240                                    }
241                                }
242                                Err(e) => {
243                                    error!("Error watching key {}: {}", short_key, e);
244                                }
245                            }
246                        }
247                    }
248                    Err(e) => {
249                        error!("Error listing keys: {}", e);
250                    }
251                }
252            }
253        });
254    }
255}
256
257impl Clone for ConsulConfigProvider {
258    fn clone(&self) -> Self {
259        Self {
260            client: self.client.clone(),
261            address: self.address.clone(),
262            namespace: self.namespace.clone(),
263            cache: ConfigCache::new(), // Note: This creates a new cache
264            watchers: Arc::clone(&self.watchers),
265            watch_interval: self.watch_interval,
266        }
267    }
268}
269
270#[async_trait]
271impl ConfigProvider for ConsulConfigProvider {
272    async fn get(&self, key: &str) -> Result<String> {
273        // Try cache first
274        if let Some(cached) = self.cache.get(key) {
275            return Ok(cached.value.to_string());
276        }
277
278        // Fetch from Consul
279        match self.get_from_consul(key).await? {
280            Some(kv_pair) => {
281                if let Some(encoded_value) = kv_pair.value {
282                    let decoded = general_purpose::STANDARD
283                        .decode(&encoded_value)
284                        .map_err(|e| RevokeError::ConfigError(format!("Base64 decode error: {}", e)))?;
285                    
286                    let value_str = String::from_utf8(decoded)
287                        .map_err(|e| RevokeError::ConfigError(format!("UTF-8 decode error: {}", e)))?;
288                    
289                    // Cache the value
290                    let json_value: serde_json::Value = serde_json::from_str(&value_str)
291                        .unwrap_or_else(|_| serde_json::Value::String(value_str.clone()));
292                    
293                    let config_value = ConfigValue {
294                        key: key.to_string(),
295                        value: json_value,
296                        version: 1,
297                        metadata: ConfigMetadata::new(),
298                    };
299                    
300                    self.cache.set(key.to_string(), config_value);
301                    Ok(value_str)
302                } else {
303                    Err(RevokeError::ConfigError(format!("Key {} has no value", key)))
304                }
305            }
306            None => Err(RevokeError::ConfigError(format!("Key not found: {}", key))),
307        }
308    }
309
310    async fn set(&self, key: &str, value: &str) -> Result<()> {
311        self.put_to_consul(key, value).await?;
312
313        let json_value: serde_json::Value = serde_json::from_str(value)
314            .unwrap_or_else(|_| serde_json::Value::String(value.to_string()));
315
316        let old_value = self.cache.get(key);
317        let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
318        
319        let config_value = ConfigValue {
320            key: key.to_string(),
321            value: json_value.clone(),
322            version,
323            metadata: ConfigMetadata::new(),
324        };
325
326        self.cache.set(key.to_string(), config_value);
327
328        let change = ConfigChange {
329            key: key.to_string(),
330            old_value: old_value.as_ref().map(|v| v.value.clone()),
331            new_value: Some(json_value),
332            change_type: if old_value.is_some() {
333                ChangeType::Updated
334            } else {
335                ChangeType::Created
336            },
337        };
338
339        self.notify_watchers(change);
340        info!("Config key '{}' updated in Consul", key);
341
342        Ok(())
343    }
344
345    async fn watch(&self, key: &str) -> Result<Box<dyn Stream<Item = String> + Send + Unpin>> {
346        let (tx, rx) = mpsc::unbounded_channel();
347        self.watchers.write().push(tx);
348
349        let key = key.to_string();
350        let stream = async_stream::stream! {
351            let mut rx = rx;
352            while let Some(change) = rx.recv().await {
353                if change.key == key {
354                    if let Some(new_value) = change.new_value {
355                        yield new_value.to_string();
356                    }
357                }
358            }
359        };
360
361        Ok(Box::new(UnpinStream::new(stream)) as Box<dyn Stream<Item = String> + Send + Unpin>)
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// `build_key` prepends `<namespace>/` when a namespace is configured and
    /// returns the key unchanged otherwise.
    #[test]
    fn test_build_key() {
        let namespaced_options = ConsulConfigOptions {
            namespace: Some("myapp".to_string()),
            ..Default::default()
        };
        let namespaced = ConsulConfigProvider::new(namespaced_options);
        assert_eq!(namespaced.build_key("config"), "myapp/config");

        let plain = ConsulConfigProvider::new(ConsulConfigOptions::default());
        assert_eq!(plain.build_key("config"), "config");
    }
}