camel-processor 0.6.3

Message processors for rust-camel
Documentation
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tower::Service;

use camel_api::{CamelError, DelayConfig, Exchange, Value};

#[derive(Clone)]
pub struct DelayerService {
    config: DelayConfig,
}

impl DelayerService {
    pub fn new(config: DelayConfig) -> Self {
        Self { config }
    }

    fn effective_delay_ms(&self, exchange: &Exchange) -> u64 {
        if let Some(ref header) = self.config.dynamic_header
            && let Some(Value::Number(n)) = exchange.input.header(header)
            && let Some(n) = n.as_f64()
            && n >= 0.0
        {
            return n as u64;
        }
        self.config.delay_ms
    }
}

impl Service<Exchange> for DelayerService {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, exchange: Exchange) -> Self::Future {
        let delay_ms = self.effective_delay_ms(&exchange);
        Box::pin(async move {
            if delay_ms > 0 {
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            }
            Ok(exchange)
        })
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use camel_api::DelayConfig;
    use camel_api::{Exchange, Message, Value};
    use tower::{Service, ServiceExt};

    use super::DelayerService;

    #[tokio::test]
    async fn test_delayer_fixed_delay() {
        let config = DelayConfig::new(100);
        let mut svc = DelayerService::new(config);

        let start = Instant::now();
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed >= Duration::from_millis(90));
    }

    #[tokio::test]
    async fn test_delayer_dynamic_from_header() {
        let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
        let mut svc = DelayerService::new(config);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input
            .set_header("CamelDelayMs", Value::Number(50.into()));

        let start = Instant::now();
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed >= Duration::from_millis(40));
        assert!(elapsed < Duration::from_millis(200));
    }

    #[tokio::test]
    async fn test_delayer_zero_delay() {
        let config = DelayConfig::new(0);
        let mut svc = DelayerService::new(config);

        let start = Instant::now();
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed < Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_delayer_zero_from_header() {
        let config = DelayConfig::new(5000).with_dynamic_header("CamelDelayMs");
        let mut svc = DelayerService::new(config);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input.set_header("CamelDelayMs", Value::Number(0.into()));

        let start = Instant::now();
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed < Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_delayer_fallback_on_invalid_header() {
        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
        let mut svc = DelayerService::new(config);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input
            .set_header("CamelDelayMs", Value::String("not a number".into()));

        let start = Instant::now();
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed >= Duration::from_millis(70));
    }

    #[tokio::test]
    async fn test_delayer_negative_header() {
        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
        let mut svc = DelayerService::new(config);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input
            .set_header("CamelDelayMs", Value::Number((-100).into()));

        let start = Instant::now();
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed >= Duration::from_millis(70));
    }

    #[tokio::test]
    async fn test_delayer_missing_header() {
        let config = DelayConfig::new(80).with_dynamic_header("CamelDelayMs");
        let mut svc = DelayerService::new(config);

        let start = Instant::now();
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed >= Duration::from_millis(70));
    }
}