actflow 0.1.6

A lightweight, event-driven workflow engine written in Rust.
Documentation
//! Graceful shutdown coordination.
//!
//! Provides a thread-safe mechanism for signaling and waiting on shutdown events.
//! Used by Context and Channel to coordinate graceful termination.

use std::{
    fmt::Debug,
    future::Future,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
};

use tokio::sync::Notify;

/// Thread-safe shutdown coordinator.
///
/// Provides a way to signal shutdown and wait for the signal asynchronously.
/// Uses an atomic boolean for the flag and tokio's Notify for async waiting.
///
/// # Example
///
/// ```rust,ignore
/// let shutdown = Shutdown::new();
///
/// // In one task: wait for shutdown
/// tokio::select! {
///     _ = shutdown.wait() => {
///         println!("Shutdown signaled");
///     }
///     _ = do_work() => {}
/// }
///
/// // In another task: signal shutdown
/// shutdown.shutdown();
/// ```
#[derive(Clone)]
pub struct Shutdown {
    /// Tuple of (shutdown flag, notification mechanism).
    /// Both wrapped in Arc for thread-safe sharing.
    inner: Arc<(AtomicBool, Notify)>,
}

#[allow(unused)]
impl Shutdown {
    /// Creates a new shutdown coordinator
    pub fn new() -> Self {
        Self {
            inner: Arc::new((AtomicBool::new(false), Notify::new())),
        }
    }

    /// Initiates shutdown
    pub fn shutdown(&self) {
        self.inner.0.swap(true, Ordering::Relaxed);
        self.inner.1.notify_waiters();
    }

    /// Resets the shutdown state
    pub fn reset(&self) {
        self.inner.0.store(false, Ordering::Relaxed);
    }

    /// Checks if shutdown has been initiated
    pub fn is_terminated(&self) -> bool {
        self.inner.0.load(Ordering::Relaxed)
    }

    /// Waits for shutdown to be initiated
    pub fn wait(&'_ self) -> impl Future<Output = ()> + Send + 'static {
        let inner = self.inner.clone();
        async move {
            // Initial fast check
            if !inner.0.load(Ordering::Relaxed) {
                let notify = inner.1.notified();
                // Second check to avoid "missed wakeup" race conditions
                if !inner.0.load(Ordering::Relaxed) {
                    notify.await;
                }
            }
        }
    }
}

impl Default for Shutdown {
    /// Creates a new shutdown coordinator with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl Debug for Shutdown {
    /// Provides debug formatting for the shutdown coordinator
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("Shutdown").field("is_terminated", &self.inner.0.load(Ordering::Relaxed)).finish()
    }
}