vstorage 0.10.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2025-2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

use camino::{Utf8Path, Utf8PathBuf};
use futures_util::{FutureExt as _, StreamExt as _};
use inotify::{EventMask, EventStream, Inotify, WatchMask};
use log::{error, info, warn};
use std::{collections::HashMap, str::FromStr, time::Duration};
use tokio::{fs::read_dir, sync::mpsc, time::sleep};

use crate::{
    ErrorKind, Result,
    property::Property,
    watch::{Event, EventKind},
};

pub(super) struct Watcher {
    events: EventStream<Vec<u8>>,
    // Mapping to determine the corresponding directory for each event.
    // Keep them as string since they need to be converted before storing them and after each read.
    watched_collections: HashMap<i32, String>,
    // Reverse mapping for efficient lookup by name.
    collection_wds: HashMap<String, i32>,
    root_wd: i32,
}

pub(super) async fn init_inotify(path: &Utf8Path) -> Result<Watcher> {
    let inotify = Inotify::init().map_err(|err| ErrorKind::Io.error(err))?;
    let mut watched_collections = HashMap::new();
    let mut collection_wds = HashMap::new();

    // Watch the storage root for collection-level changes
    let root_wd = inotify
        .watches()
        .add(path, WatchMask::CREATE | WatchMask::DELETE)
        .map_err(|err| ErrorKind::Io.error(err))?;
    let root_wd = root_wd.get_watch_descriptor_id();

    // Discover and watch all existing collection subdirectories
    let mut entries = match read_dir(path).await {
        Ok(entries) => entries,
        Err(e) => return Err(ErrorKind::Io.error(e)),
    };

    while let Some(next) = entries.next_entry().await.transpose() {
        let entry = match next {
            Ok(entry) => entry,
            Err(e) => return Err(ErrorKind::Io.error(e)),
        };
        let metadata = match entry.metadata().await {
            Ok(metadata) => metadata,
            Err(e) => return Err(ErrorKind::Io.error(e)),
        };

        if metadata.is_dir() {
            match entry.file_name().into_string() {
                Ok(file_name) => {
                    if file_name.starts_with('.') {
                        continue;
                    }
                    let collection_path = path.join(&file_name);
                    match inotify.watches().add(
                        &collection_path,
                        WatchMask::MODIFY
                            | WatchMask::CREATE
                            | WatchMask::DELETE
                            | WatchMask::CLOSE_WRITE
                            | WatchMask::MOVE,
                    ) {
                        Ok(wd) => {
                            info!("Watching collection: {file_name}");
                            let wd_id = wd.get_watch_descriptor_id();
                            watched_collections.insert(wd_id, file_name.clone());
                            collection_wds.insert(file_name, wd_id);
                        }
                        Err(err) => {
                            warn!("Failed to watch collection {file_name}: {err}");
                        }
                    }
                }
                Err(e) => warn!(
                    "Ignoring directory inside storage with non-UTF8 name: {}.",
                    e.to_string_lossy()
                ),
            }
        }
    }

    let buf = vec![0u8; 4096]; // XXX: Review the size of this buffer.
    let events = inotify
        .into_event_stream(buf)
        .map_err(|err| ErrorKind::Io.error(err))?;
    Ok(Watcher {
        events,
        watched_collections,
        collection_wds,
        root_wd,
    })
}

pub(super) async fn run_inotify_task(
    sender: mpsc::Sender<Event>,
    path: Utf8PathBuf,
    ext: String,
    mut watcher: Watcher,
) {
    loop {
        let inotify_event = match watcher.events.next().await {
            Some(Ok(event)) => event,
            Some(Err(err)) => {
                error!("Error reading from inotify: {err}");
                // Dropping the old instance will close it properly.
                match init_inotify(&path).await {
                    Ok(w) => {
                        info!("Re-initialised inotify listener");
                        watcher = w;
                    }
                    Err(err) => {
                        // Consider back-off, but, realistically: when does this even happen?
                        error!("Could not re-initialise inotify: {err}");
                        sleep(Duration::from_secs(2)).await; // XXX: Magic number.
                        continue;
                    }
                }
                if sender.send(Event::Storage).await.is_err() {
                    return;
                }
                continue;
            }
            None => {
                // End of stream; should not happen.
                error!("Unexpected end of inotify stream.");
                sleep(Duration::from_secs(2)).await; // XXX: Magic number.
                continue;
            }
        };

        if inotify_event.mask.contains(EventMask::Q_OVERFLOW) {
            // Drain the queue to discard stale events.
            while let Some(Ok(_)) = watcher.events.next().now_or_never().flatten() {}
            if sender.send(Event::Storage).await.is_err() {
                return;
            }
            continue;
        }

        let Some(name) = inotify_event.name else {
            // Event without a name is for watched directory itself.
            if inotify_event.mask.contains(EventMask::DELETE_SELF) {
                let wd_id = inotify_event.wd.get_watch_descriptor_id();
                if wd_id == watcher.root_wd {
                    warn!("Storage directory was deleted; inotify watcher bailing.");
                    return;
                }
                if let Some(collection_name) = watcher.watched_collections.remove(&wd_id) {
                    watcher.collection_wds.remove(&collection_name);
                    if sender
                        .send(Event::Collection(collection_name, EventKind::Delete))
                        .await
                        .is_err()
                    {
                        return;
                    }
                }
            }
            continue;
        };

        if name.as_encoded_bytes().first() == Some(&b'.') {
            continue;
        }
        let name = match name.into_string() {
            Ok(s) => s,
            Err(os) => {
                error!(
                    "Ignoring event for file with non-UTF8 name: {}.",
                    os.to_string_lossy()
                );
                continue;
            }
        };

        let wd_id = inotify_event.wd.get_watch_descriptor_id();

        let result = if let Some(collection) = watcher.watched_collections.get(&wd_id) {
            handle_collection_event(&sender, collection, &name, &ext, &inotify_event.mask).await
        } else {
            handle_root_event(&sender, &mut watcher, &path, &name, &inotify_event.mask).await
        };
        if result.is_err() {
            return;
        }
    }
}

async fn handle_collection_event(
    sender: &mpsc::Sender<Event>,
    collection_name: &str,
    name_str: &str,
    extension: &str,
    mask: &EventMask,
) -> Result<(), ()> {
    let kind = if mask.contains(EventMask::DELETE) {
        EventKind::Delete
    } else {
        EventKind::Change
    };
    if name_str.ends_with(&format!(".{extension}")) {
        let href = format!("{collection_name}/{name_str}");
        sender
            .send(Event::Item(collection_name.to_string(), href, kind))
            .await
            .map_err(|_| ())?;
    } else if let Ok(property) = Property::from_str(name_str) {
        sender
            .send(Event::Property(collection_name.to_string(), property, kind))
            .await
            .map_err(|_| ())?;
    } // else: irrelevant file.
    Ok(())
}

async fn handle_root_event(
    sender: &mpsc::Sender<Event>,
    watcher: &mut Watcher,
    path: &Utf8Path,
    name_str: &str,
    mask: &EventMask,
) -> Result<(), ()> {
    if mask.contains(EventMask::CREATE) {
        let collection_path = path.join(name_str);
        match tokio::fs::metadata(&collection_path).await {
            Ok(meta) if meta.is_dir() => {
                sender
                    .send(Event::Collection(name_str.to_string(), EventKind::Change))
                    .await
                    .map_err(|_| ())?;
            }
            _ => {
                sender.send(Event::Storage).await.map_err(|_| ())?;
            }
        }
    } else if mask.contains(EventMask::DELETE) {
        if let Some(wd_id) = watcher.collection_wds.remove(name_str) {
            watcher.watched_collections.remove(&wd_id);
            sender
                .send(Event::Collection(name_str.to_string(), EventKind::Delete))
                .await
                .map_err(|_| ())?;
        }
    } else {
        sender.send(Event::Storage).await.map_err(|_| ())?;
    }
    Ok(())
}

#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use tempfile::tempdir;

    use crate::{
        base::Storage,
        vdir::{ItemKind, VdirStorage},
        watch::Event,
    };

    #[tokio::test]
    async fn inotify_after_deletion() {
        let path_a = tempdir().unwrap();

        std::fs::create_dir(path_a.path().join("one")).unwrap();
        let storage_a = VdirStorage::builder(path_a.path().to_path_buf().try_into().unwrap())
            .unwrap()
            .build(ItemKind::Calendar);
        let storage_a: Arc<dyn Storage> = Arc::new(storage_a);

        let mut monitor = storage_a.monitor().await.unwrap();

        // Initial storage event.
        assert_eq!(Event::Storage, monitor.next_event().await.unwrap());

        drop(path_a); // Removes directory;
        // Event for the directory deletion.
        let _ = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;

        // No more events should arrive after the first one.
        let next = tokio::time::timeout(Duration::from_secs(1), monitor.next_event()).await;
        assert!(next.is_err(), "Expected timeout, got event");
    }
}