Skip to main content

commy_sdk_rust/
watcher.rs

1//! File watcher and change detection engine
2//!
3//! Monitors temporary variable files for changes and uses SIMD
4//! operations to efficiently identify which variables have changed.
5
6use crate::error::{CommyError, Result};
7use crate::virtual_file::VirtualVariableFile;
8use futures::FutureExt;
9use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
10use std::fs;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
17/// Change event for a variable file
18#[derive(Debug, Clone)]
19pub struct FileChangeEvent {
20    /// Path to the changed file
21    pub file_path: PathBuf,
22
23    /// Service ID
24    pub service_id: String,
25
26    /// Variables that changed
27    pub changed_variables: Vec<String>,
28
29    /// Byte ranges that changed
30    pub byte_ranges: Vec<(u64, u64)>,
31}
32
33/// File watcher for variable file changes
34pub struct VariableFileWatcher {
35    /// Watch directory path
36    watch_dir: PathBuf,
37
38    /// Sender for change events
39    tx: mpsc::UnboundedSender<FileChangeEvent>,
40
41    /// Receiver for change events
42    rx: Arc<RwLock<mpsc::UnboundedReceiver<FileChangeEvent>>>,
43
44    /// Virtual files being watched (by service ID)
45    virtual_files: Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
46
47    /// Stop signal
48    stop_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
49}
50
51impl VariableFileWatcher {
52    /// Create a new variable file watcher
53    pub async fn new(watch_dir: Option<PathBuf>) -> Result<Self> {
54        let watch_dir = match watch_dir {
55            Some(d) => d,
56            None => {
57                // Use system temp directory, create commy subdirectory
58                let temp = dirs::cache_dir().ok_or_else(|| {
59                    CommyError::FileError(std::io::Error::new(
60                        std::io::ErrorKind::NotFound,
61                        "No cache directory available",
62                    ))
63                })?;
64
65                let commy_dir = temp.join("commy_virtual_files");
66                fs::create_dir_all(&commy_dir)?;
67                commy_dir
68            }
69        };
70
71        let (tx, rx) = mpsc::unbounded_channel();
72
73        Ok(Self {
74            watch_dir,
75            tx,
76            rx: Arc::new(RwLock::new(rx)),
77            virtual_files: Arc::new(RwLock::new(std::collections::HashMap::new())),
78            stop_tx: Arc::new(RwLock::new(None)),
79        })
80    }
81
82    /// Get watch directory
83    pub fn watch_dir(&self) -> &Path {
84        &self.watch_dir
85    }
86
87    /// Register a virtual file for watching
88    pub async fn register_virtual_file(
89        &self,
90        service_id: String,
91        vf: Arc<VirtualVariableFile>,
92    ) -> Result<()> {
93        let mut files = self.virtual_files.write().await;
94        files.insert(service_id, vf);
95        Ok(())
96    }
97
98    /// Start watching for changes (spawns background task)
99    pub async fn start_watching(&self) -> Result<()> {
100        let watch_dir = self.watch_dir.clone();
101        let tx = self.tx.clone();
102        let virtual_files = Arc::clone(&self.virtual_files);
103
104        let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel();
105        *self.stop_tx.write().await = Some(stop_tx);
106
107        tokio::spawn(async move {
108            if let Err(e) = Self::watch_loop(watch_dir, tx, virtual_files, &mut stop_rx).await {
109                eprintln!("Watch loop error: {}", e);
110            }
111        });
112
113        Ok(())
114    }
115
116    /// Background watch loop
117    async fn watch_loop(
118        watch_dir: PathBuf,
119        tx: mpsc::UnboundedSender<FileChangeEvent>,
120        virtual_files: Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
121        stop_rx: &mut tokio::sync::oneshot::Receiver<()>,
122    ) -> Result<()> {
123        let (file_tx, mut file_rx) = mpsc::unbounded_channel();
124
125        let mut watcher = RecommendedWatcher::new(
126            move |event: std::result::Result<Event, notify::Error>| {
127                if let Ok(evt) = event {
128                    let _ = file_tx.send(evt);
129                }
130            },
131            Config::default().with_poll_interval(Duration::from_millis(100)),
132        )
133        .map_err(|e: notify::Error| CommyError::WatcherError(e.to_string()))?;
134
135        watcher
136            .watch(&watch_dir, RecursiveMode::NonRecursive)
137            .map_err(|e: notify::Error| CommyError::WatcherError(e.to_string()))?;
138
139        loop {
140            tokio::select! {
141                Some(event) = file_rx.recv() => {
142                    match event.kind {
143                        EventKind::Modify(_) => {
144                            for path in event.paths {
145                                if let Err(e) = Self::handle_file_change(
146                                    &path,
147                                    &tx,
148                                    &virtual_files,
149                                ).await {
150                                    eprintln!("Error handling file change: {}", e);
151                                }
152                            }
153                        }
154                        _ => {}
155                    }
156                }
157                _ = &mut *stop_rx => {
158                    break;
159                }
160            }
161        }
162
163        Ok(())
164    }
165
166    /// Handle a file change event
167    async fn handle_file_change(
168        file_path: &Path,
169        tx: &mpsc::UnboundedSender<FileChangeEvent>,
170        virtual_files: &Arc<RwLock<std::collections::HashMap<String, Arc<VirtualVariableFile>>>>,
171    ) -> Result<()> {
172        // Extract service ID from filename (format: service_<id>.mem)
173        let filename = file_path
174            .file_name()
175            .ok_or_else(|| {
176                CommyError::FileError(std::io::Error::new(
177                    std::io::ErrorKind::InvalidInput,
178                    "Invalid filename",
179                ))
180            })?
181            .to_string_lossy();
182
183        if !filename.ends_with(".mem") {
184            return Ok(());
185        }
186
187        let service_id = filename
188            .strip_prefix("service_")
189            .and_then(|s| s.strip_suffix(".mem"))
190            .ok_or_else(|| {
191                CommyError::FileError(std::io::Error::new(
192                    std::io::ErrorKind::InvalidInput,
193                    "Invalid service filename",
194                ))
195            })?;
196
197        // Read the file
198        let new_bytes = tokio::fs::read(file_path).await?;
199
200        // Find the virtual file
201        let vfiles = virtual_files.read().await;
202        if let Some(vf) = vfiles.get(service_id) {
203            // Compare using SIMD
204            let _current = vf.bytes().await;
205            let shadow = vf.shadow_bytes().await;
206
207            if new_bytes == shadow {
208                // No changes detected
209                return Ok(());
210            }
211
212            // Use SIMD diff detection
213            let byte_ranges = VirtualVariableFile::compare_ranges(&new_bytes, &shadow).await?;
214
215            // Identify which variables changed
216            let changed_vars = vf.find_changed_variables_from_diff(&byte_ranges).await?;
217
218            // Update virtual file
219            vf.update_bytes(new_bytes.clone()).await?;
220            vf.mark_variables_changed(changed_vars.clone()).await;
221
222            // Send change event
223            let event = FileChangeEvent {
224                file_path: file_path.to_path_buf(),
225                service_id: service_id.to_string(),
226                changed_variables: changed_vars,
227                byte_ranges,
228            };
229
230            let _ = tx.send(event);
231        }
232
233        Ok(())
234    }
235
236    /// Stop watching
237    pub async fn stop_watching(&self) -> Result<()> {
238        if let Some(stop_tx) = self.stop_tx.write().await.take() {
239            let _ = stop_tx.send(());
240        }
241        Ok(())
242    }
243
244    /// Receive next change event (blocking)
245    pub async fn next_change(&self) -> Option<FileChangeEvent> {
246        let mut rx = self.rx.write().await;
247        rx.recv().await
248    }
249
250    /// Try to receive next change event (non-blocking)
251    pub async fn try_next_change(&self) -> Option<FileChangeEvent> {
252        let mut rx = self.rx.write().await;
253        rx.recv().now_or_never().flatten()
254    }
255}
256
257/// Create temporary file for a service
258pub async fn create_temp_service_file(service_id: &str) -> Result<PathBuf> {
259    let temp_dir = dirs::cache_dir().ok_or_else(|| {
260        CommyError::FileError(std::io::Error::new(
261            std::io::ErrorKind::NotFound,
262            "No cache directory",
263        ))
264    })?;
265
266    let commy_dir = temp_dir.join("commy_virtual_files");
267    fs::create_dir_all(&commy_dir)?;
268
269    let file_path = commy_dir.join(format!("service_{}.mem", service_id));
270
271    // Ensure only current user can read/write
272    #[cfg(unix)]
273    {
274        use std::fs::Permissions;
275        use std::os::unix::fs::PermissionsExt;
276        fs::set_permissions(&file_path, Permissions::from_mode(0o600))?;
277    }
278
279    Ok(file_path)
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285
286    #[tokio::test]
287    async fn test_watcher_creation() {
288        let watcher = VariableFileWatcher::new(None).await.unwrap();
289        assert!(watcher.watch_dir().exists());
290    }
291
292    #[tokio::test]
293    async fn test_temp_file_creation() {
294        let path = create_temp_service_file("test_service").await.unwrap();
295        assert!(path.to_string_lossy().contains("service_test_service.mem"));
296    }
297}