do-over 0.1.0

Async resilience policies for Rust inspired by Polly
Documentation
//! Bulkhead policy for concurrency limiting.
//!
//! The bulkhead policy limits the number of concurrent executions,
//! preventing resource exhaustion and isolating failures.
//!
//! # Examples
//!
//! ```rust
//! use do_over::{policy::Policy, bulkhead::Bulkhead, error::DoOverError};
//! use std::time::Duration;
//!
//! # async fn example() -> Result<(), DoOverError<std::io::Error>> {
//! // Allow maximum 10 concurrent executions
//! let bulkhead = Bulkhead::new(10);
//!
//! // With queue timeout - wait up to 1 second for a slot
//! let bulkhead = Bulkhead::new(10)
//!     .with_queue_timeout(Duration::from_secs(1));
//!
//! match bulkhead.execute(|| async {
//!     Ok::<_, DoOverError<std::io::Error>>("completed")
//! }).await {
//!     Ok(result) => println!("Success: {}", result),
//!     Err(DoOverError::BulkheadFull) => println!("No capacity available"),
//!     Err(e) => println!("Error: {:?}", e),
//! }
//! # Ok(())
//! # }
//! ```

use tokio::sync::{Semaphore, OwnedSemaphorePermit};
use std::{time::Duration, sync::Arc};
use crate::{policy::Policy, error::DoOverError};

/// A policy that limits concurrent executions.
///
/// The bulkhead uses a semaphore to control how many operations can run
/// simultaneously. When the limit is reached, new requests are either
/// rejected immediately or, if a queue timeout is configured, wait up to
/// that long for a slot before being rejected.
///
/// Cloning a `Bulkhead` produces a handle to the *same* semaphore, so all
/// clones draw from one shared pool of slots.
///
/// # Examples
///
/// ```rust
/// use do_over::{policy::Policy, bulkhead::Bulkhead, error::DoOverError};
/// use std::time::Duration;
///
/// # async fn example() {
/// // Basic bulkhead - reject immediately when full
/// let bulkhead = Bulkhead::new(5);
///
/// // With queue timeout - wait for a slot
/// let bulkhead = Bulkhead::new(5)
///     .with_queue_timeout(Duration::from_millis(500));
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct Bulkhead {
    /// Permit pool shared by every clone; one permit is held per execution.
    semaphore: Arc<Semaphore>,
    /// Maximum time to wait for a free slot; `None` rejects immediately when full.
    queue_timeout: Option<Duration>,
}

impl Bulkhead {
    /// Create a new bulkhead with the specified concurrency limit.
    ///
    /// Without a queue timeout, requests are rejected immediately when
    /// no slots are available.
    ///
    /// # Arguments
    ///
    /// * `max_concurrent` - Maximum number of concurrent executions
    ///
    /// # Examples
    ///
    /// ```rust
    /// use do_over::bulkhead::Bulkhead;
    ///
    /// // Allow up to 10 concurrent operations
    /// let bulkhead = Bulkhead::new(10);
    /// ```
    pub fn new(max_concurrent: usize) -> Self {
        Self { semaphore: Arc::new(Semaphore::new(max_concurrent)), queue_timeout: None }
    }

    /// Set a queue timeout for waiting on a slot.
    ///
    /// When set, requests will wait up to the specified duration for a slot
    /// to become available before being rejected.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum time to wait for a slot
    ///
    /// # Examples
    ///
    /// ```rust
    /// use do_over::bulkhead::Bulkhead;
    /// use std::time::Duration;
    ///
    /// let bulkhead = Bulkhead::new(10)
    ///     .with_queue_timeout(Duration::from_secs(1));
    /// ```
    pub fn with_queue_timeout(mut self, timeout: Duration) -> Self {
        self.queue_timeout = Some(timeout);
        self
    }

    async fn acquire(&self) -> Result<OwnedSemaphorePermit, DoOverError<()>> {
        match self.queue_timeout {
            Some(t) => tokio::time::timeout(t, self.semaphore.clone().acquire_owned())
                .await
                .map_err(|_| DoOverError::BulkheadFull)?
                .map_err(|_| DoOverError::BulkheadFull),
            None => self.semaphore.clone().try_acquire_owned()
                .map_err(|_| DoOverError::BulkheadFull),
        }
    }
}

#[async_trait::async_trait]
impl<E> Policy<DoOverError<E>> for Bulkhead
where
    E: Send + Sync,
{
    /// Run `f` while holding one of the bulkhead's slots.
    ///
    /// A slot is acquired before `f` is invoked and released once the future
    /// returned by `f` has completed. The operation's own result is passed
    /// through unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`DoOverError::BulkheadFull`] without invoking `f` when no slot
    /// could be acquired; otherwise returns whatever `f` returned.
    async fn execute<F, Fut, T>(&self, f: F) -> Result<T, DoOverError<E>>
    where
        F: Fn() -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<T, DoOverError<E>>> + Send,
        T: Send,
    {
        // `acquire` reports failures as `DoOverError<()>`; re-create the
        // variant so it carries this impl's error parameter `E`.
        let slot = match self.acquire().await {
            Ok(granted) => granted,
            Err(_) => return Err(DoOverError::BulkheadFull),
        };

        let outcome = f().await;

        // Give the slot back before handing the result to the caller.
        drop(slot);
        outcome
    }
}