vstorage 0.7.0

Common API for various icalendar/vcard storages.
Documentation
// Copyright 2025 Hugo Osvaldo Barrera
//
// SPDX-License-Identifier: EUPL-1.2

//! Operation ordering and synchronisation primitives.
//!
//! Handles for coordinating ordering between dependent operations.
//!
//! The main entry points are [`completion_pair`] and [`DeletionBarrier`].

use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use tokio::sync::{
    Notify,
    watch::{Receiver, Sender, channel, error::SendError},
};

use super::status::MappingUid;

// FIXME: some ordering operations can probably use Relaxed ordering.

/// Error returned when waiting for a completion that never happens.
///
/// [`WaitHandle::wait`] yields this error when waiting on the underlying
/// channel fails, i.e. when no [`CompletionHandle`] is left to publish a
/// mapping UID. It is a unit struct and carries no further detail.
#[derive(Debug, Clone, thiserror::Error)]
#[error("completion handle dropped before signaling")]
pub struct CompletionDroppedError;

/// Sending half of a completion pair: announces that an operation finished.
///
/// Once a collection operation is done, the mapping UID it was assigned is
/// published through this handle. Dependent operations (items, properties)
/// pick the UID up via the paired [`WaitHandle`].
#[derive(Debug, Clone)]
pub struct CompletionHandle(Sender<Option<MappingUid>>);

impl CompletionHandle {
    /// Publish `mapping_uid` as the result of the completed operation.
    ///
    /// Every task currently waiting on the paired [`WaitHandle`] is woken and
    /// observes the published mapping UID.
    ///
    /// # Errors
    ///
    /// Fails with a [`SendError`] if every receiver has already been dropped.
    /// This is not expected during normal operation, because dependent
    /// operations keep their wait handles alive.
    pub fn complete(&self, mapping_uid: MappingUid) -> Result<(), SendError<Option<MappingUid>>> {
        let Self(sender) = self;
        let published = Some(mapping_uid);
        sender.send(published)
    }
}

/// Receiving half of a completion pair: yields the result of an operation.
///
/// Operations that depend on a collection being created keep one of these.
/// Before running, they await it to learn the mapping UID that was assigned
/// while the collection was created.
#[derive(Debug, Clone)]
pub struct WaitHandle(Receiver<Option<MappingUid>>);

impl WaitHandle {
    /// Wait for the paired operation to finish and return its mapping UID.
    ///
    /// Suspends until the holder of the [`CompletionHandle`] calls
    /// [`CompletionHandle::complete`]. If a mapping UID has already been
    /// published, it is returned without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(CompletionDroppedError)` when the completion handle goes
    /// away without ever signalling completion. That situation points at a
    /// programming error in the operation execution logic.
    pub async fn wait(&mut self) -> Result<MappingUid, CompletionDroppedError> {
        // Resolves once the channel holds `Some(_)`, or fails if the channel
        // closes while the value is still `None`.
        let outcome = self.0.wait_for(Option::is_some).await;
        let Ok(current) = outcome else {
            return Err(CompletionDroppedError);
        };

        // The predicate guarantees `Some`; the fallback exists only so that
        // this path cannot panic.
        let published: Option<MappingUid> = *current;
        published.ok_or(CompletionDroppedError)
    }

    /// Peek at the mapping UID without waiting.
    ///
    /// Returns `Some(mapping_uid)` once the operation has completed, and
    /// `None` while it is still pending.
    #[must_use]
    pub fn try_get(&self) -> Option<MappingUid> {
        let current = self.0.borrow();
        *current
    }
}

/// Create a paired completion and wait handle.
///
/// The completion handle should be held by the operation that will complete.
/// The wait handle can be cloned and distributed to dependent operations.
#[must_use]
pub fn completion_pair() -> (CompletionHandle, WaitHandle) {
    let (sender, receiver) = channel(None);
    (CompletionHandle(sender), WaitHandle(receiver))
}

/// Barrier that makes a collection deletion wait for its items and properties.
///
/// A collection must not be deleted while item operations inside it are still
/// running. The barrier tracks how many such operations are pending: each one
/// signals completion, which lowers the count, and the deletion waits until
/// the count is back at zero.
#[derive(Debug, Clone)]
pub struct DeletionBarrier {
    inner: Arc<DeletionBarrierInner>,
}

/// State shared between a barrier and all handles created from it.
#[derive(Debug)]
struct DeletionBarrierInner {
    /// How many operations still have to complete before deletion may proceed.
    count: AtomicUsize,
    /// Wakes the waiting deletion operation once `count` drops to zero.
    notify: Notify,
}

impl DeletionBarrier {
    /// Create a barrier with no pending operations (count 0).
    ///
    /// Every call to [`completion_handle`](Self::completion_handle) raises the
    /// count by one; dropping the returned handle lowers it again.
    #[must_use]
    pub fn new() -> Self {
        let state = DeletionBarrierInner {
            count: AtomicUsize::new(0),
            notify: Notify::new(),
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Register one pending operation and return the handle that tracks it.
    ///
    /// The barrier's counter is incremented here and decremented automatically
    /// when the returned handle is dropped, so the bookkeeping stays correct
    /// even on panic or error.
    ///
    /// Each item or property operation should hold one such handle for as
    /// long as it runs; dropping the handle is what signals completion.
    #[must_use]
    pub fn completion_handle(&self) -> DeletionCompletionHandle {
        let inner = Arc::clone(&self.inner);
        inner.count.fetch_add(1, Ordering::SeqCst);
        DeletionCompletionHandle { inner }
    }

    /// Return a handle on which the deletion operation can wait.
    ///
    /// The deletion operation should await [`DeletionWaitHandle::wait`] and
    /// only then carry out the actual deletion.
    #[must_use]
    pub fn wait_handle(&self) -> DeletionWaitHandle {
        let inner = Arc::clone(&self.inner);
        DeletionWaitHandle { inner }
    }
}

impl Default for DeletionBarrier {
    /// Equivalent to [`DeletionBarrier::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Handle for signaling that an operation has completed.
///
/// When this handle is dropped, it automatically decrements the barrier's counter
/// and notifies the deletion operation if all operations have completed.
///
/// Ensures cleanup happens even on panic or early return.
///
/// Cloning this handle increments the barrier's counter, as each clone represents
/// another operation that must complete.
#[derive(Debug)]
pub struct DeletionCompletionHandle {
    inner: Arc<DeletionBarrierInner>,
}

impl Clone for DeletionCompletionHandle {
    fn clone(&self) -> Self {
        self.inner.count.fetch_add(1, Ordering::SeqCst);
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl Drop for DeletionCompletionHandle {
    fn drop(&mut self) {
        let prev = self.inner.count.fetch_sub(1, Ordering::SeqCst);
        if prev == 1 {
            self.inner.notify.notify_waiters();
        }
    }
}

/// Handle for waiting until all operations have completed.
///
/// The collection deletion operation holds this handle and awaits it before
/// proceeding with the deletion.
#[derive(Debug, Clone)]
pub struct DeletionWaitHandle {
    inner: Arc<DeletionBarrierInner>,
}

impl DeletionWaitHandle {
    /// Wait until all operations have signalled completion.
    ///
    /// Suspends the calling task until the barrier's counter reaches zero,
    /// meaning all item and property operations for the collection have
    /// finished. Returns immediately if the counter is already zero.
    pub async fn wait(&self) {
        loop {
            // Create the `Notified` future *before* reading the counter.
            // `notify_waiters()` stores no permit: it only reaches futures that
            // already exist. If the future were created after the check, the
            // last completion handle could be dropped in between, its
            // notification would be lost, and this task would sleep forever.
            let notified = self.inner.notify.notified();

            if self.inner.count.load(Ordering::SeqCst) == 0 {
                return;
            }

            // Loop and re-check after waking: further completion handles may
            // have been created since the notification was sent.
            notified.await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Note: Full integration tests with actual `MappingUid` values are in src/sync/status.rs
    // These tests focus on the synchronisation behavior of completion/wait handles.

    /// A task blocked in `wait` is woken by `complete` and receives the UID.
    #[tokio::test]
    async fn test_completion_handle_wakes_waiters() {
        use tokio::time::{Duration, sleep};

        let (completion, wait) = completion_pair();

        let waiter = tokio::spawn({
            let mut handle = wait.clone();
            async move { handle.wait().await }
        });

        // Hack: give the spawned task time to start waiting.
        sleep(Duration::from_millis(10)).await;

        completion.complete(MappingUid::new_for_test(0)).unwrap();

        let received = waiter.await.unwrap().unwrap();
        assert_eq!(received, MappingUid::new_for_test(0));
    }

    /// Nothing has been published yet, so `try_get` yields `None`.
    #[tokio::test]
    async fn test_try_get_returns_none_before_completion() {
        let (_completion, wait) = completion_pair();
        assert!(wait.try_get().is_none());
    }

    /// Every clone made before completion observes the published UID.
    #[tokio::test]
    async fn test_wait_handle_is_cloneable() {
        let (completion, wait) = completion_pair();

        let clones = [wait.clone(), wait.clone()];

        completion.complete(MappingUid::new_for_test(42)).unwrap();

        for handle in &clones {
            assert_eq!(handle.try_get(), Some(MappingUid::new_for_test(42)));
        }
    }
}