commy_sdk_rust/
watcher.rs1use crate::error::{CommyError, Result};
7use crate::virtual_file::VirtualVariableFile;
8use futures::FutureExt;
9use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
10use std::fs;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
/// Notification emitted when a watched service file differs from the
/// content previously recorded for it.
#[derive(Debug, Clone)]
pub struct FileChangeEvent {
    /// Path of the file whose modification produced this event.
    pub file_path: PathBuf,

    /// Service identifier taken from the file name (`service_<id>.mem`).
    pub service_id: String,

    /// Names of the variables reported as changed by the diff.
    pub changed_variables: Vec<String>,

    /// Byte ranges that differ between the new file content and the
    /// previously recorded content.
    // NOTE(review): whether each pair is (start, end) or (offset, length)
    // is defined by `VirtualVariableFile::compare_ranges` — confirm there.
    pub byte_ranges: Vec<(u64, u64)>,
}
32
33pub struct VariableFileWatcher {
35 watch_dir: PathBuf,
37
38 tx: mpsc::UnboundedSender<FileChangeEvent>,
40
41 rx: Arc<RwLock<mpsc::UnboundedReceiver<FileChangeEvent>>>,
43
44 virtual_files: Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
46
47 stop_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
49}
50
51impl VariableFileWatcher {
52 pub async fn new(watch_dir: Option<PathBuf>) -> Result<Self> {
54 let watch_dir = match watch_dir {
55 Some(d) => d,
56 None => {
57 let temp = dirs::cache_dir().ok_or_else(|| {
59 CommyError::FileError(std::io::Error::new(
60 std::io::ErrorKind::NotFound,
61 "No cache directory available",
62 ))
63 })?;
64
65 let commy_dir = temp.join("commy_virtual_files");
66 fs::create_dir_all(&commy_dir)?;
67 commy_dir
68 }
69 };
70
71 let (tx, rx) = mpsc::unbounded_channel();
72
73 Ok(Self {
74 watch_dir,
75 tx,
76 rx: Arc::new(RwLock::new(rx)),
77 virtual_files: Arc::new(RwLock::new(std::collections::HashMap::new())),
78 stop_tx: Arc::new(RwLock::new(None)),
79 })
80 }
81
82 pub fn watch_dir(&self) -> &Path {
84 &self.watch_dir
85 }
86
87 pub async fn register_virtual_file(
89 &self,
90 service_id: String,
91 vf: Arc<VirtualVariableFile>,
92 ) -> Result<()> {
93 let mut files = self.virtual_files.write().await;
94 files.insert(service_id, vf);
95 Ok(())
96 }
97
98 pub async fn start_watching(&self) -> Result<()> {
100 let watch_dir = self.watch_dir.clone();
101 let tx = self.tx.clone();
102 let virtual_files = Arc::clone(&self.virtual_files);
103
104 let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel();
105 *self.stop_tx.write().await = Some(stop_tx);
106
107 tokio::spawn(async move {
108 if let Err(e) = Self::watch_loop(watch_dir, tx, virtual_files, &mut stop_rx).await {
109 eprintln!("Watch loop error: {}", e);
110 }
111 });
112
113 Ok(())
114 }
115
116 async fn watch_loop(
118 watch_dir: PathBuf,
119 tx: mpsc::UnboundedSender<FileChangeEvent>,
120 virtual_files: Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
121 stop_rx: &mut tokio::sync::oneshot::Receiver<()>,
122 ) -> Result<()> {
123 let (file_tx, mut file_rx) = mpsc::unbounded_channel();
124
125 let mut watcher = RecommendedWatcher::new(
126 move |event: std::result::Result<Event, notify::Error>| {
127 if let Ok(evt) = event {
128 let _ = file_tx.send(evt);
129 }
130 },
131 Config::default().with_poll_interval(Duration::from_millis(100)),
132 )
133 .map_err(|e: notify::Error| CommyError::WatcherError(e.to_string()))?;
134
135 watcher
136 .watch(&watch_dir, RecursiveMode::NonRecursive)
137 .map_err(|e: notify::Error| CommyError::WatcherError(e.to_string()))?;
138
139 loop {
140 tokio::select! {
141 Some(event) = file_rx.recv() => {
142 match event.kind {
143 EventKind::Modify(_) => {
144 for path in event.paths {
145 if let Err(e) = Self::handle_file_change(
146 &path,
147 &tx,
148 &virtual_files,
149 ).await {
150 eprintln!("Error handling file change: {}", e);
151 }
152 }
153 }
154 _ => {}
155 }
156 }
157 _ = &mut *stop_rx => {
158 break;
159 }
160 }
161 }
162
163 Ok(())
164 }
165
166 async fn handle_file_change(
168 file_path: &Path,
169 tx: &mpsc::UnboundedSender<FileChangeEvent>,
170 virtual_files: &Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
171 ) -> Result<()> {
172 let filename = file_path
174 .file_name()
175 .ok_or_else(|| {
176 CommyError::FileError(std::io::Error::new(
177 std::io::ErrorKind::InvalidInput,
178 "Invalid filename",
179 ))
180 })?
181 .to_string_lossy();
182
183 if !filename.ends_with(".mem") {
184 return Ok(());
185 }
186
187 let service_id = filename
188 .strip_prefix("service_")
189 .and_then(|s| s.strip_suffix(".mem"))
190 .ok_or_else(|| {
191 CommyError::FileError(std::io::Error::new(
192 std::io::ErrorKind::InvalidInput,
193 "Invalid service filename",
194 ))
195 })?;
196
197 let new_bytes = tokio::fs::read(file_path).await?;
199
200 let vfiles = virtual_files.read().await;
202 if let Some(vf) = vfiles.get(service_id) {
203 let _current = vf.bytes().await;
205 let shadow = vf.shadow_bytes().await;
206
207 if new_bytes == shadow {
208 return Ok(());
210 }
211
212 let byte_ranges = VirtualVariableFile::compare_ranges(&new_bytes, &shadow).await?;
214
215 let changed_vars = vf.find_changed_variables_from_diff(&byte_ranges).await?;
217
218 vf.update_bytes(new_bytes.clone()).await?;
220 vf.mark_variables_changed(changed_vars.clone()).await;
221
222 let event = FileChangeEvent {
224 file_path: file_path.to_path_buf(),
225 service_id: service_id.to_string(),
226 changed_variables: changed_vars,
227 byte_ranges,
228 };
229
230 let _ = tx.send(event);
231 }
232
233 Ok(())
234 }
235
236 pub async fn stop_watching(&self) -> Result<()> {
238 if let Some(stop_tx) = self.stop_tx.write().await.take() {
239 let _ = stop_tx.send(());
240 }
241 Ok(())
242 }
243
244 pub async fn next_change(&self) -> Option<FileChangeEvent> {
246 let mut rx = self.rx.write().await;
247 rx.recv().await
248 }
249
250 pub async fn try_next_change(&self) -> Option<FileChangeEvent> {
252 let mut rx = self.rx.write().await;
253 rx.recv().now_or_never().flatten()
254 }
255}
256
257pub async fn create_temp_service_file(service_id: &str) -> Result<PathBuf> {
259 let temp_dir = dirs::cache_dir().ok_or_else(|| {
260 CommyError::FileError(std::io::Error::new(
261 std::io::ErrorKind::NotFound,
262 "No cache directory",
263 ))
264 })?;
265
266 let commy_dir = temp_dir.join("commy_virtual_files");
267 fs::create_dir_all(&commy_dir)?;
268
269 let file_path = commy_dir.join(format!("service_{}.mem", service_id));
270
271 #[cfg(unix)]
273 {
274 use std::fs::Permissions;
275 use std::os::unix::fs::PermissionsExt;
276 fs::set_permissions(&file_path, Permissions::from_mode(0o600))?;
277 }
278
279 Ok(file_path)
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// A watcher built without an explicit directory must point at a
    /// directory that exists on disk.
    #[tokio::test]
    async fn test_watcher_creation() {
        let created = VariableFileWatcher::new(None).await.unwrap();
        let dir = created.watch_dir();
        assert!(dir.exists());
    }

    /// The returned path must carry the `service_<id>.mem` file name.
    #[tokio::test]
    async fn test_temp_file_creation() {
        let created = create_temp_service_file("test_service").await.unwrap();
        let rendered = created.to_string_lossy();
        assert!(rendered.contains("service_test_service.mem"));
    }
}
297}