1use crate::errors::Result;
4use crate::mmap::MemoryMappedFile;
5use std::thread;
6use std::time::Duration;
7
/// Time, in milliseconds, that a watcher thread sleeps between two
/// consecutive `std::fs::metadata` polls of the watched path.
///
/// The tests in this file also express their waits as multiples of this value.
const WATCH_POLL_INTERVAL_MS: u64 = 100;
10
/// Kind of change reported to a watch callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// Reported by `MemoryMappedFile::watch` when the file's modification
    /// time differs from the last one seen, and by `_polling_watch` when the
    /// file's length differs from the last one seen.
    Modified,
    /// Reported by `_polling_watch` when the modification time changed but
    /// the length did not. `MemoryMappedFile::watch` never reports this kind.
    Metadata,
    /// Reported when `std::fs::metadata` fails for the watched path. The
    /// watcher thread exits right after delivering this event.
    Removed,
}
21
/// Notification handed to a watch callback.
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// Offset of the changed region, if known (presumably a byte offset into
    /// the file — TODO confirm). The polling watchers in this file always
    /// pass `None`.
    pub offset: Option<u64>,
    /// Length of the changed region, if known (presumably in bytes — TODO
    /// confirm). The polling watchers in this file always pass `None`.
    pub len: Option<u64>,
    /// What kind of change was observed.
    pub kind: ChangeKind,
}
32
33pub struct WatchHandle {
35 thread: thread::JoinHandle<()>,
37}
38
39impl Drop for WatchHandle {
40 fn drop(&mut self) {
41 }
45}
46
47impl WatchHandle {
48 #[allow(dead_code)]
50 pub fn is_active(&self) -> bool {
51 !self.thread.is_finished()
52 }
53}
54
55impl MemoryMappedFile {
56 #[cfg(feature = "watch")]
89 pub fn watch<F>(&self, callback: F) -> Result<WatchHandle>
90 where
91 F: Fn(ChangeEvent) + Send + 'static,
92 {
93 let path = self.path().to_path_buf();
94
95 let thread = thread::spawn(move || {
98 let mut last_modified = std::fs::metadata(&path)
99 .ok()
100 .and_then(|m| m.modified().ok());
101
102 loop {
103 thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS));
104
105 let metadata = match std::fs::metadata(&path) {
107 Ok(m) => m,
108 Err(_) => {
109 callback(ChangeEvent {
110 offset: None,
111 len: None,
112 kind: ChangeKind::Removed,
113 });
114 break;
115 }
116 };
117
118 if let Ok(modified) = metadata.modified() {
120 if Some(modified) != last_modified {
121 callback(ChangeEvent {
122 offset: None,
123 len: None,
124 kind: ChangeKind::Modified,
125 });
126 last_modified = Some(modified);
127 }
128 }
129 }
130 });
131
132 Ok(WatchHandle { thread })
133 }
134}
135
136#[cfg(feature = "watch")]
142fn _polling_watch<F>(path: &std::path::Path, callback: F) -> Result<WatchHandle>
143where
144 F: Fn(ChangeEvent) + Send + 'static,
145{
146 let path = path.to_path_buf();
147
148 let thread = thread::spawn(move || {
149 let mut last_modified = std::fs::metadata(&path)
150 .ok()
151 .and_then(|m| m.modified().ok());
152 let mut last_len = std::fs::metadata(&path).ok().map(|m| m.len());
153
154 loop {
155 thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS));
156
157 let metadata = match std::fs::metadata(&path) {
159 Ok(m) => m,
160 Err(_) => {
161 callback(ChangeEvent {
162 offset: None,
163 len: None,
164 kind: ChangeKind::Removed,
165 });
166 break;
167 }
168 };
169
170 let current_len = metadata.len();
171 let current_modified = metadata.modified().ok();
172
173 if current_modified != last_modified || Some(current_len) != last_len {
175 let kind = if Some(current_len) != last_len {
176 ChangeKind::Modified
177 } else {
178 ChangeKind::Metadata
179 };
180
181 callback(ChangeEvent {
182 offset: None,
183 len: None,
184 kind,
185 });
186
187 last_modified = current_modified;
188 last_len = Some(current_len);
189 }
190 }
191 });
192
193 Ok(WatchHandle { thread })
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::create_mmap;
200 use std::fs;
201 use std::path::PathBuf;
202 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
203 use std::sync::Arc;
204
205 fn tmp_path(name: &str) -> PathBuf {
206 let mut p = std::env::temp_dir();
207 p.push(format!(
208 "mmap_io_watch_test_{}_{}",
209 name,
210 std::process::id()
211 ));
212 p
213 }
214
    /// Checks that a write to the mapped file reaches a watch callback.
    ///
    /// The test is timing-based: it sleeps for multiples of
    /// `WATCH_POLL_INTERVAL_MS` rather than synchronising with the watcher.
    #[test]
    #[cfg(feature = "watch")]
    fn test_watch_file_changes() {
        let path = tmp_path("watch_changes");
        // Best-effort: the result is ignored, so a missing file is fine.
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 1024).expect("create");
        mmap.update_region(0, b"initial").expect("write");
        mmap.flush().expect("flush");

        // State shared with the callback, which runs on the watcher thread.
        let changed = Arc::new(AtomicBool::new(false));
        let changed_clone = Arc::clone(&changed);
        let event_count = Arc::new(AtomicUsize::new(0));
        let event_count_clone = Arc::clone(&event_count);

        // The handle stays bound until the end of the test.
        let _handle = mmap
            .watch(move |event| {
                println!("Detected change: {event:?}");
                changed_clone.store(true, Ordering::SeqCst);
                event_count_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch");

        // Wait a few poll intervals before touching the file — presumably so
        // the watcher has a baseline and the write lands on a later timestamp.
        thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 5));

        mmap.update_region(0, b"modified").expect("write");
        mmap.flush()
            .expect("flush after write for watch visibility");

        // Platform-specific nudge of the file's metadata on top of the flush.
        #[allow(unused_variables)]
        {
            #[cfg(unix)]
            {
                use std::ffi::CString;
                use std::os::unix::ffi::OsStrExt;
                let cpath = CString::new(path.as_os_str().as_bytes()).unwrap();
                // With a null `times` pointer, POSIX `utime` sets the access
                // and modification times to the current time. The return
                // value is ignored (best-effort).
                //
                // SAFETY: `cpath` is a valid NUL-terminated string that
                // outlives the call, and `utime` accepts a null `times`.
                unsafe {
                    libc::utime(cpath.as_ptr(), std::ptr::null());
                }
            }
            #[cfg(windows)]
            {
                // Toggle the read-only attribute on and back off, ignoring
                // errors. NOTE(review): presumably meant to force a metadata
                // update — confirm it affects the modification time the
                // watcher compares.
                if let Ok(meta) = std::fs::metadata(&path) {
                    let mut perms = meta.permissions();
                    perms.set_readonly(true);
                    let _ = std::fs::set_permissions(&path, perms);
                    let mut perms2 = std::fs::metadata(&path).unwrap().permissions();
                    perms2.set_readonly(false);
                    let _ = std::fs::set_permissions(&path, perms2);
                }
            }
        }

        // Give the watcher many poll intervals to notice the change.
        thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 15));

        assert!(changed.load(Ordering::SeqCst), "Change should be detected");
        assert!(event_count.load(Ordering::SeqCst) > 0, "Should have events");

        // Not reached when an assertion above fails, so the file is then left
        // behind in the temp directory.
        fs::remove_file(&path).expect("cleanup");
    }
283
284 #[test]
285 #[cfg(feature = "watch")]
286 fn test_watch_file_removal() {
287 let path = tmp_path("watch_removal");
288 let _ = fs::remove_file(&path);
289
290 let mmap = create_mmap(&path, 1024).expect("create");
292
293 let removed = Arc::new(AtomicBool::new(false));
295 let removed_clone = Arc::clone(&removed);
296
297 let _handle = mmap
298 .watch(move |event| {
299 if event.kind == ChangeKind::Removed {
300 removed_clone.store(true, Ordering::SeqCst);
301 }
302 })
303 .expect("watch");
304
305 thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 2));
307
308 fs::remove_file(&path).expect("remove");
310
311 thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 3));
313
314 assert!(removed.load(Ordering::SeqCst), "Removal should be detected");
315 }
316
317 #[test]
318 #[cfg(feature = "watch")]
319 fn test_watch_with_different_modes() {
320 let path = tmp_path("watch_modes");
321 let _ = fs::remove_file(&path);
322
323 create_mmap(&path, 1024).expect("create");
325
326 let ro_mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
328 let _handle = ro_mmap
329 .watch(|_event| {
330 })
332 .expect("watch ro");
333
334 #[cfg(feature = "cow")]
335 {
336 let cow_mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
338 let _handle = cow_mmap
339 .watch(|_event| {
340 })
342 .expect("watch cow");
343 }
344
345 fs::remove_file(&path).expect("cleanup");
346 }
347
    /// Checks that two watchers registered on the same mapping each receive
    /// at least one event after the file is modified.
    ///
    /// The test is timing-based: it sleeps for multiples of
    /// `WATCH_POLL_INTERVAL_MS` rather than synchronising with the watchers.
    #[test]
    #[cfg(feature = "watch")]
    fn test_multiple_watchers() {
        let path = tmp_path("multi_watch");
        // Best-effort: the result is ignored, so a missing file is fine.
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 1024).expect("create");

        // First watcher: counts every event it receives.
        let count1 = Arc::new(AtomicUsize::new(0));
        let count1_clone = Arc::clone(&count1);
        let _handle1 = mmap
            .watch(move |_event| {
                count1_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 1");

        // Second, independent watcher with its own counter.
        let count2 = Arc::new(AtomicUsize::new(0));
        let count2_clone = Arc::clone(&count2);
        let _handle2 = mmap
            .watch(move |_event| {
                count2_clone.fetch_add(1, Ordering::SeqCst);
            })
            .expect("watch 2");

        // Wait a few poll intervals before touching the file — presumably so
        // both watchers have a baseline and the write lands on a later
        // timestamp.
        thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 6));

        mmap.update_region(0, b"change").expect("write");
        mmap.flush()
            .expect("flush after write for watch visibility");

        // Platform-specific nudge of the file's metadata on top of the flush.
        #[allow(unused_variables)]
        {
            #[cfg(unix)]
            {
                use std::ffi::CString;
                use std::os::unix::ffi::OsStrExt;
                let cpath = CString::new(path.as_os_str().as_bytes()).unwrap();
                // With a null `times` pointer, POSIX `utime` sets the access
                // and modification times to the current time. The return
                // value is ignored (best-effort).
                //
                // SAFETY: `cpath` is a valid NUL-terminated string that
                // outlives the call, and `utime` accepts a null `times`.
                unsafe { libc::utime(cpath.as_ptr(), std::ptr::null()) };
            }
            #[cfg(windows)]
            {
                // Toggle the read-only attribute on and back off, ignoring
                // errors. NOTE(review): presumably meant to force a metadata
                // update — confirm it affects the modification time the
                // watchers compare.
                if let Ok(meta) = std::fs::metadata(&path) {
                    let mut perms = meta.permissions();
                    perms.set_readonly(true);
                    let _ = std::fs::set_permissions(&path, perms);
                    let mut perms2 = std::fs::metadata(&path).unwrap().permissions();
                    perms2.set_readonly(false);
                    let _ = std::fs::set_permissions(&path, perms2);
                }
            }
        }

        // Give both watchers many poll intervals to notice the change.
        thread::sleep(Duration::from_millis(WATCH_POLL_INTERVAL_MS * 15));

        assert!(
            count1.load(Ordering::SeqCst) > 0,
            "Watcher 1 should detect change"
        );
        assert!(
            count2.load(Ordering::SeqCst) > 0,
            "Watcher 2 should detect change"
        );

        // Not reached when an assertion above fails, so the file is then left
        // behind in the temp directory.
        fs::remove_file(&path).expect("cleanup");
    }
418}