vstorage 0.7.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2025 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

use camino::{Utf8Path, Utf8PathBuf};
use futures_util::{
    StreamExt as _,
    future::{BoxFuture, Either, select},
};
use inotify::{EventMask, EventStream, Inotify, WatchMask};
use log::{error, info, warn};
use std::{collections::HashMap, pin::pin, time::Duration};
use tokio::time::Interval;

use crate::{
    ErrorKind, Result,
    watch::{Event, EventKind, SpecificEvent, StorageMonitor},
};

use super::VdirStorage;

/// Monitor for a [`VdirStorage`] instance.
///
/// Reports changes by listening for inotify events on the storage directory and on
/// each collection directory found inside it when the watcher is initialised.
///
/// # Quirks
///
/// Due to underlying limitations of inotify(7), it's possible that some events are
/// missed. When this happens, an [`Event::General`][general] is returned.
///
/// [general]: crate::watch::Event::General
pub struct VdirMonitor {
    // Item file extension, without the leading dot. Events for files whose name does
    // not end in `.{extension}` are ignored.
    extension: String,
    // Each tick yields a general event; it also paces attempts to re-create the
    // watcher while none exists.
    timer: Interval,
    // Required to re-initialise in case of failure.
    path: Utf8PathBuf,
    // `None` when re-initialising inotify has failed; a new attempt is made on the
    // next timer tick.
    watcher: Option<Watcher>,
}

/// Live inotify state: the event stream plus the bookkeeping required to map events
/// back to the collection they belong to.
struct Watcher {
    // Stream of raw inotify events, backed by a byte buffer owned by the stream.
    events: EventStream<Vec<u8>>,
    // Events filenames don't have a full path, so we need this mapping to determine the
    // corresponding directory for each event. Keep them as string since they need to be
    // converted before storing them and after each read anyway.
    //
    // Keys are watch descriptor ids; values are collection directory names. The watch
    // on the storage root itself is intentionally absent from this map.
    watched_collections: HashMap<i32, String>,
}

impl VdirMonitor {
    /// Build a monitor which watches the directory backing `storage`.
    ///
    /// `interval` is the period of the internal timer; see [`VdirMonitor`] for how
    /// timer ticks surface as events.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying inotify watcher cannot be set up.
    pub fn new(storage: &VdirStorage, interval: Duration) -> Result<VdirMonitor> {
        // TODO: errors don't help understand the root cause.
        // The watcher is created before the timer so that a failure here returns
        // before any timer is constructed.
        let watcher = Some(init_inotify(&storage.path)?);

        let timer = {
            let mut ticker = tokio::time::interval(interval);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            ticker
        };

        Ok(Self {
            extension: storage.extension.clone(),
            timer,
            path: storage.path.clone(),
            watcher,
        })
    }
}

fn init_inotify(path: &Utf8Path) -> Result<Watcher> {
    let inotify = Inotify::init().map_err(|err| ErrorKind::Io.error(err))?;
    let mut watched_collections = HashMap::new();

    // Watch the storage root for collection-level changes
    inotify
        .watches()
        .add(path, WatchMask::CREATE | WatchMask::DELETE)
        .map_err(|err| ErrorKind::Io.error(err))?;

    // FIXME: blocking I/O.

    // Discover and watch all existing collection subdirectories
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        Err(e) => return Err(ErrorKind::Io.error(e)),
    };

    for entry in entries.flatten() {
        let metadata = match entry.metadata() {
            Ok(metadata) => metadata,
            Err(e) => return Err(ErrorKind::Io.error(e)),
        };

        if metadata.is_dir() {
            match entry.file_name().into_string() {
                Ok(file_name) => {
                    if file_name.starts_with('.') {
                        continue;
                    }
                    let collection_path = path.join(&file_name);
                    match inotify.watches().add(
                        &collection_path,
                        WatchMask::MODIFY
                            | WatchMask::CREATE
                            | WatchMask::DELETE
                            | WatchMask::CLOSE_WRITE
                            | WatchMask::MOVE,
                    ) {
                        Ok(wd) => {
                            info!("Watching collection: {file_name}");
                            watched_collections.insert(wd.get_watch_descriptor_id(), file_name);
                        }
                        Err(err) => {
                            warn!("Failed to watch collection {file_name}: {err}");
                        }
                    }
                }
                Err(e) => warn!(
                    "Directory inside storage has non-UTF8 name: {}.",
                    e.to_string_lossy()
                ),
            }
        }
    }

    let buf = vec![0u8; 4096]; // XXX: Review the size of this buffer.
    let events = inotify
        .into_event_stream(buf)
        .map_err(|err| ErrorKind::Io.error(err))?;
    Ok(Watcher {
        events,
        watched_collections,
    })
}

impl StorageMonitor for VdirMonitor {
    fn next_event(&mut self) -> BoxFuture<'_, Event> {
        Box::pin(async {
            loop {
                // TODO: It should be possible to disable the timer.
                let next = if let Some(ref mut w) = self.watcher {
                    pin!(w.events.next())
                } else {
                    // No inotify listener exists.
                    self.timer.tick().await;
                    match init_inotify(&self.path) {
                        Ok(watcher) => {
                            info!("Re-initialised inotify listener");
                            self.watcher = Some(watcher);
                        }
                        Err(err) => {
                            // Should not happen.
                            error!("Could not re-initialise inotify: {err}");
                            self.watcher = None;
                        }
                    }
                    return Event::General;
                };

                let timeout = pin!(self.timer.tick());
                match select(next, timeout).await {
                    Either::Left((None, _)) => unreachable!("End of stream for inotify events."),
                    Either::Left((Some(Ok(event)), _)) => {
                        if event.mask.contains(EventMask::Q_OVERFLOW) {
                            // TODO: drain the whole queue as well.
                            return Event::General;
                        }

                        let Some(name) = event.name else {
                            // Event without a name is watched directory itself.
                            // For collection-directory changes, we'll get an event from the parent.
                            continue;
                        };

                        let name_str = match name.into_string() {
                            Ok(s) => s,
                            Err(os) => {
                                error!("Event for non-UTF8 file: {}.", os.to_string_lossy());
                                continue;
                            }
                        };

                        let wd_id = event.wd.get_watch_descriptor_id();

                        let watcher = self
                            .watcher
                            .as_ref()
                            .expect("watcher must be Some if we got events from it");

                        // Check if this event is from a watched collection
                        if let Some(collection_name) = watcher.watched_collections.get(&wd_id) {
                            // Event in a collection subdirectory - this is an item event
                            if !name_str.ends_with(&format!(".{}", self.extension)) {
                                continue; // Irrelevant file
                            }
                            // Construct the full href: collection/item.ics
                            let href = format!("{collection_name}/{name_str}");
                            return if event.mask.contains(EventMask::DELETE) {
                                Event::Specific(SpecificEvent {
                                    href,
                                    kind: EventKind::Delete,
                                })
                            } else {
                                Event::Specific(SpecificEvent {
                                    href,
                                    kind: EventKind::Change,
                                })
                            };
                        }
                        // Event at storage root - could be a new/deleted collection
                        // For now, return General event for collection-level changes
                        // TODO: Could add watches for newly created collections here
                        return Event::General;
                    }
                    Either::Left((Some(Err(err)), _)) => {
                        // This branch reached when a collection is deleted locally.
                        error!("Error reading from inotify: {err}");
                        match init_inotify(&self.path) {
                            Ok(watcher) => {
                                info!("Re-initialised inotify listener");
                                self.watcher = Some(watcher);
                            }
                            Err(err) => {
                                // Should not happen.
                                error!("Could not re-initialise inotify: {err}");
                                self.watcher = None;
                            }
                        }
                        return Event::General;
                    }
                    Either::Right(..) => return Event::General,
                }
            }
        })
    }
}

#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use tempfile::tempdir;

    use crate::{
        base::Storage,
        vdir::{ItemKind, VdirStorage},
        watch::Event,
    };

    /// Deleting the whole storage directory must not leave the monitor emitting a
    /// continuous stream of events: after the event(s) caused by the deletion, the
    /// monitor is expected to stay quiet until the next timer tick.
    #[tokio::test]
    async fn inotify_after_deletion() {
        let path_a = tempdir().unwrap();

        // A single (empty) collection inside the storage.
        std::fs::create_dir(path_a.path().join("one")).unwrap();
        let storage_a = VdirStorage::builder(path_a.path().to_path_buf().try_into().unwrap())
            .unwrap()
            .build(ItemKind::Calendar);
        let storage_a: Arc<dyn Storage> = Arc::new(storage_a);

        // Interval is high enough that we shouldn't reach it
        let mut monitor = storage_a.monitor(Duration::from_secs(5)).await.unwrap();

        // Initial general event.
        // NOTE(review): presumably produced by the timer's first tick, which tokio
        // completes immediately -- confirm.
        assert_eq!(Event::General, monitor.next_event().await);

        drop(path_a); // Removes the directory (and everything inside it).
        // Event for the directory deletion. The result is deliberately ignored: this
        // only consumes whatever arrives within one second.
        let _ = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;

        // No more events should arrive after the first one.
        let next = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;
        assert!(next.is_err(), "Expected timeout, got event");
    }
}