1use async_trait::async_trait;
2use revoke_core::{ConfigProvider, Result, RevokeError};
3use crate::types::{ConfigValue, ConfigMetadata, ConfigCache, ConfigChange, ChangeType};
4use crate::format::{ConfigFormat, detect_format};
5use crate::utils::UnpinStream;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use parking_lot::RwLock;
9use futures::Stream;
10use tokio::sync::mpsc;
11use tokio::fs;
12use tracing::{info, error, debug};
13
14pub struct FileConfigProvider {
15 file_path: PathBuf,
16 format: ConfigFormat,
17 cache: ConfigCache,
18 watchers: Arc<RwLock<Vec<mpsc::UnboundedSender<ConfigChange>>>>,
19}
20
21impl FileConfigProvider {
22 pub async fn new(file_path: impl AsRef<Path>) -> Result<Self> {
23 let file_path = file_path.as_ref().to_path_buf();
24
25 let format = detect_format(&file_path)
26 .ok_or_else(|| RevokeError::ConfigError("Unable to detect config format".to_string()))?;
27
28 let provider = Self {
29 file_path,
30 format,
31 cache: ConfigCache::new(),
32 watchers: Arc::new(RwLock::new(Vec::new())),
33 };
34
35 provider.reload().await?;
36 Ok(provider)
37 }
38
39 pub async fn with_format(file_path: impl AsRef<Path>, format: ConfigFormat) -> Result<Self> {
40 let file_path = file_path.as_ref().to_path_buf();
41
42 let provider = Self {
43 file_path,
44 format,
45 cache: ConfigCache::new(),
46 watchers: Arc::new(RwLock::new(Vec::new())),
47 };
48
49 provider.reload().await?;
50 Ok(provider)
51 }
52
53 async fn reload(&self) -> Result<()> {
54 let content = fs::read_to_string(&self.file_path)
55 .await
56 .map_err(|e| RevokeError::IoError(e))?;
57
58 let root_value = self.format
59 .parse(&content)
60 .map_err(|e| RevokeError::ConfigError(format!("Failed to parse config: {}", e)))?;
61
62 self.cache.clear();
64
65 if let serde_json::Value::Object(map) = root_value {
66 for (key, value) in map {
67 let config_value = ConfigValue {
68 key: key.clone(),
69 value,
70 version: 1,
71 metadata: ConfigMetadata::new(),
72 };
73 self.cache.set(key, config_value);
74 }
75 }
76
77 info!("Config reloaded from {}", self.file_path.display());
78 Ok(())
79 }
80
81 async fn save(&self) -> Result<()> {
82 let mut root = serde_json::Map::new();
83
84 for key in self.cache.keys() {
85 if let Some(config_value) = self.cache.get(&key) {
86 root.insert(key, config_value.value);
87 }
88 }
89
90 let root_value = serde_json::Value::Object(root);
91 let content = self.format
92 .serialize(&root_value)
93 .map_err(|e| RevokeError::ConfigError(format!("Failed to serialize config: {}", e)))?;
94
95 fs::write(&self.file_path, content)
96 .await
97 .map_err(|e| RevokeError::IoError(e))?;
98
99 debug!("Config saved to {}", self.file_path.display());
100 Ok(())
101 }
102
103 fn notify_watchers(&self, change: ConfigChange) {
104 let mut watchers = self.watchers.write();
105 watchers.retain(|tx| tx.send(change.clone()).is_ok());
106 }
107
108 #[cfg(feature = "notify")]
109 pub async fn start_file_watcher(&self) -> Result<()> {
110 use notify::{Watcher, RecursiveMode, RecommendedWatcher, Config};
111 use std::time::Duration;
112
113 let (tx, mut rx) = mpsc::channel(100);
114 let file_path = self.file_path.clone();
115
116 let mut watcher = RecommendedWatcher::new(
117 move |res| {
118 if let Ok(event) = res {
119 let _ = tx.blocking_send(event);
120 }
121 },
122 Config::default().with_poll_interval(Duration::from_secs(2)),
123 ).map_err(|e| RevokeError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
124
125 watcher.watch(self.file_path.parent().unwrap_or(Path::new(".")), RecursiveMode::NonRecursive)
126 .map_err(|e| RevokeError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
127
128 let provider_clone = self.clone();
129 tokio::spawn(async move {
130 while let Some(event) = rx.recv().await {
131 if event.paths.contains(&file_path) {
132 debug!("File change detected, reloading config");
133 if let Err(e) = provider_clone.reload().await {
134 error!("Failed to reload config: {}", e);
135 }
136 }
137 }
138 });
139
140 Ok(())
141 }
142}
143
144impl Clone for FileConfigProvider {
145 fn clone(&self) -> Self {
146 Self {
147 file_path: self.file_path.clone(),
148 format: self.format,
149 cache: ConfigCache::new(), watchers: Arc::clone(&self.watchers),
151 }
152 }
153}
154
155#[async_trait]
156impl ConfigProvider for FileConfigProvider {
157 async fn get(&self, key: &str) -> Result<String> {
158 self.cache
159 .get(key)
160 .map(|v| v.value.to_string())
161 .ok_or_else(|| RevokeError::ConfigError(format!("Key not found: {}", key)))
162 }
163
164 async fn set(&self, key: &str, value: &str) -> Result<()> {
165 let json_value: serde_json::Value = serde_json::from_str(value)
166 .unwrap_or_else(|_| serde_json::Value::String(value.to_string()));
167
168 let old_value = self.cache.get(key);
169 let version = old_value.as_ref().map(|v| v.version + 1).unwrap_or(1);
170
171 let mut metadata = old_value
172 .as_ref()
173 .map(|v| v.metadata.clone())
174 .unwrap_or_else(ConfigMetadata::new);
175 metadata.updated_at = chrono::Utc::now();
176
177 let config_value = ConfigValue {
178 key: key.to_string(),
179 value: json_value.clone(),
180 version,
181 metadata,
182 };
183
184 self.cache.set(key.to_string(), config_value);
185
186 self.save().await?;
188
189 let change = ConfigChange {
190 key: key.to_string(),
191 old_value: old_value.as_ref().map(|v| v.value.clone()),
192 new_value: Some(json_value),
193 change_type: if old_value.is_some() {
194 ChangeType::Updated
195 } else {
196 ChangeType::Created
197 },
198 };
199
200 self.notify_watchers(change);
201 info!("Config key '{}' updated and saved", key);
202
203 Ok(())
204 }
205
206 async fn watch(&self, key: &str) -> Result<Box<dyn Stream<Item = String> + Send + Unpin>> {
207 let (tx, rx) = mpsc::unbounded_channel();
208 self.watchers.write().push(tx);
209
210 let key = key.to_string();
211 let stream = async_stream::stream! {
212 let mut rx = rx;
213 while let Some(change) = rx.recv().await {
214 if change.key == key {
215 if let Some(new_value) = change.new_value {
216 yield new_value.to_string();
217 }
218 }
219 }
220 };
221
222 Ok(Box::new(UnpinStream::new(stream)) as Box<dyn Stream<Item = String> + Send + Unpin>)
223 }
224}