mmap_io/
watch.rs

1//! File change watching and notification support.
2
3use crate::errors::Result;
4use crate::mmap::MemoryMappedFile;
5use std::thread;
6use std::time::Duration;
7
/// Delay between two consecutive polls of a watched file, in milliseconds.
const WATCH_POLL_INTERVAL_MS: u64 = 100;
10
/// Classifies what a watcher observed happening to the watched file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeKind {
    /// The contents of the file were modified.
    Modified,
    /// Only metadata (permissions, timestamps, etc.) changed.
    Metadata,
    /// The file no longer exists.
    Removed,
}
21
22/// Event describing a change to a watched memory-mapped file.
23#[derive(Debug, Clone)]
24pub struct ChangeEvent {
25    /// Offset where the change occurred (if known).
26    pub offset: Option<u64>,
27    /// Length of the changed region (if known).
28    pub len: Option<u64>,
29    /// Type of change.
30    pub kind: ChangeKind,
31}
32
33/// Handle for controlling a file watch operation.
34pub struct WatchHandle {
35    // Thread handle is kept to ensure the watch thread is properly joined on drop
36    thread: thread::JoinHandle<()>,
37}
38
39impl Drop for WatchHandle {
40    fn drop(&mut self) {
41        // The thread will naturally exit when it detects the file is removed
42        // or when the handle is dropped. We don't join here to avoid blocking.
43        // The thread will clean up on its own.
44    }
45}
46
47impl WatchHandle {
48    /// Check if the watch thread is still running.
49    #[allow(dead_code)]
50    pub fn is_active(&self) -> bool {
51        !self.thread.is_finished()
52    }
53}
54
55impl MemoryMappedFile {
56    /// Watch for changes to the mapped file.
57    ///
58    /// The callback will be invoked whenever changes are detected.
59    /// Returns a handle that stops watching when dropped.
60    ///
61    /// # Platform-specific behavior
62    ///
63    /// - **Linux**: Uses inotify for efficient monitoring
64    /// - **macOS**: Uses FSEvents or kqueue
65    /// - **Windows**: Uses ReadDirectoryChangesW
66    /// - **Fallback**: Polling-based implementation
67    ///
68    /// # Examples
69    ///
70    /// ```no_run
71    /// use mmap_io::{MemoryMappedFile, watch::ChangeEvent};
72    /// use std::sync::Arc;
73    /// use std::sync::atomic::{AtomicBool, Ordering};
74    ///
75    /// let mmap = MemoryMappedFile::open_ro("data.bin")?;
76    /// let changed = Arc::new(AtomicBool::new(false));
77    /// let changed_clone = Arc::clone(&changed);
78    ///
79    /// let handle = mmap.watch(move |event: ChangeEvent| {
80    ///     println!("File changed: {:?}", event);
81    ///     changed_clone.store(true, Ordering::SeqCst);
82    /// })?;
83    ///
84    /// // File is being watched...
85    /// // Handle is dropped when out of scope, stopping the watch
86    /// # Ok::<(), mmap_io::MmapIoError>(())
87    /// ```
88    #[cfg(feature = "watch")]
89    pub fn watch<F>(&self, callback: F) -> Result<WatchHandle>
90    where
91        F: Fn(ChangeEvent) + Send + 'static,
92    {
93        let path = self.path().to_path_buf();
94
95        // For this implementation, we'll use a simple polling approach
96        // In a production implementation, you'd use platform-specific APIs
97        let thread = thread::spawn(move || {
98            let mut last_modified = std::fs::metadata(&path)
99                .ok()
100                .and_then(|m| m.modified().ok());
101
102            loop {
103                thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS));
104
105                // Check if file still exists
106                let metadata = match std::fs::metadata(&path) {
107                    Ok(m) => m,
108                    Err(_) => {
109                        callback(ChangeEvent {
110                            offset: None,
111                            len: None,
112                            kind: ChangeKind::Removed,
113                        });
114                        break;
115                    }
116                };
117
118                // Check modification time
119                if let Ok(modified) = metadata.modified() {
120                    if Some(modified) != last_modified {
121                        callback(ChangeEvent {
122                            offset: None,
123                            len: None,
124                            kind: ChangeKind::Modified,
125                        });
126                        last_modified = Some(modified);
127                    }
128                }
129            }
130        });
131
132        Ok(WatchHandle { thread })
133    }
134}
135
136// Platform-specific implementations would go here
137// For now, we use polling for all platforms
138
139// Fallback polling implementation
140// This function is kept for potential future use when implementing platform-specific watchers
141#[cfg(feature = "watch")]
142fn _polling_watch<F>(path: &std::path::Path, callback: F) -> Result<WatchHandle>
143where
144    F: Fn(ChangeEvent) + Send + 'static,
145{
146    let path = path.to_path_buf();
147
148    let thread = thread::spawn(move || {
149        let mut last_modified = std::fs::metadata(&path)
150            .ok()
151            .and_then(|m| m.modified().ok());
152        let mut last_len = std::fs::metadata(&path).ok().map(|m| m.len());
153
154        loop {
155            thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS));
156
157            // Check if file still exists
158            let metadata = match std::fs::metadata(&path) {
159                Ok(m) => m,
160                Err(_) => {
161                    callback(ChangeEvent {
162                        offset: None,
163                        len: None,
164                        kind: ChangeKind::Removed,
165                    });
166                    break;
167                }
168            };
169
170            let current_len = metadata.len();
171            let current_modified = metadata.modified().ok();
172
173            // Check for changes
174            if current_modified != last_modified || Some(current_len) != last_len {
175                let kind = if Some(current_len) != last_len {
176                    ChangeKind::Modified
177                } else {
178                    ChangeKind::Metadata
179                };
180
181                callback(ChangeEvent {
182                    offset: None,
183                    len: None,
184                    kind,
185                });
186
187                last_modified = current_modified;
188                last_len = Some(current_len);
189            }
190        }
191    });
192
193    Ok(WatchHandle { thread })
194}
195
// Every test needs `MemoryMappedFile::watch`, so the whole module is compiled
// only when the `watch` feature is enabled; this avoids unused-import and
// dead-code warnings in feature-less test builds.
#[cfg(all(test, feature = "watch"))]
mod tests {
    use super::*;
    use crate::create_mmap;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Instant;

    /// Upper bound, in polling intervals, that a test waits for an event.
    const DETECTION_INTERVALS: u64 = 15;

    /// Per-test temporary file path that is cleaned up when the guard drops,
    /// including when an assertion fails part-way through the test.
    struct TempFile {
        path: PathBuf,
    }

    impl TempFile {
        /// Build a process-unique path and remove any stale file left there.
        fn new(name: &str) -> Self {
            let mut path = std::env::temp_dir();
            path.push(format!(
                "mmap_io_watch_test_{}_{}",
                name,
                std::process::id()
            ));
            let _ = fs::remove_file(&path);
            Self { path }
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best effort: the file may already be gone (removal test).
            let _ = fs::remove_file(&self.path);
        }
    }

    /// Poll `condition` until it holds or `max_intervals` polling intervals
    /// have elapsed. Returns whether the condition was eventually observed.
    fn wait_until(max_intervals: u64, condition: impl Fn() -> bool) -> bool {
        let deadline =
            Instant::now() + Duration::from_millis(WATCH_POLL_INTERVAL_MS * max_intervals);
        loop {
            if condition() {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS / 4));
        }
    }

    /// Let the watcher thread start and take its baseline before the test
    /// modifies the file.
    fn let_watcher_settle(intervals: u64) {
        thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * intervals));
    }

    /// Force a timestamp/metadata change on `path` to make detection more
    /// reliable than relying on the mapped write alone. Best effort: failures
    /// are ignored.
    // `path` is unused on targets that are neither unix nor windows.
    #[allow(unused_variables)]
    fn force_metadata_change(path: &Path) {
        #[cfg(unix)]
        {
            use std::ffi::CString;
            use std::os::unix::ffi::OsStrExt;
            let cpath = CString::new(path.as_os_str().as_bytes()).unwrap();
            // SAFETY: `cpath` is a valid NUL-terminated string that outlives
            // the call, and a null `times` pointer asks `utime` to set the
            // timestamps to the current time.
            let _ = unsafe { libc::utime(cpath.as_ptr(), std::ptr::null()) };
        }
        #[cfg(windows)]
        {
            // Toggle readonly twice as a portable metadata change
            if let Ok(meta) = std::fs::metadata(path) {
                let mut perms = meta.permissions();
                perms.set_readonly(true);
                let _ = std::fs::set_permissions(path, perms);
                let mut perms2 = std::fs::metadata(path).unwrap().permissions();
                perms2.set_readonly(false);
                let _ = std::fs::set_permissions(path, perms2);
            }
        }
    }

    #[test]
    fn test_watch_file_changes() {
        let tmp = TempFile::new("watch_changes");
        let path = &tmp.path;

        // Create initial file
        let mmap = create_mmap(path, 1024).expect("create");
        mmap.update_region(0, b"initial").expect("write");
        mmap.flush().expect("flush");

        // Set up watch
        let changed = Arc::new(AtomicBool::new(false));
        let changed_clone = Arc::clone(&changed);
        let event_count = Arc::new(AtomicUsize::new(0));
        let event_count_clone = Arc::clone(&event_count);

        let _handle = mmap
            .watch(move |event| {
                println!("Detected change: {event:?}");
                changed_clone.store(true, Ordering::SeqCst);
                event_count_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch");

        let_watcher_settle(5);

        // Modify file and ensure mtime changes
        mmap.update_region(0, b"modified").expect("write");
        mmap.flush()
            .expect("flush after write for watch visibility");
        force_metadata_change(path);

        // The callback sets `changed` before bumping the counter, so waiting
        // on the counter guarantees both are visible afterwards.
        wait_until(DETECTION_INTERVALS, || {
            event_count.load(Ordering::SeqCst) > 0
        });

        assert!(changed.load(Ordering::SeqCst), "Change should be detected");
        assert!(event_count.load(Ordering::SeqCst) > 0, "Should have events");
    }

    #[test]
    fn test_watch_file_removal() {
        let tmp = TempFile::new("watch_removal");
        let path = &tmp.path;

        // Create file
        let mmap = create_mmap(path, 1024).expect("create");

        // Set up watch
        let removed = Arc::new(AtomicBool::new(false));
        let removed_clone = Arc::clone(&removed);

        let _handle = mmap
            .watch(move |event| {
                if event.kind == ChangeKind::Removed {
                    removed_clone.store(true, Ordering::SeqCst);
                }
            })
            .expect("watch");

        let_watcher_settle(2);

        // Remove file
        fs::remove_file(path).expect("remove");

        assert!(
            wait_until(DETECTION_INTERVALS, || removed.load(Ordering::SeqCst)),
            "Removal should be detected"
        );
    }

    #[test]
    fn test_watch_with_different_modes() {
        let tmp = TempFile::new("watch_modes");
        let path = &tmp.path;

        // Create file
        create_mmap(path, 1024).expect("create");

        // Test watching with RO mode
        let ro_mmap = MemoryMappedFile::open_ro(path).expect("open ro");
        let _handle = ro_mmap
            .watch(|_event| {
                // Just test that we can set up a watch
            })
            .expect("watch ro");

        #[cfg(feature = "cow")]
        {
            // Test watching with COW mode
            let cow_mmap = MemoryMappedFile::open_cow(path).expect("open cow");
            let _handle = cow_mmap
                .watch(|_event| {
                    // Just test that we can set up a watch
                })
                .expect("watch cow");
        }
    }

    #[test]
    fn test_multiple_watchers() {
        let tmp = TempFile::new("multi_watch");
        let path = &tmp.path;

        let mmap = create_mmap(path, 1024).expect("create");

        // Set up multiple watchers
        let count1 = Arc::new(AtomicUsize::new(0));
        let count1_clone = Arc::clone(&count1);
        let _handle1 = mmap
            .watch(move |_event| {
                count1_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 1");

        let count2 = Arc::new(AtomicUsize::new(0));
        let count2_clone = Arc::clone(&count2);
        let _handle2 = mmap
            .watch(move |_event| {
                count2_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 2");

        let_watcher_settle(6);

        // Modify file and ensure mtime changes
        mmap.update_region(0, b"change").expect("write");
        mmap.flush()
            .expect("flush after write for watch visibility");
        force_metadata_change(path);

        wait_until(DETECTION_INTERVALS, || {
            count1.load(Ordering::SeqCst) > 0 && count2.load(Ordering::SeqCst) > 0
        });

        // Both watchers should detect the change
        assert!(
            count1.load(Ordering::SeqCst) > 0,
            "Watcher 1 should detect change"
        );
        assert!(
            count2.load(Ordering::SeqCst) > 0,
            "Watcher 2 should detect change"
        );
    }
}