vstorage 0.10.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2026 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: ISC

use std::{
    collections::HashMap,
    fs::{File, read_dir},
    os::unix::io::AsRawFd,
    thread::sleep,
    time::Duration,
};

use camino::Utf8PathBuf;
use kqueue::{EventData, EventFilter, FilterFlag, Ident, Vnode, Watcher};
use log::{debug, error, trace, warn};
use tokio::sync::mpsc::Sender;

use crate::watch::{Event, EventKind};

const VNODE_FLAGS: FilterFlag = FilterFlag::NOTE_WRITE
    .union(FilterFlag::NOTE_EXTEND)
    .union(FilterFlag::NOTE_DELETE)
    .union(FilterFlag::NOTE_RENAME)
    .union(FilterFlag::NOTE_LINK)
    .union(FilterFlag::NOTE_ATTRIB);

struct CollectionWatch {
    /// File for this collection.
    file: File,
    /// `href`, as understood by the `Storage` itself.
    href: String,
}

struct State {
    watcher: Watcher,
    root_file: File,
    collections: HashMap<Utf8PathBuf, CollectionWatch>,
}

impl State {
    fn add_collection(&mut self, path: &Utf8PathBuf, href: String) -> Result<(), std::io::Error> {
        let file = File::open(path)?;
        self.watcher
            .add_file(&file, EventFilter::EVFILT_VNODE, VNODE_FLAGS)?;
        self.collections
            .insert(path.clone(), CollectionWatch { file, href });
        Ok(())
    }

    /// Returns None if nothing was deleted.
    /// Might happen when multiple events refer to a collection, only the latter being the deletion.
    fn remove_collection(&mut self, path: &Utf8PathBuf) -> Option<CollectionWatch> {
        // None: Racy events during removal.
        let watch = self.collections.remove(path)?;
        // Error here is in case kevent call fails, but FD is closed anyway.
        let _ = self
            .watcher
            .remove_file(&watch.file, EventFilter::EVFILT_VNODE);
        Some(watch)
    }
}

pub(super) fn run_kqueue_task(sender: &Sender<Event>, path: &Utf8PathBuf) {
    loop {
        if run_kqueue_inner(sender, path).is_err() {
            return;
        }
        sleep(Duration::from_secs(2)); // XXX: Magic number.
    }
}

/// Run the watcher and emit events.
///
/// Returns `Err` in case of an irrecoverable error, or `Ok` in case of a transient error which
/// requires resetting the watcher.
fn run_kqueue_inner(sender: &Sender<Event>, path: &Utf8PathBuf) -> Result<(), ()> {
    let mut state = init(path)?;

    // Initial event after setting up filesystem watch.
    if sender.blocking_send(Event::Storage).is_err() {
        return Err(());
    }

    loop {
        if sender.is_closed() {
            return Err(());
        }

        let Some(event) = state.watcher.poll_forever(None) else {
            unreachable!("poll_forever returns None only when not initialised.");
        };

        let is_delete = match &event.data {
            EventData::Vnode(Vnode::Delete) => true,
            EventData::Vnode(_) => false,
            EventData::Error(e) => {
                error!("kqueue: error event: {e}");
                break;
            }
            data => {
                error!("kqueue: Unexpected data: {data:?}");
                continue;
            }
        };

        let fd = match &event.ident {
            Ident::Fd(fd) => *fd,
            i => {
                error!("kqueue: Unexpected ident: {i:?}.");
                continue;
            }
        };

        if fd == state.root_file.as_raw_fd() {
            if is_delete {
                error!("kqueue: Storage directory was deleted; bailing.");
                return Err(());
            }

            if sender.blocking_send(Event::Storage).is_err() {
                return Err(());
            }

            if rescan_collections(sender, path, &mut state).is_err() {
                return Err(());
            }
        } else if let Some((collection_path, href)) = state
            .collections
            .iter()
            .find(|(_, w)| w.file.as_raw_fd() == fd)
            .map(|(p, w)| (p.clone(), w.href.clone()))
        {
            let kind = if is_delete {
                state.remove_collection(&collection_path);
                EventKind::Delete
            } else {
                EventKind::Change
            };

            if sender.blocking_send(Event::Collection(href, kind)).is_err() {
                return Err(());
            }
        } else {
            warn!("kqueue: Received event for untracked fd: {fd}");
        }
    }

    Ok(())
}

fn init(path: &Utf8PathBuf) -> Result<State, ()> {
    let mut watcher = Watcher::new().map_err(|e| {
        error!("kqueue: Failed to create watcher: {e}");
    })?;

    let root_file = File::open(path).map_err(|e| {
        error!("kqueue: Failed to open storage root: {e}");
    })?;
    watcher
        .add_file(&root_file, EventFilter::EVFILT_VNODE, VNODE_FLAGS)
        .map_err(|e| {
            error!("kqueue: Failed to watch storage root: {e}");
        })?;

    let mut state = State {
        watcher,
        root_file,
        collections: HashMap::new(),
    };

    // Logs errors internally.
    if discover_collections(path, &mut state).is_err() {
        return Err(());
    }

    if let Err(e) = state.watcher.watch() {
        error!("kqueue: Failed to start watcher: {e}");
        return Err(());
    }

    Ok(state)
}

/// Find collections in the storage and register those which are not already registered.
fn discover_collections(path: &Utf8PathBuf, state: &mut State) -> Result<(), ()> {
    let entries = match read_dir(path) {
        Ok(e) => e,
        Err(e) => {
            error!("kqueue: Failed to read storage root: {e}");
            return Err(());
        }
    };

    for entry in entries {
        let entry = match entry {
            Ok(e) => e,
            Err(e) => {
                warn!("kqueue: Failed to read directory entry: {e}");
                continue;
            }
        };

        let metadata = match entry.metadata() {
            Ok(m) => m,
            Err(e) => {
                warn!(
                    "kqueue: Failed to read metadata for {}: {e}",
                    entry.path().display()
                );
                continue;
            }
        };

        if !metadata.is_dir() {
            continue;
        }

        let name = entry.file_name();
        if name.as_encoded_bytes().first() == Some(&b'.') {
            continue;
        }
        let Ok(name) = name.into_string() else {
            warn!(
                "kqueue: Ignoring directory with non-UTF8 name: {}.",
                entry.path().display()
            );
            continue;
        };

        let collection_path = path.join(&name);
        if state.collections.contains_key(&collection_path) {
            trace!("kqueue: Already watching collection: {name}");
            continue;
        }

        if let Err(e) = state.add_collection(&collection_path, name.clone()) {
            warn!("kqueue: Failed to watch collection {name}: {e}");
        } else {
            debug!("kqueue: Watching collection: {name}");
        }
    }

    Ok(())
}

/// Scan for collection changes and update watcher.
fn rescan_collections(
    sender: &Sender<Event>,
    path: &Utf8PathBuf,
    state: &mut State,
) -> Result<(), ()> {
    let to_remove: Vec<Utf8PathBuf> = state
        .collections
        .keys()
        .filter(|p| !p.exists())
        .cloned()
        .collect();

    for collection_path in to_remove {
        if let Some(watch) = state.remove_collection(&collection_path) {
            sender
                .blocking_send(Event::Collection(watch.href, EventKind::Delete))
                .map_err(|_| ())?;
        }
    }

    discover_collections(path, state)?;

    if let Err(e) = state.watcher.watch() {
        error!("kqueue: Failed to re-register watches: {e}");
        return Err(());
    }

    Ok(())
}