use camino::{Utf8Path, Utf8PathBuf};
use futures_util::{FutureExt as _, StreamExt as _};
use inotify::{EventMask, EventStream, Inotify, WatchMask};
use log::{error, info, warn};
use std::{collections::HashMap, time::Duration};
use tokio::{fs::read_dir, sync::mpsc, time::sleep};
use crate::{
ErrorKind, Result,
watch::{Event, EventKind, SpecificEvent},
};
pub(super) struct Watcher {
events: EventStream<Vec<u8>>,
watched_collections: HashMap<i32, String>,
}
pub(super) async fn init_inotify(path: &Utf8Path) -> Result<Watcher> {
let inotify = Inotify::init().map_err(|err| ErrorKind::Io.error(err))?;
let mut watched_collections = HashMap::new();
inotify
.watches()
.add(path, WatchMask::CREATE | WatchMask::DELETE)
.map_err(|err| ErrorKind::Io.error(err))?;
let mut entries = match read_dir(path).await {
Ok(entries) => entries,
Err(e) => return Err(ErrorKind::Io.error(e)),
};
while let Some(next) = entries.next_entry().await.transpose() {
let entry = match next {
Ok(entry) => entry,
Err(e) => return Err(ErrorKind::Io.error(e)),
};
let metadata = match entry.metadata().await {
Ok(metadata) => metadata,
Err(e) => return Err(ErrorKind::Io.error(e)),
};
if metadata.is_dir() {
match entry.file_name().into_string() {
Ok(file_name) => {
if file_name.starts_with('.') {
continue;
}
let collection_path = path.join(&file_name);
match inotify.watches().add(
&collection_path,
WatchMask::MODIFY
| WatchMask::CREATE
| WatchMask::DELETE
| WatchMask::CLOSE_WRITE
| WatchMask::MOVE,
) {
Ok(wd) => {
info!("Watching collection: {file_name}");
watched_collections.insert(wd.get_watch_descriptor_id(), file_name);
}
Err(err) => {
warn!("Failed to watch collection {file_name}: {err}");
}
}
}
Err(e) => warn!(
"Ignoring directory inside storage with non-UTF8 name: {}.",
e.to_string_lossy()
),
}
}
}
let buf = vec![0u8; 4096]; let events = inotify
.into_event_stream(buf)
.map_err(|err| ErrorKind::Io.error(err))?;
Ok(Watcher {
events,
watched_collections,
})
}
pub(super) async fn run_inotify_task(
sender: mpsc::Sender<Event>,
path: Utf8PathBuf,
extension: String,
mut watcher: Watcher,
) {
loop {
let inotify_event = match watcher.events.next().await {
Some(Ok(event)) => event,
Some(Err(err)) => {
error!("Error reading from inotify: {err}");
match init_inotify(&path).await {
Ok(w) => {
info!("Re-initialised inotify listener");
watcher = w;
}
Err(err) => {
error!("Could not re-initialise inotify: {err}");
sleep(Duration::from_secs(2)).await; continue;
}
}
if sender.send(Event::General).await.is_err() {
return;
}
continue;
}
None => {
error!("Unexpected end of inotify stream.");
sleep(Duration::from_secs(2)).await; continue;
}
};
if inotify_event.mask.contains(EventMask::Q_OVERFLOW) {
while let Some(Ok(_)) = watcher.events.next().now_or_never().flatten() {}
if sender.send(Event::General).await.is_err() {
return;
}
continue;
}
let Some(name) = inotify_event.name else {
if inotify_event.mask.contains(EventMask::DELETE_SELF) {
warn!("Storage directory was deleted; inotify watcher bailing.");
return;
}
continue;
};
let name_str = match name.into_string() {
Ok(s) => s,
Err(os) => {
error!(
"Ignoring event for file with non-UTF8 name: {}.",
os.to_string_lossy()
);
continue;
}
};
let wd_id = inotify_event.wd.get_watch_descriptor_id();
if let Some(collection_name) = watcher.watched_collections.get(&wd_id) {
if !name_str.ends_with(&format!(".{extension}")) {
continue; }
let href = format!("{collection_name}/{name_str}");
let event = if inotify_event.mask.contains(EventMask::DELETE) {
Event::Specific(SpecificEvent {
href,
kind: EventKind::Delete,
})
} else {
Event::Specific(SpecificEvent {
href,
kind: EventKind::Change,
})
};
if sender.send(event).await.is_err() {
return;
}
} else {
if sender.send(Event::General).await.is_err() {
return;
}
}
}
}
#[cfg(test)]
mod test {
use std::{sync::Arc, time::Duration};
use tempfile::tempdir;
use crate::{
base::Storage,
vdir::{ItemKind, VdirStorage},
watch::Event,
};
#[tokio::test]
async fn inotify_after_deletion() {
let path_a = tempdir().unwrap();
std::fs::create_dir(path_a.path().join("one")).unwrap();
let storage_a = VdirStorage::builder(path_a.path().to_path_buf().try_into().unwrap())
.unwrap()
.build(ItemKind::Calendar);
let storage_a: Arc<dyn Storage> = Arc::new(storage_a);
let mut monitor = storage_a.monitor().await.unwrap();
assert_eq!(Event::General, monitor.next_event().await.unwrap());
drop(path_a); let _ = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;
let next = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;
assert!(next.is_err(), "Expected timeout, got event");
}
}