1use crate::errors::Result;
4use crate::mmap::MemoryMappedFile;
5use std::thread;
6use std::time::Duration;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum ChangeKind {
11 Modified,
13 Metadata,
15 Removed,
17}
18
19#[derive(Debug, Clone)]
21pub struct ChangeEvent {
22 pub offset: Option<u64>,
24 pub len: Option<u64>,
26 pub kind: ChangeKind,
28}
29
30pub struct WatchHandle {
32 #[allow(dead_code)]
33 thread: thread::JoinHandle<()>,
34}
35
36impl MemoryMappedFile {
37 #[cfg(feature = "watch")]
70 pub fn watch<F>(&self, callback: F) -> Result<WatchHandle>
71 where
72 F: Fn(ChangeEvent) + Send + 'static,
73 {
74 let path = self.path().to_path_buf();
75
76 let thread = thread::spawn(move || {
79 let mut last_modified = std::fs::metadata(&path)
80 .ok()
81 .and_then(|m| m.modified().ok());
82
83 loop {
84 thread::sleep(Duration::from_millis(100));
85
86 let metadata = match std::fs::metadata(&path) {
88 Ok(m) => m,
89 Err(_) => {
90 callback(ChangeEvent {
91 offset: None,
92 len: None,
93 kind: ChangeKind::Removed,
94 });
95 break;
96 }
97 };
98
99 if let Ok(modified) = metadata.modified() {
101 if Some(modified) != last_modified {
102 callback(ChangeEvent {
103 offset: None,
104 len: None,
105 kind: ChangeKind::Modified,
106 });
107 last_modified = Some(modified);
108 }
109 }
110 }
111 });
112
113 Ok(WatchHandle { thread })
114 }
115}
116
117#[cfg(feature = "watch")]
122#[allow(dead_code)]
123fn polling_watch<F>(path: &std::path::Path, callback: F) -> Result<WatchHandle>
124where
125 F: Fn(ChangeEvent) + Send + 'static,
126{
127 let path = path.to_path_buf();
128
129 let thread = thread::spawn(move || {
130 let mut last_modified = std::fs::metadata(&path)
131 .ok()
132 .and_then(|m| m.modified().ok());
133 let mut last_len = std::fs::metadata(&path)
134 .ok()
135 .map(|m| m.len());
136
137 loop {
138 thread::sleep(Duration::from_millis(100));
139
140 let metadata = match std::fs::metadata(&path) {
142 Ok(m) => m,
143 Err(_) => {
144 callback(ChangeEvent {
145 offset: None,
146 len: None,
147 kind: ChangeKind::Removed,
148 });
149 break;
150 }
151 };
152
153 let current_len = metadata.len();
154 let current_modified = metadata.modified().ok();
155
156 if current_modified != last_modified || Some(current_len) != last_len {
158 let kind = if Some(current_len) != last_len {
159 ChangeKind::Modified
160 } else {
161 ChangeKind::Metadata
162 };
163
164 callback(ChangeEvent {
165 offset: None,
166 len: None,
167 kind,
168 });
169
170 last_modified = current_modified;
171 last_len = Some(current_len);
172 }
173 }
174 });
175
176 Ok(WatchHandle { thread })
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use crate::create_mmap;
183 use std::fs;
184 use std::path::PathBuf;
185 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
186 use std::sync::Arc;
187
188 fn tmp_path(name: &str) -> PathBuf {
189 let mut p = std::env::temp_dir();
190 p.push(format!("mmap_io_watch_test_{}_{}", name, std::process::id()));
191 p
192 }
193
194 #[test]
195 #[cfg(feature = "watch")]
196 fn test_watch_file_changes() {
197 let path = tmp_path("watch_changes");
198 let _ = fs::remove_file(&path);
199
200 let mmap = create_mmap(&path, 1024).expect("create");
202 mmap.update_region(0, b"initial").expect("write");
203 mmap.flush().expect("flush");
204
205 let changed = Arc::new(AtomicBool::new(false));
207 let changed_clone = Arc::clone(&changed);
208 let event_count = Arc::new(AtomicUsize::new(0));
209 let event_count_clone = Arc::clone(&event_count);
210
211 let _handle = mmap.watch(move |event| {
212 println!("Detected change: {event:?}");
213 changed_clone.store(true, Ordering::SeqCst);
214 event_count_clone.fetch_add(1, Ordering::SeqCst);
215 }).expect("watch");
216
217 thread::sleep(Duration::from_millis(500));
219
220 mmap.update_region(0, b"modified").expect("write");
222 mmap.flush().expect("flush after write for watch visibility");
223
224 #[allow(unused_variables)]
226 {
227 #[cfg(unix)]
228 {
229 use std::ffi::CString;
230 use std::os::unix::ffi::OsStrExt;
231 let cpath = CString::new(path.as_os_str().as_bytes()).unwrap();
232 unsafe {
234 libc::utime(cpath.as_ptr(), std::ptr::null());
235 }
236 }
237 #[cfg(windows)]
238 {
239 if let Ok(meta) = std::fs::metadata(&path) {
241 let mut perms = meta.permissions();
242 perms.set_readonly(true);
243 let _ = std::fs::set_permissions(&path, perms);
244 let mut perms2 = std::fs::metadata(&path).unwrap().permissions();
245 perms2.set_readonly(false);
246 let _ = std::fs::set_permissions(&path, perms2);
247 }
248 }
249 }
250
251 thread::sleep(Duration::from_millis(1500));
253
254 assert!(changed.load(Ordering::SeqCst), "Change should be detected");
255 assert!(event_count.load(Ordering::SeqCst) > 0, "Should have events");
256
257 fs::remove_file(&path).expect("cleanup");
258 }
259
260 #[test]
261 #[cfg(feature = "watch")]
262 fn test_watch_file_removal() {
263 let path = tmp_path("watch_removal");
264 let _ = fs::remove_file(&path);
265
266 let mmap = create_mmap(&path, 1024).expect("create");
268
269 let removed = Arc::new(AtomicBool::new(false));
271 let removed_clone = Arc::clone(&removed);
272
273 let _handle = mmap.watch(move |event| {
274 if event.kind == ChangeKind::Removed {
275 removed_clone.store(true, Ordering::SeqCst);
276 }
277 }).expect("watch");
278
279 thread::sleep(Duration::from_millis(200));
281
282 fs::remove_file(&path).expect("remove");
284
285 thread::sleep(Duration::from_millis(300));
287
288 assert!(removed.load(Ordering::SeqCst), "Removal should be detected");
289 }
290
291 #[test]
292 #[cfg(feature = "watch")]
293 fn test_watch_with_different_modes() {
294 let path = tmp_path("watch_modes");
295 let _ = fs::remove_file(&path);
296
297 create_mmap(&path, 1024).expect("create");
299
300 let ro_mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
302 let _handle = ro_mmap.watch(|_event| {
303 }).expect("watch ro");
305
306 #[cfg(feature = "cow")]
307 {
308 let cow_mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
310 let _handle = cow_mmap.watch(|_event| {
311 }).expect("watch cow");
313 }
314
315 fs::remove_file(&path).expect("cleanup");
316 }
317
318 #[test]
319 #[cfg(feature = "watch")]
320 fn test_multiple_watchers() {
321 let path = tmp_path("multi_watch");
322 let _ = fs::remove_file(&path);
323
324 let mmap = create_mmap(&path, 1024).expect("create");
325
326 let count1 = Arc::new(AtomicUsize::new(0));
328 let count1_clone = Arc::clone(&count1);
329 let _handle1 = mmap.watch(move |_event| {
330 count1_clone.fetch_add(1, Ordering::SeqCst);
331 }).expect("watch 1");
332
333 let count2 = Arc::new(AtomicUsize::new(0));
334 let count2_clone = Arc::clone(&count2);
335 let _handle2 = mmap.watch(move |_event| {
336 count2_clone.fetch_add(1, Ordering::SeqCst);
337 }).expect("watch 2");
338
339 thread::sleep(Duration::from_millis(600));
341
342 mmap.update_region(0, b"change").expect("write");
344 mmap.flush().expect("flush after write for watch visibility");
345
346 #[allow(unused_variables)]
347 {
348 #[cfg(unix)]
349 {
350 use std::ffi::CString;
351 use std::os::unix::ffi::OsStrExt;
352 let cpath = CString::new(path.as_os_str().as_bytes()).unwrap();
353 unsafe { libc::utime(cpath.as_ptr(), std::ptr::null()) };
354 }
355 #[cfg(windows)]
356 {
357 if let Ok(meta) = std::fs::metadata(&path) {
358 let mut perms = meta.permissions();
359 perms.set_readonly(true);
360 let _ = std::fs::set_permissions(&path, perms);
361 let mut perms2 = std::fs::metadata(&path).unwrap().permissions();
362 perms2.set_readonly(false);
363 let _ = std::fs::set_permissions(&path, perms2);
364 }
365 }
366 }
367
368 thread::sleep(Duration::from_millis(1500));
370
371 assert!(count1.load(Ordering::SeqCst) > 0, "Watcher 1 should detect change");
373 assert!(count2.load(Ordering::SeqCst) > 0, "Watcher 2 should detect change");
374
375 fs::remove_file(&path).expect("cleanup");
376 }
377}