revoke_config/
file.rs

1use async_trait::async_trait;
2use revoke_core::{ConfigProvider, Result, RevokeError};
3use crate::types::{ConfigValue, ConfigMetadata, ConfigCache, ConfigChange, ChangeType};
4use crate::format::{ConfigFormat, detect_format};
5use crate::utils::UnpinStream;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use parking_lot::RwLock;
9use futures::Stream;
10use tokio::sync::mpsc;
11use tokio::fs;
12use tracing::{info, error, debug};
13
14pub struct FileConfigProvider {
15    file_path: PathBuf,
16    format: ConfigFormat,
17    cache: ConfigCache,
18    watchers: Arc<RwLock<Vec<mpsc::UnboundedSender<ConfigChange>>>>,
19}
20
21impl FileConfigProvider {
22    pub async fn new(file_path: impl AsRef<Path>) -> Result<Self> {
23        let file_path = file_path.as_ref().to_path_buf();
24        
25        let format = detect_format(&file_path)
26            .ok_or_else(|| RevokeError::ConfigError("Unable to detect config format".to_string()))?;
27
28        let provider = Self {
29            file_path,
30            format,
31            cache: ConfigCache::new(),
32            watchers: Arc::new(RwLock::new(Vec::new())),
33        };
34
35        provider.reload().await?;
36        Ok(provider)
37    }
38
39    pub async fn with_format(file_path: impl AsRef<Path>, format: ConfigFormat) -> Result<Self> {
40        let file_path = file_path.as_ref().to_path_buf();
41        
42        let provider = Self {
43            file_path,
44            format,
45            cache: ConfigCache::new(),
46            watchers: Arc::new(RwLock::new(Vec::new())),
47        };
48
49        provider.reload().await?;
50        Ok(provider)
51    }
52
53    async fn reload(&self) -> Result<()> {
54        let content = fs::read_to_string(&self.file_path)
55            .await
56            .map_err(|e| RevokeError::IoError(e))?;
57
58        let root_value = self.format
59            .parse(&content)
60            .map_err(|e| RevokeError::ConfigError(format!("Failed to parse config: {}", e)))?;
61
62        // Clear cache and reload all values
63        self.cache.clear();
64
65        if let serde_json::Value::Object(map) = root_value {
66            for (key, value) in map {
67                let config_value = ConfigValue {
68                    key: key.clone(),
69                    value,
70                    version: 1,
71                    metadata: ConfigMetadata::new(),
72                };
73                self.cache.set(key, config_value);
74            }
75        }
76
77        info!("Config reloaded from {}", self.file_path.display());
78        Ok(())
79    }
80
81    async fn save(&self) -> Result<()> {
82        let mut root = serde_json::Map::new();
83        
84        for key in self.cache.keys() {
85            if let Some(config_value) = self.cache.get(&key) {
86                root.insert(key, config_value.value);
87            }
88        }
89
90        let root_value = serde_json::Value::Object(root);
91        let content = self.format
92            .serialize(&root_value)
93            .map_err(|e| RevokeError::ConfigError(format!("Failed to serialize config: {}", e)))?;
94
95        fs::write(&self.file_path, content)
96            .await
97            .map_err(|e| RevokeError::IoError(e))?;
98
99        debug!("Config saved to {}", self.file_path.display());
100        Ok(())
101    }
102
103    fn notify_watchers(&self, change: ConfigChange) {
104        let mut watchers = self.watchers.write();
105        watchers.retain(|tx| tx.send(change.clone()).is_ok());
106    }
107
108    #[cfg(feature = "notify")]
109    pub async fn start_file_watcher(&self) -> Result<()> {
110        use notify::{Watcher, RecursiveMode, RecommendedWatcher, Config};
111        use std::time::Duration;
112
113        let (tx, mut rx) = mpsc::channel(100);
114        let file_path = self.file_path.clone();
115        
116        let mut watcher = RecommendedWatcher::new(
117            move |res| {
118                if let Ok(event) = res {
119                    let _ = tx.blocking_send(event);
120                }
121            },
122            Config::default().with_poll_interval(Duration::from_secs(2)),
123        ).map_err(|e| RevokeError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
124
125        watcher.watch(self.file_path.parent().unwrap_or(Path::new(".")), RecursiveMode::NonRecursive)
126            .map_err(|e| RevokeError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
127
128        let provider_clone = self.clone();
129        tokio::spawn(async move {
130            while let Some(event) = rx.recv().await {
131                if event.paths.contains(&file_path) {
132                    debug!("File change detected, reloading config");
133                    if let Err(e) = provider_clone.reload().await {
134                        error!("Failed to reload config: {}", e);
135                    }
136                }
137            }
138        });
139
140        Ok(())
141    }
142}
143
144impl Clone for FileConfigProvider {
145    fn clone(&self) -> Self {
146        Self {
147            file_path: self.file_path.clone(),
148            format: self.format,
149            cache: ConfigCache::new(), // Note: This creates a new cache
150            watchers: Arc::clone(&self.watchers),
151        }
152    }
153}
154
155#[async_trait]
156impl ConfigProvider for FileConfigProvider {
157    async fn get(&self, key: &str) -> Result<String> {
158        self.cache
159            .get(key)
160            .map(|v| v.value.to_string())
161            .ok_or_else(|| RevokeError::ConfigError(format!("Key not found: {}", key)))
162    }
163
164    async fn set(&self, key: &str, value: &str) -> Result<()> {
165        let json_value: serde_json::Value = serde_json::from_str(value)
166            .unwrap_or_else(|_| serde_json::Value::String(value.to_string()));
167
168        let old_value = self.cache.get(key);
169        let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
170        
171        let mut metadata = old_value
172            .as_ref()
173            .map(|v| v.metadata.clone())
174            .unwrap_or_else(ConfigMetadata::new);
175        metadata.updated_at = chrono::Utc::now();
176
177        let config_value = ConfigValue {
178            key: key.to_string(),
179            value: json_value.clone(),
180            version,
181            metadata,
182        };
183
184        self.cache.set(key.to_string(), config_value);
185
186        // Save to file
187        self.save().await?;
188
189        let change = ConfigChange {
190            key: key.to_string(),
191            old_value: old_value.as_ref().map(|v| v.value.clone()),
192            new_value: Some(json_value),
193            change_type: if old_value.is_some() {
194                ChangeType::Updated
195            } else {
196                ChangeType::Created
197            },
198        };
199
200        self.notify_watchers(change);
201        info!("Config key '{}' updated and saved", key);
202
203        Ok(())
204    }
205
206    async fn watch(&self, key: &str) -> Result<Box<dyn Stream<Item = String> + Send + Unpin>> {
207        let (tx, rx) = mpsc::unbounded_channel();
208        self.watchers.write().push(tx);
209
210        let key = key.to_string();
211        let stream = async_stream::stream! {
212            let mut rx = rx;
213            while let Some(change) = rx.recv().await {
214                if change.key == key {
215                    if let Some(new_value) = change.new_value {
216                        yield new_value.to_string();
217                    }
218                }
219            }
220        };
221
222        Ok(Box::new(UnpinStream::new(stream)) as Box<dyn Stream<Item = String> + Send + Unpin>)
223    }
224}