coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! Throttle — allow at most `max` exchanges per `period`, blocking callers
//! when the window is full.
//!
//! Implementation: a fixed-window counter. It is simpler than a leaky bucket
//! and adequate for coarse traffic shaping. If smoother pacing is needed, a
//! token-bucket variant can be added later.

use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::Mutex;

/// State of the current fixed window: when it began and how many exchanges
/// have been admitted since then.
struct Window {
    /// Instant at which the current window started.
    start: Instant,
    /// Number of exchanges admitted since `start`.
    count: u32,
}

/// Fixed-window throttle: admits at most `max` exchanges per `period`.
///
/// The mutable window state lives behind an async mutex so that a shared
/// instance can be used through `&self`.
pub struct Throttle {
    /// Maximum number of exchanges admitted within a single window.
    max: u32,
    /// Length of one window.
    period: Duration,
    /// Start instant and admitted count of the current window.
    window: Mutex<Window>,
}

impl Throttle {
    /// Builds a throttle that lets through at most `max` exchanges in every
    /// `period`-long window.
    ///
    /// The first window opens at the moment of construction, with nothing
    /// admitted yet. The instance is returned inside an [`Arc`] so it can be
    /// shared between owners.
    ///
    /// Note that a `max` of `0` yields a throttle that never admits an
    /// exchange.
    pub fn new(max: u32, period: Duration) -> Arc<Self> {
        // Fresh window: starts now, nothing admitted yet.
        let initial = Window {
            start: Instant::now(),
            count: 0,
        };
        let throttle = Self {
            max,
            period,
            window: Mutex::new(initial),
        };
        Arc::new(throttle)
    }
}

#[async_trait]
impl Processor for Throttle {
    async fn process(&self, _exchange: &mut Exchange) -> Result<()> {
        loop {
            // Compute sleep outside the lock to avoid holding it across .await.
            let sleep_for = {
                let mut w = self.window.lock().await;
                let now = Instant::now();
                if now.duration_since(w.start) >= self.period {
                    w.start = now;
                    w.count = 0;
                }
                if w.count < self.max {
                    w.count += 1;
                    return Ok(());
                }
                self.period.saturating_sub(now.duration_since(w.start))
            };
            tokio::time::sleep(sleep_for).await;
        }
    }
}