use std::{
collections::HashMap,
fs::{File, read_dir},
os::unix::io::AsRawFd,
thread::sleep,
time::Duration,
};
use camino::Utf8PathBuf;
use kqueue::{EventData, EventFilter, FilterFlag, Ident, Vnode, Watcher};
use log::{debug, error, trace, warn};
use tokio::sync::mpsc::Sender;
use crate::watch::{Event, EventKind};
const VNODE_FLAGS: FilterFlag = FilterFlag::NOTE_WRITE
.union(FilterFlag::NOTE_EXTEND)
.union(FilterFlag::NOTE_DELETE)
.union(FilterFlag::NOTE_RENAME)
.union(FilterFlag::NOTE_LINK)
.union(FilterFlag::NOTE_ATTRIB);
struct CollectionWatch {
file: File,
href: String,
}
struct State {
watcher: Watcher,
root_file: File,
collections: HashMap<Utf8PathBuf, CollectionWatch>,
}
impl State {
fn add_collection(&mut self, path: &Utf8PathBuf, href: String) -> Result<(), std::io::Error> {
let file = File::open(path)?;
self.watcher
.add_file(&file, EventFilter::EVFILT_VNODE, VNODE_FLAGS)?;
self.collections
.insert(path.clone(), CollectionWatch { file, href });
Ok(())
}
fn remove_collection(&mut self, path: &Utf8PathBuf) -> Option<CollectionWatch> {
let watch = self.collections.remove(path)?;
let _ = self
.watcher
.remove_file(&watch.file, EventFilter::EVFILT_VNODE);
Some(watch)
}
}
pub(super) fn run_kqueue_task(sender: &Sender<Event>, path: &Utf8PathBuf) {
loop {
if run_kqueue_inner(sender, path).is_err() {
return;
}
sleep(Duration::from_secs(2)); }
}
fn run_kqueue_inner(sender: &Sender<Event>, path: &Utf8PathBuf) -> Result<(), ()> {
let mut state = init(path)?;
if sender.blocking_send(Event::Storage).is_err() {
return Err(());
}
loop {
if sender.is_closed() {
return Err(());
}
let Some(event) = state.watcher.poll_forever(None) else {
unreachable!("poll_forever returns None only when not initialised.");
};
let is_delete = match &event.data {
EventData::Vnode(Vnode::Delete) => true,
EventData::Vnode(_) => false,
EventData::Error(e) => {
error!("kqueue: error event: {e}");
break;
}
data => {
error!("kqueue: Unexpected data: {data:?}");
continue;
}
};
let fd = match &event.ident {
Ident::Fd(fd) => *fd,
i => {
error!("kqueue: Unexpected ident: {i:?}.");
continue;
}
};
if fd == state.root_file.as_raw_fd() {
if is_delete {
error!("kqueue: Storage directory was deleted; bailing.");
return Err(());
}
if sender.blocking_send(Event::Storage).is_err() {
return Err(());
}
if rescan_collections(sender, path, &mut state).is_err() {
return Err(());
}
} else if let Some((collection_path, href)) = state
.collections
.iter()
.find(|(_, w)| w.file.as_raw_fd() == fd)
.map(|(p, w)| (p.clone(), w.href.clone()))
{
let kind = if is_delete {
state.remove_collection(&collection_path);
EventKind::Delete
} else {
EventKind::Change
};
if sender.blocking_send(Event::Collection(href, kind)).is_err() {
return Err(());
}
} else {
warn!("kqueue: Received event for untracked fd: {fd}");
}
}
Ok(())
}
fn init(path: &Utf8PathBuf) -> Result<State, ()> {
let mut watcher = Watcher::new().map_err(|e| {
error!("kqueue: Failed to create watcher: {e}");
})?;
let root_file = File::open(path).map_err(|e| {
error!("kqueue: Failed to open storage root: {e}");
})?;
watcher
.add_file(&root_file, EventFilter::EVFILT_VNODE, VNODE_FLAGS)
.map_err(|e| {
error!("kqueue: Failed to watch storage root: {e}");
})?;
let mut state = State {
watcher,
root_file,
collections: HashMap::new(),
};
if discover_collections(path, &mut state).is_err() {
return Err(());
}
if let Err(e) = state.watcher.watch() {
error!("kqueue: Failed to start watcher: {e}");
return Err(());
}
Ok(state)
}
fn discover_collections(path: &Utf8PathBuf, state: &mut State) -> Result<(), ()> {
let entries = match read_dir(path) {
Ok(e) => e,
Err(e) => {
error!("kqueue: Failed to read storage root: {e}");
return Err(());
}
};
for entry in entries {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!("kqueue: Failed to read directory entry: {e}");
continue;
}
};
let metadata = match entry.metadata() {
Ok(m) => m,
Err(e) => {
warn!(
"kqueue: Failed to read metadata for {}: {e}",
entry.path().display()
);
continue;
}
};
if !metadata.is_dir() {
continue;
}
let name = entry.file_name();
if name.as_encoded_bytes().first() == Some(&b'.') {
continue;
}
let Ok(name) = name.into_string() else {
warn!(
"kqueue: Ignoring directory with non-UTF8 name: {}.",
entry.path().display()
);
continue;
};
let collection_path = path.join(&name);
if state.collections.contains_key(&collection_path) {
trace!("kqueue: Already watching collection: {name}");
continue;
}
if let Err(e) = state.add_collection(&collection_path, name.clone()) {
warn!("kqueue: Failed to watch collection {name}: {e}");
} else {
debug!("kqueue: Watching collection: {name}");
}
}
Ok(())
}
fn rescan_collections(
sender: &Sender<Event>,
path: &Utf8PathBuf,
state: &mut State,
) -> Result<(), ()> {
let to_remove: Vec<Utf8PathBuf> = state
.collections
.keys()
.filter(|p| !p.exists())
.cloned()
.collect();
for collection_path in to_remove {
if let Some(watch) = state.remove_collection(&collection_path) {
sender
.blocking_send(Event::Collection(watch.href, EventKind::Delete))
.map_err(|_| ())?;
}
}
discover_collections(path, state)?;
if let Err(e) = state.watcher.watch() {
error!("kqueue: Failed to re-register watches: {e}");
return Err(());
}
Ok(())
}