// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! A thread-safe circuit breaker.

use parking_lot::Mutex;
use std::time::{Duration, Instant};

/// State of a circuit breaker at a given moment.
///
/// The variants are plain markers with no payload, so the type is `Copy` and
/// can be compared with `==`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Normal operation: calls are let through.
    Closed,
    /// Tripped: calls are rejected without being attempted.
    Open,
    /// Recovery check: a probe call is let through to see whether the
    /// dependency has come back.
    HalfOpen,
}

/// Mutable bookkeeping of a breaker; `CircuitBreaker` stores it inside a mutex.
#[derive(Debug)]
struct Inner {
    /// State as of the last update. An `Open` value may be stale: the switch to
    /// `HalfOpen` is applied lazily, when the breaker is next refreshed.
    state: CircuitState,
    /// Run-length counter with two roles: consecutive failures while `Closed`,
    /// and consecutive successes while `HalfOpen` (the field is reused).
    consecutive_failures: u32,
    /// When the breaker last tripped open; `None` if it has never tripped or
    /// has been closed again since.
    opened_at: Option<Instant>,
}

/// Guards calls to an unreliable dependency.
///
/// Once enough consecutive failures have been recorded the breaker stops
/// letting calls through, and after a cooldown it lets probe calls through to
/// find out whether the dependency has recovered.
///
/// State machine:
///
/// * `Closed` → `Open` once `failure_threshold` consecutive failures are seen.
/// * `Open` → `HalfOpen` once `cooldown` has elapsed.
/// * `HalfOpen` → `Closed` after `success_threshold` successes.
/// * `HalfOpen` → `Open` as soon as a probe fails.
pub struct CircuitBreaker {
    /// Consecutive failures needed to trip the breaker (`new` clamps it to ≥ 1).
    failure_threshold: u32,
    /// Successes needed in `HalfOpen` to close again (`new` clamps it to ≥ 1).
    success_threshold: u32,
    /// Time to remain `Open` before probes are allowed.
    cooldown: Duration,
    /// Mutable state; every method locks this mutex before reading or writing it.
    inner: Mutex<Inner>,
}

impl CircuitBreaker {
    /// Create a breaker that starts in the `Closed` state.
    ///
    /// * `failure_threshold` — consecutive failures that trip the breaker open.
    /// * `success_threshold` — consecutive successes in half-open that re-close it.
    /// * `cooldown` — how long to stay open before allowing a probe.
    ///
    /// A threshold of `0` is clamped to `1`, so the breaker can always trip
    /// and can always recover.
    pub fn new(failure_threshold: u32, success_threshold: u32, cooldown: Duration) -> Self {
        CircuitBreaker {
            failure_threshold: failure_threshold.max(1),
            success_threshold: success_threshold.max(1),
            cooldown,
            inner: Mutex::new(Inner {
                state: CircuitState::Closed,
                consecutive_failures: 0,
                opened_at: None,
            }),
        }
    }

    /// The current state, accounting for an elapsed cooldown (which transitions
    /// `Open` → `HalfOpen` lazily on inspection).
    pub fn state(&self) -> CircuitState {
        let mut inner = self.inner.lock();
        self.refresh(&mut inner);
        inner.state
    }

    /// Whether a call should be permitted right now.
    ///
    /// Returns `true` in `Closed` and `HalfOpen`, `false` while `Open`.
    ///
    /// Note that every caller is admitted while `HalfOpen`; the number of
    /// concurrent probes is not limited here.
    pub fn allow_request(&self) -> bool {
        let mut inner = self.inner.lock();
        self.refresh(&mut inner);
        !matches!(inner.state, CircuitState::Open)
    }

    /// Record a successful call.
    ///
    /// * `Closed` — clears the consecutive-failure run.
    /// * `HalfOpen` — counts toward `success_threshold`; reaching it closes
    ///   the breaker.
    /// * `Open` (cooldown still running) — ignored. Such a success can only
    ///   come from a call admitted before the trip, and it must not bypass the
    ///   cooldown or the half-open probing phase.
    pub fn record_success(&self) {
        let mut inner = self.inner.lock();
        // Apply a pending Open → HalfOpen transition first, so a probe result
        // is counted as a probe even if nobody inspected the state in between.
        self.refresh(&mut inner);
        match inner.state {
            CircuitState::Closed => {
                inner.consecutive_failures = 0;
            }
            CircuitState::HalfOpen => {
                // The failure counter doubles as the success counter in half-open.
                inner.consecutive_failures = inner.consecutive_failures.saturating_add(1);
                if inner.consecutive_failures >= self.success_threshold {
                    inner.state = CircuitState::Closed;
                    inner.consecutive_failures = 0;
                    inner.opened_at = None;
                }
            }
            CircuitState::Open => {}
        }
    }

    /// Record a failed call.
    ///
    /// * `Closed` — extends the failure run; reaching `failure_threshold`
    ///   trips the breaker open.
    /// * `HalfOpen` — a failed probe re-opens immediately and restarts the
    ///   cooldown.
    /// * `Open` (cooldown still running) — ignored, so late failures from
    ///   calls admitted before the trip cannot stretch the open period beyond
    ///   `cooldown`.
    pub fn record_failure(&self) {
        let mut inner = self.inner.lock();
        self.refresh(&mut inner);
        match inner.state {
            CircuitState::Closed => {
                inner.consecutive_failures = inner.consecutive_failures.saturating_add(1);
                if inner.consecutive_failures >= self.failure_threshold {
                    Self::trip(&mut inner);
                }
            }
            CircuitState::HalfOpen => Self::trip(&mut inner),
            CircuitState::Open => {}
        }
    }

    /// Put the breaker into `Open` and start the cooldown clock now.
    fn trip(inner: &mut Inner) {
        inner.state = CircuitState::Open;
        // The counter is meaningless while open; it restarts from zero when
        // the breaker later enters `HalfOpen`.
        inner.consecutive_failures = 0;
        inner.opened_at = Some(Instant::now());
    }

    /// Move an expired `Open` breaker into `HalfOpen`.
    ///
    /// Does nothing unless the state is `Open` and at least `cooldown` has
    /// elapsed since `opened_at`.
    fn refresh(&self, inner: &mut Inner) {
        if inner.state != CircuitState::Open {
            return;
        }
        if let Some(opened) = inner.opened_at {
            if opened.elapsed() >= self.cooldown {
                inner.state = CircuitState::HalfOpen;
                // Reset counter so successes can accumulate toward closing.
                inner.consecutive_failures = 0;
            }
        }
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        CircuitBreaker::new(5, 2, Duration::from_secs(30))
    }
}