// halldyll-core 0.1.0
//
// Core scraping engine for Halldyll - high-performance async web scraper for AI agents
// Documentation
//! Graceful Shutdown - Clean shutdown with in-flight request handling
//!
//! Provides graceful shutdown support for cloud deployments:
//! - Stops accepting new requests
//! - Waits for in-flight requests to complete
//! - Has a timeout for forced shutdown
//!
//! ## Usage
//!
//! ```rust,ignore
//! let shutdown = GracefulShutdown::new(Duration::from_secs(30));
//! 
//! // In your request handler
//! let guard = shutdown.start_request()?;
//! // ... do work ...
//! drop(guard); // Request complete
//!
//! // When shutting down
//! shutdown.initiate();
//! shutdown.wait_for_completion().await;
//! ```

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::timeout;

/// Graceful shutdown coordinator.
///
/// Tracks the number of in-flight requests and, once shutdown has been
/// initiated, rejects new requests and lets a waiter block (with a timeout)
/// until the in-flight count drops to zero.
pub struct GracefulShutdown {
    /// Set to `true` by `initiate`; while set, `start_request` returns `None`.
    shutting_down: AtomicBool,
    /// Number of in-flight requests: incremented by `start_request`,
    /// decremented when a request finishes.
    in_flight: AtomicUsize,
    /// Signalled via `notify_waiters` when the in-flight count reaches zero
    /// during shutdown.
    all_complete: Arc<Notify>,
    /// Maximum time `wait_for_completion` waits for in-flight requests.
    shutdown_timeout: Duration,
    /// Set to `true` when `wait_for_completion` returns, whether the result
    /// was clean or a timeout.
    completed: AtomicBool,
}

impl GracefulShutdown {
    /// Creates a coordinator that waits at most `shutdown_timeout` for
    /// in-flight requests once shutdown has been initiated.
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            shutting_down: AtomicBool::new(false),
            in_flight: AtomicUsize::new(0),
            all_complete: Arc::new(Notify::new()),
            shutdown_timeout,
            completed: AtomicBool::new(false),
        }
    }

    /// Creates a coordinator with a 30 second shutdown timeout.
    pub fn default_timeout() -> Self {
        Self::new(Duration::from_secs(30))
    }

    /// Returns `true` once shutdown has been initiated.
    pub fn is_shutting_down(&self) -> bool {
        self.shutting_down.load(Ordering::SeqCst)
    }

    /// Returns the current number of in-flight requests.
    pub fn in_flight_count(&self) -> usize {
        self.in_flight.load(Ordering::SeqCst)
    }

    /// Registers a new in-flight request.
    ///
    /// Returns `None` if shutdown has been initiated. Otherwise returns a
    /// guard that must be kept alive for the duration of the request; the
    /// request is counted as finished when the guard is dropped.
    #[must_use = "dropping the guard immediately marks the request as finished"]
    pub fn start_request(&self) -> Option<RequestGuard<'_>> {
        if self.is_shutting_down() {
            return None;
        }

        self.in_flight.fetch_add(1, Ordering::SeqCst);

        // Shutdown may have been initiated between the check above and the
        // increment; re-check and undo the increment so the request is not
        // admitted after shutdown started.
        if self.is_shutting_down() {
            self.finish_request();
            return None;
        }

        Some(RequestGuard { shutdown: self })
    }

    /// Marks one request as complete (internal).
    ///
    /// Wakes waiters when this was the last in-flight request and shutdown
    /// is in progress.
    fn finish_request(&self) {
        let prev = self.in_flight.fetch_sub(1, Ordering::SeqCst);
        if prev == 1 && self.is_shutting_down() {
            // We were the last request
            self.all_complete.notify_waiters();
        }
    }

    /// Initiates graceful shutdown: new requests are rejected from now on.
    pub fn initiate(&self) {
        self.shutting_down.store(true, Ordering::SeqCst);

        // If no in-flight requests, notify immediately
        if self.in_flight.load(Ordering::SeqCst) == 0 {
            self.all_complete.notify_waiters();
        }
    }

    /// Waits for all in-flight requests to complete, or for the shutdown
    /// timeout to elapse, whichever comes first.
    ///
    /// Initiates shutdown if it has not been initiated yet. Returns
    /// [`ShutdownResult::Clean`] when no requests remain, otherwise
    /// [`ShutdownResult::Timeout`] carrying the number of requests still in
    /// flight when the timeout fired. In both cases the coordinator is marked
    /// as completed before returning.
    pub async fn wait_for_completion(&self) -> ShutdownResult {
        if !self.is_shutting_down() {
            self.initiate();
        }

        // Create the `Notified` future BEFORE inspecting the counter.
        // `notify_waiters` stores no permit, so a notification sent between
        // the counter check and the creation of the future would otherwise be
        // lost and we would sleep for the whole timeout with nothing in flight.
        let notified = self.all_complete.notified();

        if self.in_flight.load(Ordering::SeqCst) == 0 {
            self.completed.store(true, Ordering::SeqCst);
            return ShutdownResult::Clean;
        }

        let result = match timeout(self.shutdown_timeout, notified).await {
            Ok(()) => ShutdownResult::Clean,
            Err(_) => match self.in_flight.load(Ordering::SeqCst) {
                // The last request finished right at the deadline: nothing
                // is left in flight, so this is still a clean shutdown.
                0 => ShutdownResult::Clean,
                remaining => ShutdownResult::Timeout {
                    remaining_requests: remaining,
                },
            },
        };

        self.completed.store(true, Ordering::SeqCst);
        result
    }

    /// Returns `true` once `wait_for_completion` has finished.
    pub fn is_completed(&self) -> bool {
        self.completed.load(Ordering::SeqCst)
    }

    /// Returns a snapshot of the shutdown state.
    ///
    /// The three fields are read one after another, not atomically as a group.
    pub fn status(&self) -> ShutdownStatus {
        ShutdownStatus {
            shutting_down: self.is_shutting_down(),
            in_flight: self.in_flight_count(),
            completed: self.is_completed(),
        }
    }
}

/// Outcome reported by a shutdown wait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownResult {
    /// Every in-flight request finished before the deadline.
    Clean,
    /// The deadline passed while requests were still running.
    Timeout {
        /// How many requests were still in flight when the timeout fired.
        remaining_requests: usize,
    },
}

impl ShutdownResult {
    /// Returns `true` for [`ShutdownResult::Clean`], `false` for a timeout.
    pub fn is_clean(&self) -> bool {
        match self {
            Self::Clean => true,
            Self::Timeout { .. } => false,
        }
    }
}

/// RAII guard for tracking in-flight requests
pub struct RequestGuard<'a> {
    shutdown: &'a GracefulShutdown,
}

impl Drop for RequestGuard<'_> {
    fn drop(&mut self) {
        self.shutdown.finish_request();
    }
}

/// Point-in-time snapshot of a shutdown coordinator's state, as returned by
/// `GracefulShutdown::status`.
#[derive(Debug, Clone)]
pub struct ShutdownStatus {
    /// Whether shutdown had been initiated when the snapshot was taken.
    pub shutting_down: bool,
    /// Number of in-flight requests when the snapshot was taken.
    pub in_flight: usize,
    /// Whether the shutdown wait had already finished (cleanly or by timeout).
    pub completed: bool,
}

/// Waits for a termination request on Unix systems.
///
/// Installs listeners for SIGTERM and SIGINT and completes as soon as either
/// listener's `recv()` resolves, logging which one fired.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
#[cfg(unix)]
pub async fn wait_for_shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut terminate =
        signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler");
    let mut interrupt =
        signal(SignalKind::interrupt()).expect("Failed to install SIGINT handler");

    tokio::select! {
        _ = terminate.recv() => tracing::info!("Received SIGTERM, initiating graceful shutdown"),
        _ = interrupt.recv() => tracing::info!("Received SIGINT, initiating graceful shutdown"),
    }
}

/// Waits for a termination request on Windows.
///
/// Completes once Ctrl+C has been received, then logs the event.
///
/// # Panics
///
/// Panics if listening for Ctrl+C fails.
#[cfg(windows)]
pub async fn wait_for_shutdown_signal() {
    tokio::signal::ctrl_c()
        .await
        .expect("Failed to listen for Ctrl+C");

    tracing::info!("Received Ctrl+C, initiating graceful shutdown");
}

/// Helper to run shutdown in a task
pub async fn run_with_graceful_shutdown<F, Fut>(
    shutdown: Arc<GracefulShutdown>,
    main_task: F,
) -> ShutdownResult
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    // Spawn signal handler
    let shutdown_clone = shutdown.clone();
    tokio::spawn(async move {
        wait_for_shutdown_signal().await;
        shutdown_clone.initiate();
    });

    // Run main task
    main_task().await;

    // Wait for completion
    shutdown.wait_for_completion().await
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Requests that finish before the wait starts yield a clean shutdown,
    /// and new requests are rejected once shutdown has been initiated.
    #[tokio::test]
    async fn test_clean_shutdown() {
        let shutdown = GracefulShutdown::new(Duration::from_secs(5));

        // Start some requests
        let guard1 = shutdown.start_request().unwrap();
        let guard2 = shutdown.start_request().unwrap();

        assert_eq!(shutdown.in_flight_count(), 2);

        // Initiate shutdown
        shutdown.initiate();

        // New requests should be rejected
        assert!(shutdown.start_request().is_none());

        // Complete requests
        drop(guard1);
        assert_eq!(shutdown.in_flight_count(), 1);
        drop(guard2);
        assert_eq!(shutdown.in_flight_count(), 0);

        // Wait should complete immediately
        let result = shutdown.wait_for_completion().await;
        assert_eq!(result, ShutdownResult::Clean);
    }

    /// A request that never finishes makes the wait time out and report the
    /// number of requests still in flight.
    #[tokio::test]
    async fn test_shutdown_timeout() {
        let shutdown = GracefulShutdown::new(Duration::from_millis(100));

        // Start a request but don't complete it
        let _guard = shutdown.start_request().unwrap();

        shutdown.initiate();

        let result = shutdown.wait_for_completion().await;
        assert!(matches!(result, ShutdownResult::Timeout { remaining_requests: 1 }));
    }

    /// The wait actually parks while a request is in flight and is woken
    /// (well before the timeout) when the last guard is dropped.
    #[tokio::test]
    async fn test_wait_wakes_when_last_request_finishes() {
        let shutdown = Arc::new(GracefulShutdown::new(Duration::from_secs(5)));

        let guard = shutdown.start_request().unwrap();

        let waiter = tokio::spawn({
            let shutdown = Arc::clone(&shutdown);
            async move { shutdown.wait_for_completion().await }
        });

        // Give the waiter time to initiate shutdown and start waiting.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(shutdown.is_shutting_down());
        assert!(!shutdown.is_completed());

        // Finishing the last request must release the waiter.
        drop(guard);

        let result = waiter.await.unwrap();
        assert_eq!(result, ShutdownResult::Clean);
        assert!(shutdown.is_completed());
        assert_eq!(shutdown.in_flight_count(), 0);
    }

    /// Waiting with nothing in flight initiates shutdown and returns cleanly.
    #[tokio::test]
    async fn test_empty_shutdown() {
        let shutdown = GracefulShutdown::new(Duration::from_secs(5));

        let result = shutdown.wait_for_completion().await;
        assert_eq!(result, ShutdownResult::Clean);
    }

    /// A fresh coordinator reports an idle, not-yet-shutting-down status.
    #[test]
    fn test_status() {
        let shutdown = GracefulShutdown::default_timeout();

        let status = shutdown.status();
        assert!(!status.shutting_down);
        assert_eq!(status.in_flight, 0);
        assert!(!status.completed);
    }
}