bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! Execution control utilities
//!
//! Provides timeout, cancel, and cleanup functionality for runtime executions.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::RwLock;
use tokio::time::{timeout, Instant};

use crate::RunError;

/// Cancellation token for execution control
#[derive(Clone)]
pub struct CancellationToken {
    cancelled: Arc<RwLock<bool>>,
}

impl CancellationToken {
    /// Create a new cancellation token
    pub fn new() -> Self {
        CancellationToken {
            cancelled: Arc::new(RwLock::new(false)),
        }
    }

    /// Check if cancelled
    pub async fn is_cancelled(&self) -> bool {
        *self.cancelled.read().await
    }

    /// Cancel the token
    pub async fn cancel(&self) {
        *self.cancelled.write().await = true;
    }

    /// Reset the token
    pub async fn reset(&self) {
        *self.cancelled.write().await = false;
    }
}

impl Default for CancellationToken {
    fn default() -> Self {
        Self::new()
    }
}

/// Execution controller for managing run lifecycle
pub struct ExecutionController {
    cancellation_token: CancellationToken,
    start_time: Instant,
    timeout_duration: Option<Duration>,
}

impl ExecutionController {
    /// Create a new execution controller
    pub fn new() -> Self {
        ExecutionController {
            cancellation_token: CancellationToken::new(),
            start_time: Instant::now(),
            timeout_duration: None,
        }
    }

    /// Create with timeout
    pub fn with_timeout(duration: Duration) -> Self {
        ExecutionController {
            cancellation_token: CancellationToken::new(),
            start_time: Instant::now(),
            timeout_duration: Some(duration),
        }
    }

    /// Check if cancelled
    pub async fn is_cancelled(&self) -> bool {
        self.cancellation_token.is_cancelled().await
    }

    /// Cancel the execution
    pub async fn cancel(&self) {
        self.cancellation_token.cancel().await;
    }

    /// Check if timed out
    pub fn is_timed_out(&self) -> bool {
        if let Some(duration) = self.timeout_duration {
            self.start_time.elapsed() >= duration
        } else {
            false
        }
    }

    /// Get elapsed time
    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Get remaining time before timeout
    pub fn remaining(&self) -> Option<Duration> {
        self.timeout_duration.map(|d| {
            let elapsed = self.start_time.elapsed();
            if elapsed >= d {
                Duration::ZERO
            } else {
                d - elapsed
            }
        })
    }

    /// Wait for completion with timeout
    pub async fn wait_with_timeout<F, T>(&self, future: F) -> Result<T, RunError>
    where
        F: std::future::Future<Output = Result<T, RunError>>,
    {
        if let Some(duration) = self.timeout_duration {
            match timeout(duration, future).await {
                Ok(result) => result,
                Err(_) => Err(RunError::Timeout { after: duration }),
            }
        } else {
            future.await
        }
    }

    /// Get the cancellation token
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }
}

impl Default for ExecutionController {
    fn default() -> Self {
        Self::new()
    }
}

/// Cleanup manager for execution resources
pub struct CleanupManager {
    cleanup_count: std::sync::atomic::AtomicUsize,
}

impl CleanupManager {
    /// Create a new cleanup manager
    pub fn new() -> Self {
        CleanupManager {
            cleanup_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Register a cleanup (increments counter)
    pub fn register(&self) {
        self.cleanup_count
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    }

    /// Run cleanup (decrements counter)
    pub async fn cleanup(&self) -> usize {
        self.cleanup_count
            .swap(0, std::sync::atomic::Ordering::SeqCst)
    }

    /// Get pending cleanup count
    pub fn pending(&self) -> usize {
        self.cleanup_count.load(std::sync::atomic::Ordering::SeqCst)
    }
}

impl Default for CleanupManager {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_cancellation_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled().await);

        token.cancel().await;
        assert!(token.is_cancelled().await);

        token.reset().await;
        assert!(!token.is_cancelled().await);
    }

    #[tokio::test]
    async fn test_execution_controller() {
        let controller = ExecutionController::new();
        assert!(!controller.is_timed_out());
        assert!(controller.remaining().is_none());
    }

    #[tokio::test]
    async fn test_execution_controller_with_timeout() {
        let controller = ExecutionController::with_timeout(Duration::from_millis(100));
        assert!(!controller.is_timed_out());

        tokio::time::sleep(Duration::from_millis(150)).await;
        assert!(controller.is_timed_out());
        assert_eq!(controller.remaining(), Some(Duration::ZERO));
    }

    #[tokio::test]
    async fn test_execution_controller_cancel() {
        let controller = ExecutionController::new();
        assert!(!controller.is_cancelled().await);

        controller.cancel().await;
        assert!(controller.is_cancelled().await);
    }

    #[tokio::test]
    async fn test_wait_with_timeout_success() {
        let controller = ExecutionController::with_timeout(Duration::from_secs(10));
        let future = async { Ok::<_, RunError>(42) };
        let result = controller.wait_with_timeout(future).await;
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_wait_with_timeout_exceeded() {
        let controller = ExecutionController::with_timeout(Duration::from_millis(10));
        let future = async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok::<_, RunError>(42)
        };
        let result = controller.wait_with_timeout(future).await;
        assert!(matches!(result, Err(RunError::Timeout { .. })));
    }

    #[test]
    fn test_elapsed_time() {
        let controller = ExecutionController::new();
        let before = controller.elapsed();
        std::thread::sleep(Duration::from_millis(10));
        let after = controller.elapsed();
        assert!(after > before);
    }

    #[test]
    fn test_cleanup_manager() {
        let manager = CleanupManager::new();
        assert_eq!(manager.pending(), 0);

        manager.register();
        manager.register();
        assert_eq!(manager.pending(), 2);

        // Cleanup is async, but we can check the count synchronously
        assert_eq!(manager.pending(), 2);
    }
}