use crate::errors::{MmapIoError, Result};
use crate::mmap::MemoryMappedFile;
use notify::event::EventKind as NotifyKind;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// 500 ms duration, presumably meant as the upper bound for waiting on the
/// watch thread during shutdown.
///
/// NOTE(review): within this file the value is only referenced by a no-op
/// `let _ = ...` in the `Drop` impl of `WatchHandle`; no timed join uses it.
const SHUTDOWN_JOIN_TIMEOUT: Duration = Duration::from_millis(500);
/// Coarse classification of a file-system change delivered to a watch callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
/// Reported for creation events, data modifications, otherwise-unclassified
/// modify events and `Any`/`Other` events, and also when the watcher
/// delivers an error instead of an event.
Modified,
/// Reported for metadata-modification events.
Metadata,
/// Reported for removal and rename events; the dispatch loop in `watch`
/// stops after delivering an event of this kind.
Removed,
}
/// Payload handed to the callback registered through `MemoryMappedFile::watch`.
#[derive(Debug, Clone)]
pub struct ChangeEvent {
/// Offset of the change, when known. The `watch` implementation in this
/// file always sets it to `None`.
pub offset: Option<u64>,
/// Length of the change, when known. The `watch` implementation in this
/// file always sets it to `None`.
pub len: Option<u64>,
/// What kind of change was observed.
pub kind: ChangeKind,
}
/// Handle returned by `MemoryMappedFile::watch`; it owns the notify watcher
/// and the join handle of the thread that invokes the callback.
pub struct WatchHandle {
// Wrapped in `Option` so that `Drop` can take the watcher out and release it.
watcher: Option<RecommendedWatcher>,
// Join handle of the dispatch thread spawned by `watch`; taken in `Drop`.
thread: Option<thread::JoinHandle<()>>,
}
impl Drop for WatchHandle {
    /// Stops watching and hands the dispatch thread off to a helper thread,
    /// so that dropping the handle never blocks the caller.
    fn drop(&mut self) {
        // Release the watcher first. The sender captured by the watcher's
        // callback is expected to go away with it, which lets the dispatch
        // thread's blocking receive fail and its loop terminate.
        drop(self.watcher.take());

        if let Some(handle) = self.thread.take() {
            // Join on a detached helper thread instead of the dropping thread.
            //
            // `thread::spawn` panics when the OS cannot create a thread, and a
            // panic inside `drop` can abort the process while unwinding.
            // `Builder::spawn` reports that failure as an error instead. When
            // it fails, the closure (and the `JoinHandle` it captured) is
            // dropped, which simply detaches the dispatch thread.
            let _ = thread::Builder::new().spawn(move || {
                let _ = handle.join();
            });
        }

        // No-op reference that keeps the constant in use; no timed join is
        // performed here.
        let _ = SHUTDOWN_JOIN_TIMEOUT;
    }
}
impl WatchHandle {
    /// Reports whether the dispatch thread is still running.
    ///
    /// Returns `false` when the join handle has already been taken or when the
    /// thread has finished.
    #[allow(dead_code)]
    pub fn is_active(&self) -> bool {
        match &self.thread {
            Some(worker) => !worker.is_finished(),
            None => false,
        }
    }
}
/// Translates a notify event kind into the crate's [`ChangeKind`].
///
/// Returns `None` for access events, which are not reported to callbacks.
/// Metadata modifications map to `Metadata`; renames and every removal map to
/// `Removed`; everything else (creation, any other modify event, `Any`,
/// `Other`) maps to `Modified`.
fn map_notify_kind(kind: &NotifyKind) -> Option<ChangeKind> {
    use notify::event::ModifyKind;

    match kind {
        // Pure access events carry no change worth reporting.
        NotifyKind::Access(_) => None,

        // The specific modify sub-kinds must precede the generic
        // `Modify(_)` arm below.
        NotifyKind::Modify(ModifyKind::Metadata(_)) => Some(ChangeKind::Metadata),

        // A rename is reported the same way as a removal.
        NotifyKind::Modify(ModifyKind::Name(_)) | NotifyKind::Remove(_) => {
            Some(ChangeKind::Removed)
        }

        NotifyKind::Create(_)
        | NotifyKind::Modify(_)
        | NotifyKind::Any
        | NotifyKind::Other => Some(ChangeKind::Modified),
    }
}
impl MemoryMappedFile {
    /// Watches the file behind this mapping and invokes `callback` for every
    /// reported change.
    ///
    /// A notify watcher forwards raw results over a channel to a dedicated
    /// thread, which classifies each event with `map_notify_kind` and calls
    /// `callback`. Watcher errors are reported as `ChangeKind::Modified`.
    /// The thread stops after delivering a `ChangeKind::Removed` event or once
    /// the channel is disconnected.
    ///
    /// # Errors
    ///
    /// Returns `MmapIoError::WatchFailed` when the watcher cannot be created,
    /// the path cannot be registered, or the dispatch thread cannot be spawned.
    #[cfg(feature = "watch")]
    pub fn watch<F>(&self, callback: F) -> Result<WatchHandle>
    where
        F: Fn(ChangeEvent) + Send + 'static,
    {
        let watched = self.path().to_path_buf();
        let (event_tx, event_rx) = mpsc::channel::<notify::Result<Event>>();

        // The watcher only forwards results; a send failure (receiver gone)
        // is deliberately ignored.
        let forward = move |res: notify::Result<Event>| {
            let _ = event_tx.send(res);
        };
        let mut watcher: RecommendedWatcher = notify::recommended_watcher(forward)
            .map_err(|e| MmapIoError::WatchFailed(format!("watcher init failed: {e}")))?;
        watcher
            .watch(&watched, RecursiveMode::NonRecursive)
            .map_err(|e| MmapIoError::WatchFailed(format!("watch({:?}) failed: {e}", watched)))?;

        // Iterating the receiver blocks for each message and ends when every
        // sender has been dropped.
        let dispatch = move || {
            for res in event_rx {
                let classified = match res {
                    Ok(event) => map_notify_kind(&event.kind),
                    // Errors are surfaced to the callback as a modification.
                    Err(_) => Some(ChangeKind::Modified),
                };
                let Some(kind) = classified else {
                    continue;
                };
                callback(ChangeEvent {
                    offset: None,
                    len: None,
                    kind,
                });
                if kind == ChangeKind::Removed {
                    break;
                }
            }
        };

        let worker = thread::Builder::new()
            .name(format!("mmap-io-watch:{}", watched.display()))
            .spawn(dispatch)
            .map_err(|e| MmapIoError::WatchFailed(format!("watch thread spawn failed: {e}")))?;

        Ok(WatchHandle {
            watcher: Some(watcher),
            thread: Some(worker),
        })
    }
}
// Tests for the watch API. They exercise real file-system notifications and
// rely on sleeps and polling deadlines.
#[cfg(test)]
mod tests {
use super::*;
use crate::create_mmap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
// Builds a path in the system temp directory of the form
// `mmap_io_watch_test_<name>_<pid>`.
fn tmp_path(name: &str) -> PathBuf {
let mut p = std::env::temp_dir();
p.push(format!(
"mmap_io_watch_test_{}_{}",
name,
std::process::id()
));
p
}
// Polls `pred` every 10 ms until it returns true or `timeout` has elapsed.
// `pred` is evaluated one final time after the deadline and that result is
// returned.
fn wait_until<F: Fn() -> bool>(timeout: Duration, pred: F) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if pred() {
return true;
}
thread::sleep(Duration::from_millis(10));
}
pred()
}
// Modifies the file through a separately opened write handle rather than
// through the mapping: writes `payload` and syncs it to disk. Panics if any
// step fails.
fn touch_file_externally(path: &std::path::Path, payload: &[u8]) {
use std::io::Write;
let mut f = fs::OpenOptions::new()
.write(true)
.open(path)
.expect("reopen for external write");
f.write_all(payload).expect("external write");
f.sync_all().expect("external sync");
}
// A single watcher must invoke its callback after an external write.
#[test]
#[cfg(feature = "watch")]
fn test_watch_file_changes() {
let path = tmp_path("watch_changes");
// Remove any leftover file with the same name; the result is ignored.
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 1024).expect("create");
mmap.update_region(0, b"initial").expect("write");
mmap.flush().expect("flush");
let changed = Arc::new(AtomicBool::new(false));
let event_count = Arc::new(AtomicUsize::new(0));
// The handle is bound to a name so the watch stays alive until the end of
// the test.
let _handle = {
let changed = Arc::clone(&changed);
let counter = Arc::clone(&event_count);
mmap.watch(move |_event| {
changed.store(true, Ordering::SeqCst);
counter.fetch_add(1, Ordering::SeqCst);
})
.expect("watch")
};
// Presumably gives the watcher backend time to settle before the change.
thread::sleep(Duration::from_millis(200));
touch_file_externally(&path, b"modified-externally");
let detected = wait_until(Duration::from_secs(3), || changed.load(Ordering::SeqCst));
assert!(
detected,
"watcher must detect the external modify within the deadline; observed {} events",
event_count.load(Ordering::SeqCst)
);
drop(mmap);
let _ = fs::remove_file(&path);
}
// Two independent watchers on the same mapping must both observe one
// external write.
#[test]
#[cfg(feature = "watch")]
fn test_multiple_watchers() {
let path = tmp_path("multi_watch");
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 1024).expect("create");
let count1 = Arc::new(AtomicUsize::new(0));
let count2 = Arc::new(AtomicUsize::new(0));
let _h1 = {
let counter = Arc::clone(&count1);
mmap.watch(move |_| {
counter.fetch_add(1, Ordering::SeqCst);
})
.expect("watch 1")
};
let _h2 = {
let counter = Arc::clone(&count2);
mmap.watch(move |_| {
counter.fetch_add(1, Ordering::SeqCst);
})
.expect("watch 2")
};
// Presumably gives both watchers time to settle before the change.
thread::sleep(Duration::from_millis(200));
touch_file_externally(&path, b"change");
let both_saw = wait_until(Duration::from_secs(3), || {
count1.load(Ordering::SeqCst) > 0 && count2.load(Ordering::SeqCst) > 0
});
assert!(
both_saw,
"both watchers must detect the change; saw count1={}, count2={}",
count1.load(Ordering::SeqCst),
count2.load(Ordering::SeqCst)
);
drop(mmap);
let _ = fs::remove_file(&path);
}
// After the handle is dropped, the callback must not be invoked again.
#[test]
#[cfg(feature = "watch")]
fn test_watch_handle_drop_stops_watching() {
let path = tmp_path("watch_drop");
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 1024).expect("create");
let count = Arc::new(AtomicUsize::new(0));
let handle = {
let counter = Arc::clone(&count);
mmap.watch(move |_| {
counter.fetch_add(1, Ordering::SeqCst);
})
.expect("watch")
};
thread::sleep(Duration::from_millis(100));
assert!(handle.is_active(), "watch thread should be alive");
drop(handle);
// Wait before sampling the counter so that events already in flight are
// included in the baseline.
thread::sleep(Duration::from_millis(100));
let baseline = count.load(Ordering::SeqCst);
// This write goes through the mapping itself and is then flushed.
mmap.update_region(0, b"after-drop").expect("write");
mmap.flush().expect("flush");
thread::sleep(Duration::from_millis(300));
assert_eq!(
count.load(Ordering::SeqCst),
baseline,
"no events should fire after handle drop"
);
drop(mmap);
let _ = fs::remove_file(&path);
}
}