mmap_io/
watch.rs

1//! File change watching and notification support.
2
3use crate::errors::Result;
4use crate::mmap::MemoryMappedFile;
5use std::thread;
6use std::time::Duration;
7
/// Type of change detected in a watched file.
///
/// Carried in [`ChangeEvent::kind`]. The type is a plain fieldless enum, so it
/// is cheap to copy, compare, and use as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeKind {
    /// File content was modified.
    Modified,
    /// File metadata changed (permissions, timestamps, etc.).
    Metadata,
    /// File was removed.
    Removed,
}
18
19/// Event describing a change to a watched memory-mapped file.
20#[derive(Debug, Clone)]
21pub struct ChangeEvent {
22    /// Offset where the change occurred (if known).
23    pub offset: Option<u64>,
24    /// Length of the changed region (if known).
25    pub len: Option<u64>,
26    /// Type of change.
27    pub kind: ChangeKind,
28}
29
30/// Handle for controlling a file watch operation.
31pub struct WatchHandle {
32    #[allow(dead_code)]
33    thread: thread::JoinHandle<()>,
34}
35
36impl MemoryMappedFile {
37    /// Watch for changes to the mapped file.
38    ///
39    /// The callback will be invoked whenever changes are detected.
40    /// Returns a handle that stops watching when dropped.
41    ///
42    /// # Platform-specific behavior
43    ///
44    /// - **Linux**: Uses inotify for efficient monitoring
45    /// - **macOS**: Uses FSEvents or kqueue
46    /// - **Windows**: Uses ReadDirectoryChangesW
47    /// - **Fallback**: Polling-based implementation
48    ///
49    /// # Examples
50    ///
51    /// ```no_run
52    /// use mmap_io::{MemoryMappedFile, watch::ChangeEvent};
53    /// use std::sync::Arc;
54    /// use std::sync::atomic::{AtomicBool, Ordering};
55    ///
56    /// let mmap = MemoryMappedFile::open_ro("data.bin")?;
57    /// let changed = Arc::new(AtomicBool::new(false));
58    /// let changed_clone = Arc::clone(&changed);
59    ///
60    /// let handle = mmap.watch(move |event: ChangeEvent| {
61    ///     println!("File changed: {:?}", event);
62    ///     changed_clone.store(true, Ordering::SeqCst);
63    /// })?;
64    ///
65    /// // File is being watched...
66    /// // Handle is dropped when out of scope, stopping the watch
67    /// # Ok::<(), mmap_io::MmapIoError>(())
68    /// ```
69    #[cfg(feature = "watch")]
70    pub fn watch<F>(&self, callback: F) -> Result<WatchHandle>
71    where
72        F: Fn(ChangeEvent) + Send + 'static,
73    {
74        let path = self.path().to_path_buf();
75        
76        // For this implementation, we'll use a simple polling approach
77        // In a production implementation, you'd use platform-specific APIs
78        let thread = thread::spawn(move || {
79            let mut last_modified = std::fs::metadata(&path)
80                .ok()
81                .and_then(|m| m.modified().ok());
82            
83            loop {
84                thread::sleep(Duration::from_millis(100));
85                
86                // Check if file still exists
87                let metadata = match std::fs::metadata(&path) {
88                    Ok(m) => m,
89                    Err(_) => {
90                        callback(ChangeEvent {
91                            offset: None,
92                            len: None,
93                            kind: ChangeKind::Removed,
94                        });
95                        break;
96                    }
97                };
98                
99                // Check modification time
100                if let Ok(modified) = metadata.modified() {
101                    if Some(modified) != last_modified {
102                        callback(ChangeEvent {
103                            offset: None,
104                            len: None,
105                            kind: ChangeKind::Modified,
106                        });
107                        last_modified = Some(modified);
108                    }
109                }
110            }
111        });
112        
113        Ok(WatchHandle { thread })
114    }
115}
116
117// Platform-specific implementations would go here
118// For now, we use polling for all platforms
119
120// Fallback polling implementation
121#[cfg(feature = "watch")]
122#[allow(dead_code)]
123fn polling_watch<F>(path: &std::path::Path, callback: F) -> Result<WatchHandle>
124where
125    F: Fn(ChangeEvent) + Send + 'static,
126{
127    let path = path.to_path_buf();
128    
129    let thread = thread::spawn(move || {
130        let mut last_modified = std::fs::metadata(&path)
131            .ok()
132            .and_then(|m| m.modified().ok());
133        let mut last_len = std::fs::metadata(&path)
134            .ok()
135            .map(|m| m.len());
136        
137        loop {
138            thread::sleep(Duration::from_millis(100));
139            
140            // Check if file still exists
141            let metadata = match std::fs::metadata(&path) {
142                Ok(m) => m,
143                Err(_) => {
144                    callback(ChangeEvent {
145                        offset: None,
146                        len: None,
147                        kind: ChangeKind::Removed,
148                    });
149                    break;
150                }
151            };
152            
153            let current_len = metadata.len();
154            let current_modified = metadata.modified().ok();
155            
156            // Check for changes
157            if current_modified != last_modified || Some(current_len) != last_len {
158                let kind = if Some(current_len) != last_len {
159                    ChangeKind::Modified
160                } else {
161                    ChangeKind::Metadata
162                };
163                
164                callback(ChangeEvent {
165                    offset: None,
166                    len: None,
167                    kind,
168                });
169                
170                last_modified = current_modified;
171                last_len = Some(current_len);
172            }
173        }
174    });
175    
176    Ok(WatchHandle { thread })
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::create_mmap;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Upper bound on how long a test waits for the watcher to report an event.
    /// The polling cadence is 100ms, so this allows many poll cycles.
    #[cfg(feature = "watch")]
    const DETECTION_TIMEOUT: Duration = Duration::from_millis(1500);

    /// Builds a per-process unique path in the system temp directory.
    fn tmp_path(name: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!("mmap_io_watch_test_{}_{}", name, std::process::id()));
        p
    }

    /// Re-checks `condition` every 10ms until it holds or `timeout` elapses.
    ///
    /// Returns the final outcome, so tests finish as soon as the event arrives
    /// instead of always sleeping for the worst case.
    #[cfg(feature = "watch")]
    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) -> bool {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            if condition() {
                return true;
            }
            if std::time::Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// Best-effort nudge of the file's metadata so a change is reliably visible.
    ///
    /// On Unix the timestamps are set to "now"; on Windows the read-only flag is
    /// toggled on and off again. Failures are ignored: this only increases
    /// detection reliability on top of the real write done by the test.
    #[cfg(feature = "watch")]
    #[allow(unused_variables)] // `path` is unused on targets that are neither unix nor windows
    fn force_metadata_change(path: &std::path::Path) {
        #[cfg(unix)]
        {
            use std::ffi::CString;
            use std::os::unix::ffi::OsStrExt;
            let cpath = CString::new(path.as_os_str().as_bytes())
                .expect("temp path contains no interior NUL byte");
            // SAFETY: `cpath` is a valid NUL-terminated string that outlives the
            // call, and a null `times` pointer makes utime use the current time.
            let _ = unsafe { libc::utime(cpath.as_ptr(), std::ptr::null()) };
        }
        #[cfg(windows)]
        {
            // Toggle readonly twice as a portable metadata change
            if let Ok(meta) = std::fs::metadata(path) {
                let mut perms = meta.permissions();
                perms.set_readonly(true);
                let _ = std::fs::set_permissions(path, perms);
            }
            if let Ok(meta) = std::fs::metadata(path) {
                let mut perms = meta.permissions();
                perms.set_readonly(false);
                let _ = std::fs::set_permissions(path, perms);
            }
        }
    }

    #[test]
    #[cfg(feature = "watch")]
    fn test_watch_file_changes() {
        let path = tmp_path("watch_changes");
        let _ = fs::remove_file(&path);

        // Create initial file
        let mmap = create_mmap(&path, 1024).expect("create");
        mmap.update_region(0, b"initial").expect("write");
        mmap.flush().expect("flush");

        // Set up watch
        let changed = Arc::new(AtomicBool::new(false));
        let changed_clone = Arc::clone(&changed);
        let event_count = Arc::new(AtomicUsize::new(0));
        let event_count_clone = Arc::clone(&event_count);

        let _handle = mmap
            .watch(move |event| {
                println!("Detected change: {event:?}");
                changed_clone.store(true, Ordering::SeqCst);
                event_count_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch");

        // Give the watcher thread time to start and record its baseline
        thread::sleep(Duration::from_millis(500));

        // Modify file and ensure mtime changes
        mmap.update_region(0, b"modified").expect("write");
        mmap.flush().expect("flush after write for watch visibility");
        force_metadata_change(&path);

        let detected = wait_until(DETECTION_TIMEOUT, || changed.load(Ordering::SeqCst));

        assert!(detected, "Change should be detected");
        assert!(event_count.load(Ordering::SeqCst) > 0, "Should have events");

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "watch")]
    fn test_watch_file_removal() {
        let path = tmp_path("watch_removal");
        let _ = fs::remove_file(&path);

        // Create file
        let mmap = create_mmap(&path, 1024).expect("create");

        // Set up watch
        let removed = Arc::new(AtomicBool::new(false));
        let removed_clone = Arc::clone(&removed);

        let _handle = mmap
            .watch(move |event| {
                if event.kind == ChangeKind::Removed {
                    removed_clone.store(true, Ordering::SeqCst);
                }
            })
            .expect("watch");

        // Give the watcher thread time to start
        thread::sleep(Duration::from_millis(200));

        // Remove file
        fs::remove_file(&path).expect("remove");

        let detected = wait_until(DETECTION_TIMEOUT, || removed.load(Ordering::SeqCst));

        assert!(detected, "Removal should be detected");
    }

    #[test]
    #[cfg(feature = "watch")]
    fn test_watch_with_different_modes() {
        let path = tmp_path("watch_modes");
        let _ = fs::remove_file(&path);

        // Create file
        create_mmap(&path, 1024).expect("create");

        // Test watching with RO mode
        let ro_mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
        let _handle = ro_mmap
            .watch(|_event| {
                // Just test that we can set up a watch
            })
            .expect("watch ro");

        #[cfg(feature = "cow")]
        {
            // Test watching with COW mode
            let cow_mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
            let _handle = cow_mmap
                .watch(|_event| {
                    // Just test that we can set up a watch
                })
                .expect("watch cow");
        }

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "watch")]
    fn test_multiple_watchers() {
        let path = tmp_path("multi_watch");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 1024).expect("create");

        // Set up multiple watchers
        let count1 = Arc::new(AtomicUsize::new(0));
        let count1_clone = Arc::clone(&count1);
        let _handle1 = mmap
            .watch(move |_event| {
                count1_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 1");

        let count2 = Arc::new(AtomicUsize::new(0));
        let count2_clone = Arc::clone(&count2);
        let _handle2 = mmap
            .watch(move |_event| {
                count2_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 2");

        // Give the watcher threads time to start and record their baselines
        thread::sleep(Duration::from_millis(600));

        // Modify file and ensure mtime changes
        mmap.update_region(0, b"change").expect("write");
        mmap.flush().expect("flush after write for watch visibility");
        force_metadata_change(&path);

        // Wait until both watchers have reported, or give up at the timeout
        let _ = wait_until(DETECTION_TIMEOUT, || {
            count1.load(Ordering::SeqCst) > 0 && count2.load(Ordering::SeqCst) > 0
        });

        // Both watchers should detect the change
        assert!(count1.load(Ordering::SeqCst) > 0, "Watcher 1 should detect change");
        assert!(count2.load(Ordering::SeqCst) > 0, "Watcher 2 should detect change");

        fs::remove_file(&path).expect("cleanup");
    }
}