use camino::{Utf8Path, Utf8PathBuf};
use futures_util::{
StreamExt as _,
future::{BoxFuture, Either, select},
};
use inotify::{EventMask, EventStream, Inotify, WatchMask};
use log::{error, info, warn};
use std::{collections::HashMap, pin::pin, time::Duration};
use tokio::time::Interval;
use crate::{
ErrorKind, Result,
watch::{Event, EventKind, SpecificEvent, StorageMonitor},
};
use super::VdirStorage;
/// Monitor for a vdir storage that combines inotify notifications with a
/// periodic timer.
pub struct VdirMonitor {
/// File extension (without the leading dot) that a file name must end with
/// for a specific event to be reported.
extension: String,
/// Interval timer; `next_event` races it against the inotify stream, and
/// waits on it before trying to re-create a missing watcher.
timer: Interval,
/// Path of the storage root, handed to `init_inotify` whenever the watcher
/// is (re-)created.
path: Utf8PathBuf,
/// Active inotify watcher; set to `None` when re-initialisation has failed.
watcher: Option<Watcher>,
}
/// An inotify event stream together with the collections it is watching.
struct Watcher {
/// Stream of inotify events, backed by an owned byte buffer.
events: EventStream<Vec<u8>>,
/// Maps a watch descriptor id to the directory name of the collection that
/// was registered under it.
watched_collections: HashMap<i32, String>,
}
impl VdirMonitor {
    /// Creates a monitor for `storage` whose timer ticks every `interval`.
    ///
    /// The inotify watcher is set up first, using the storage's path; the
    /// timer is created afterwards and configured to delay missed ticks.
    ///
    /// # Errors
    ///
    /// Returns whatever error `init_inotify` produces for the storage path.
    pub fn new(storage: &VdirStorage, interval: Duration) -> Result<VdirMonitor> {
        // Fail early: no timer is created if inotify cannot be initialised.
        let watcher = Some(init_inotify(&storage.path)?);

        let timer = {
            let mut ticker = tokio::time::interval(interval);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            ticker
        };

        Ok(Self {
            extension: storage.extension.clone(),
            timer,
            path: storage.path.clone(),
            watcher,
        })
    }
}
fn init_inotify(path: &Utf8Path) -> Result<Watcher> {
let inotify = Inotify::init().map_err(|err| ErrorKind::Io.error(err))?;
let mut watched_collections = HashMap::new();
inotify
.watches()
.add(path, WatchMask::CREATE | WatchMask::DELETE)
.map_err(|err| ErrorKind::Io.error(err))?;
let entries = match std::fs::read_dir(path) {
Ok(entries) => entries,
Err(e) => return Err(ErrorKind::Io.error(e)),
};
for entry in entries.flatten() {
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(e) => return Err(ErrorKind::Io.error(e)),
};
if metadata.is_dir() {
match entry.file_name().into_string() {
Ok(file_name) => {
if file_name.starts_with('.') {
continue;
}
let collection_path = path.join(&file_name);
match inotify.watches().add(
&collection_path,
WatchMask::MODIFY
| WatchMask::CREATE
| WatchMask::DELETE
| WatchMask::CLOSE_WRITE
| WatchMask::MOVE,
) {
Ok(wd) => {
info!("Watching collection: {file_name}");
watched_collections.insert(wd.get_watch_descriptor_id(), file_name);
}
Err(err) => {
warn!("Failed to watch collection {file_name}: {err}");
}
}
}
Err(e) => warn!(
"Directory inside storage has non-UTF8 name: {}.",
e.to_string_lossy()
),
}
}
}
let buf = vec![0u8; 4096]; let events = inotify
.into_event_stream(buf)
.map_err(|err| ErrorKind::Io.error(err))?;
Ok(Watcher {
events,
watched_collections,
})
}
impl StorageMonitor for VdirMonitor {
fn next_event(&mut self) -> BoxFuture<'_, Event> {
Box::pin(async {
loop {
let next = if let Some(ref mut w) = self.watcher {
pin!(w.events.next())
} else {
self.timer.tick().await;
match init_inotify(&self.path) {
Ok(watcher) => {
info!("Re-initialised inotify listener");
self.watcher = Some(watcher);
}
Err(err) => {
error!("Could not re-initialise inotify: {err}");
self.watcher = None;
}
}
return Event::General;
};
let timeout = pin!(self.timer.tick());
match select(next, timeout).await {
Either::Left((None, _)) => unreachable!("End of stream for inotify events."),
Either::Left((Some(Ok(event)), _)) => {
if event.mask.contains(EventMask::Q_OVERFLOW) {
return Event::General;
}
let Some(name) = event.name else {
continue;
};
let name_str = match name.into_string() {
Ok(s) => s,
Err(os) => {
error!("Event for non-UTF8 file: {}.", os.to_string_lossy());
continue;
}
};
let wd_id = event.wd.get_watch_descriptor_id();
let watcher = self
.watcher
.as_ref()
.expect("watcher must be Some if we got events from it");
if let Some(collection_name) = watcher.watched_collections.get(&wd_id) {
if !name_str.ends_with(&format!(".{}", self.extension)) {
continue; }
let href = format!("{collection_name}/{name_str}");
return if event.mask.contains(EventMask::DELETE) {
Event::Specific(SpecificEvent {
href,
kind: EventKind::Delete,
})
} else {
Event::Specific(SpecificEvent {
href,
kind: EventKind::Change,
})
};
}
return Event::General;
}
Either::Left((Some(Err(err)), _)) => {
error!("Error reading from inotify: {err}");
match init_inotify(&self.path) {
Ok(watcher) => {
info!("Re-initialised inotify listener");
self.watcher = Some(watcher);
}
Err(err) => {
error!("Could not re-initialise inotify: {err}");
self.watcher = None;
}
}
return Event::General;
}
Either::Right(..) => return Event::General,
}
}
})
}
}
#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use tempfile::tempdir;

    use crate::{
        base::Storage,
        vdir::{ItemKind, VdirStorage},
        watch::Event,
    };

    /// Checks that a monitor keeps working after its storage directory has
    /// been deleted: it must not produce an endless stream of events.
    #[tokio::test]
    async fn inotify_after_deletion() {
        // A storage root containing a single collection directory.
        let root = tempdir().unwrap();
        std::fs::create_dir(root.path().join("one")).unwrap();

        let storage: Arc<dyn Storage> = Arc::new(
            VdirStorage::builder(root.path().to_path_buf().try_into().unwrap())
                .unwrap()
                .build(ItemKind::Calendar),
        );
        let mut monitor = storage.monitor(Duration::from_secs(5)).await.unwrap();

        // The very first call is expected to yield a general event.
        assert_eq!(Event::General, monitor.next_event().await);

        // Dropping the temporary directory removes it from disk.
        drop(root);

        let short_wait = Duration::from_secs(1);
        // Whatever the deletion itself produces is deliberately ignored.
        let _ = tokio::time::timeout(short_wait, monitor.next_event()).await;

        // After that, no further event may arrive within the short wait.
        let next = tokio::time::timeout(short_wait, monitor.next_event()).await;
        assert!(next.is_err(), "Expected timeout, got event");
    }
}