1use async_trait::async_trait;
2use revoke_core::{ConfigProvider, Result, RevokeError};
3use crate::types::{ConfigValue, ConfigMetadata, ConfigCache, ConfigChange, ChangeType};
4use crate::utils::UnpinStream;
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use parking_lot::RwLock;
9use futures::Stream;
10use tokio::sync::mpsc;
11use tokio::time::{interval, Duration};
12use tracing::{info, error, debug};
13use base64::{Engine as _, engine::general_purpose};
14
15const DEFAULT_CLIENT_TIMEOUT: Duration = Duration::from_secs(30);
16const DEFAULT_WATCH_INTERVAL: Duration = Duration::from_secs(5);
17
18#[derive(Debug)]
19pub struct ConsulConfigProvider {
20 client: Client,
21 address: String,
22 namespace: Option<String>,
23 cache: ConfigCache,
24 watchers: Arc<RwLock<Vec<mpsc::UnboundedSender<ConfigChange>>>>,
25 watch_interval: Duration,
26}
27
28#[derive(Debug)]
29pub struct ConsulConfigOptions {
30 pub address: String,
31 pub namespace: Option<String>,
32 pub client_timeout: Duration,
33 pub watch_interval: Duration,
34 pub token: Option<String>,
35}
36
37impl Default for ConsulConfigOptions {
38 fn default() -> Self {
39 Self {
40 address: "http://localhost:8500".to_string(),
41 namespace: None,
42 client_timeout: DEFAULT_CLIENT_TIMEOUT,
43 watch_interval: DEFAULT_WATCH_INTERVAL,
44 token: None,
45 }
46 }
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50struct ConsulKVPair {
51 #[serde(rename = "Key")]
52 key: String,
53 #[serde(rename = "Value")]
54 value: Option<String>,
55 #[serde(rename = "CreateIndex")]
56 create_index: u64,
57 #[serde(rename = "ModifyIndex")]
58 modify_index: u64,
59 #[serde(rename = "Flags")]
60 flags: u64,
61}
62
63impl ConsulConfigProvider {
64 pub fn new(options: ConsulConfigOptions) -> Self {
65 let client = Client::builder()
66 .timeout(options.client_timeout)
67 .build()
68 .unwrap_or_else(|_| Client::new());
69
70 Self {
71 client,
72 address: options.address,
73 namespace: options.namespace,
74 cache: ConfigCache::new(),
75 watchers: Arc::new(RwLock::new(Vec::new())),
76 watch_interval: options.watch_interval,
77 }
78 }
79
80 fn build_key(&self, key: &str) -> String {
81 match &self.namespace {
82 Some(ns) => format!("{}/{}", ns, key),
83 None => key.to_string(),
84 }
85 }
86
87 async fn get_from_consul(&self, key: &str) -> Result<Option<ConsulKVPair>> {
88 let url = format!("{}/v1/kv/{}", self.address, self.build_key(key));
89
90 let response = self.client
91 .get(&url)
92 .send()
93 .await
94 .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
95
96 if response.status() == 404 {
97 return Ok(None);
98 }
99
100 if !response.status().is_success() {
101 return Err(RevokeError::ConfigError(
102 format!("Consul returned status: {}", response.status())
103 ));
104 }
105
106 let pairs: Vec<ConsulKVPair> = response
107 .json()
108 .await
109 .map_err(|e| RevokeError::Unknown(format!("Failed to parse Consul response: {}", e)))?;
110
111 Ok(pairs.into_iter().next())
112 }
113
114 async fn put_to_consul(&self, key: &str, value: &str) -> Result<()> {
115 let url = format!("{}/v1/kv/{}", self.address, self.build_key(key));
116
117 let response = self.client
118 .put(&url)
119 .body(value.to_string())
120 .send()
121 .await
122 .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
123
124 if !response.status().is_success() {
125 return Err(RevokeError::ConfigError(
126 format!("Failed to put key: {}", response.status())
127 ));
128 }
129
130 Ok(())
131 }
132
133 async fn list_keys(&self, prefix: &str) -> Result<Vec<String>> {
134 let url = format!("{}/v1/kv/{}?keys", self.address, self.build_key(prefix));
135
136 let response = self.client
137 .get(&url)
138 .send()
139 .await
140 .map_err(|e| RevokeError::ConnectionError(e.to_string()))?;
141
142 if response.status() == 404 {
143 return Ok(Vec::new());
144 }
145
146 if !response.status().is_success() {
147 return Err(RevokeError::ConfigError(
148 format!("Failed to list keys: {}", response.status())
149 ));
150 }
151
152 let keys: Vec<String> = response
153 .json()
154 .await
155 .map_err(|e| RevokeError::Unknown(format!("Failed to parse Consul response: {}", e)))?;
156
157 Ok(keys)
158 }
159
160 fn notify_watchers(&self, change: ConfigChange) {
161 let mut watchers = self.watchers.write();
162 watchers.retain(|tx| tx.send(change.clone()).is_ok());
163 }
164
165 pub async fn start_watch_loop(&self) {
166 let provider = self.clone();
167 tokio::spawn(async move {
168 let mut ticker = interval(provider.watch_interval);
169 let mut known_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
170
171 loop {
172 ticker.tick().await;
173
174 let prefix = provider.namespace.as_deref().unwrap_or("");
176 match provider.list_keys(prefix).await {
177 Ok(keys) => {
178 for key in &keys {
179 let short_key = if let Some(ns) = &provider.namespace {
180 key.strip_prefix(&format!("{}/", ns)).unwrap_or(key)
181 } else {
182 key
183 };
184
185 match provider.get_from_consul(short_key).await {
186 Ok(Some(kv_pair)) => {
187 if let Some(encoded_value) = kv_pair.value {
188 if let Ok(decoded) = general_purpose::STANDARD.decode(&encoded_value) {
189 if let Ok(value_str) = String::from_utf8(decoded) {
190 let json_value: serde_json::Value = serde_json::from_str(&value_str)
191 .unwrap_or_else(|_| serde_json::Value::String(value_str.clone()));
192
193 let old_value = provider.cache.get(short_key);
194 let is_new = !known_keys.contains(short_key);
195 known_keys.insert(short_key.to_string());
196
197 if is_new || old_value.as_ref().map(|v| &v.value) != Some(&json_value) {
198 let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
199
200 let config_value = ConfigValue {
201 key: short_key.to_string(),
202 value: json_value.clone(),
203 version,
204 metadata: ConfigMetadata::new(),
205 };
206
207 provider.cache.set(short_key.to_string(), config_value);
208
209 let change = ConfigChange {
210 key: short_key.to_string(),
211 old_value: old_value.as_ref().map(|v| v.value.clone()),
212 new_value: Some(json_value),
213 change_type: if is_new {
214 ChangeType::Created
215 } else {
216 ChangeType::Updated
217 },
218 };
219
220 provider.notify_watchers(change);
221 debug!("Config change detected for key: {}", short_key);
222 }
223 }
224 }
225 }
226 }
227 Ok(None) => {
228 if known_keys.remove(short_key) {
230 if let Some(old_value) = provider.cache.remove(short_key) {
231 let change = ConfigChange {
232 key: short_key.to_string(),
233 old_value: Some(old_value.value),
234 new_value: None,
235 change_type: ChangeType::Deleted,
236 };
237 provider.notify_watchers(change);
238 debug!("Config key deleted: {}", short_key);
239 }
240 }
241 }
242 Err(e) => {
243 error!("Error watching key {}: {}", short_key, e);
244 }
245 }
246 }
247 }
248 Err(e) => {
249 error!("Error listing keys: {}", e);
250 }
251 }
252 }
253 });
254 }
255}
256
257impl Clone for ConsulConfigProvider {
258 fn clone(&self) -> Self {
259 Self {
260 client: self.client.clone(),
261 address: self.address.clone(),
262 namespace: self.namespace.clone(),
263 cache: ConfigCache::new(), watchers: Arc::clone(&self.watchers),
265 watch_interval: self.watch_interval,
266 }
267 }
268}
269
270#[async_trait]
271impl ConfigProvider for ConsulConfigProvider {
272 async fn get(&self, key: &str) -> Result<String> {
273 if let Some(cached) = self.cache.get(key) {
275 return Ok(cached.value.to_string());
276 }
277
278 match self.get_from_consul(key).await? {
280 Some(kv_pair) => {
281 if let Some(encoded_value) = kv_pair.value {
282 let decoded = general_purpose::STANDARD
283 .decode(&encoded_value)
284 .map_err(|e| RevokeError::ConfigError(format!("Base64 decode error: {}", e)))?;
285
286 let value_str = String::from_utf8(decoded)
287 .map_err(|e| RevokeError::ConfigError(format!("UTF-8 decode error: {}", e)))?;
288
289 let json_value: serde_json::Value = serde_json::from_str(&value_str)
291 .unwrap_or_else(|_| serde_json::Value::String(value_str.clone()));
292
293 let config_value = ConfigValue {
294 key: key.to_string(),
295 value: json_value,
296 version: 1,
297 metadata: ConfigMetadata::new(),
298 };
299
300 self.cache.set(key.to_string(), config_value);
301 Ok(value_str)
302 } else {
303 Err(RevokeError::ConfigError(format!("Key {} has no value", key)))
304 }
305 }
306 None => Err(RevokeError::ConfigError(format!("Key not found: {}", key))),
307 }
308 }
309
310 async fn set(&self, key: &str, value: &str) -> Result<()> {
311 self.put_to_consul(key, value).await?;
312
313 let json_value: serde_json::Value = serde_json::from_str(value)
314 .unwrap_or_else(|_| serde_json::Value::String(value.to_string()));
315
316 let old_value = self.cache.get(key);
317 let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
318
319 let config_value = ConfigValue {
320 key: key.to_string(),
321 value: json_value.clone(),
322 version,
323 metadata: ConfigMetadata::new(),
324 };
325
326 self.cache.set(key.to_string(), config_value);
327
328 let change = ConfigChange {
329 key: key.to_string(),
330 old_value: old_value.as_ref().map(|v| v.value.clone()),
331 new_value: Some(json_value),
332 change_type: if old_value.is_some() {
333 ChangeType::Updated
334 } else {
335 ChangeType::Created
336 },
337 };
338
339 self.notify_watchers(change);
340 info!("Config key '{}' updated in Consul", key);
341
342 Ok(())
343 }
344
345 async fn watch(&self, key: &str) -> Result<Box<dyn Stream<Item = String> + Send + Unpin>> {
346 let (tx, rx) = mpsc::unbounded_channel();
347 self.watchers.write().push(tx);
348
349 let key = key.to_string();
350 let stream = async_stream::stream! {
351 let mut rx = rx;
352 while let Some(change) = rx.recv().await {
353 if change.key == key {
354 if let Some(new_value) = change.new_value {
355 yield new_value.to_string();
356 }
357 }
358 }
359 };
360
361 Ok(Box::new(UnpinStream::new(stream)) as Box<dyn Stream<Item = String> + Send + Unpin>)
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368
369 #[test]
370 fn test_build_key() {
371 let provider = ConsulConfigProvider::new(ConsulConfigOptions {
372 namespace: Some("myapp".to_string()),
373 ..Default::default()
374 });
375
376 assert_eq!(provider.build_key("config"), "myapp/config");
377
378 let provider = ConsulConfigProvider::new(ConsulConfigOptions::default());
379 assert_eq!(provider.build_key("config"), "config");
380 }
381}