camel-component-ws 0.13.0

WebSocket component for rust-camel
Documentation
use async_trait::async_trait;
use camel_api::{AsyncHealthCheck, CheckResult};
use camel_component_api::CamelError;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

type ProbeFuture = Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send>>;

trait WsHealthProbe: Send + Sync {
    fn probe(&self) -> ProbeFuture;
}

struct TcpListenerProbe {
    host: String,
    port: u16,
}

impl TcpListenerProbe {
    fn new(host: String, port: u16) -> Self {
        let probe_host = match host.as_str() {
            "0.0.0.0" | "localhost" => "127.0.0.1".to_string(),
            "::" | "[::]" => "::1".to_string(),
            _ => host,
        };
        Self {
            host: probe_host,
            port,
        }
    }
}

impl WsHealthProbe for TcpListenerProbe {
    fn probe(&self) -> ProbeFuture {
        let addr = format!("{}:{}", self.host, self.port);
        Box::pin(async move {
            tokio::net::TcpStream::connect(&addr)
                .await
                .map(|_| ())
                .map_err(|e| {
                    CamelError::ProcessorError(format!(
                        "WebSocket listener health check failed for '{}': {}",
                        addr, e
                    ))
                })
        })
    }
}

pub struct WsHealthCheck {
    probe: Box<dyn WsHealthProbe>,
    timeout: Duration,
}

impl WsHealthCheck {
    pub fn new(host: String, port: u16) -> Self {
        Self {
            probe: Box::new(TcpListenerProbe::new(host, port)),
            timeout: Duration::from_secs(3),
        }
    }

    #[cfg(test)]
    fn with_probe_for_tests(probe: Box<dyn WsHealthProbe>, timeout: Duration) -> Self {
        Self { probe, timeout }
    }
}

#[async_trait]
impl AsyncHealthCheck for WsHealthCheck {
    fn name(&self) -> &str {
        "ws"
    }

    async fn check(&self) -> CheckResult {
        match tokio::time::timeout(self.timeout, self.probe.probe()).await {
            Ok(Ok(())) => CheckResult::healthy(self.name()),
            Ok(Err(err)) => CheckResult::unhealthy(self.name(), &err.to_string()),
            Err(_) => CheckResult::unhealthy(self.name(), "WebSocket listener probe timed out"),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;

    struct MockProbe {
        responder: std::sync::Arc<dyn Fn() -> ProbeFuture + Send + Sync>,
    }

    impl MockProbe {
        fn new<F>(f: F) -> Self
        where
            F: Fn() -> ProbeFuture + Send + Sync + 'static,
        {
            Self {
                responder: std::sync::Arc::new(f),
            }
        }
    }

    impl WsHealthProbe for MockProbe {
        fn probe(&self) -> ProbeFuture {
            (self.responder)()
        }
    }

    #[tokio::test]
    async fn ws_health_check_healthy_when_probe_succeeds() {
        let probe = Box::new(MockProbe::new(|| Box::pin(async { Ok(()) })));
        let check = WsHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
        let result = check.check().await;
        assert_eq!(result.name, "ws");
        assert_eq!(result.status, HealthStatus::Healthy);
        assert!(result.message.is_none());
    }

    #[tokio::test]
    async fn ws_health_check_unhealthy_when_probe_fails() {
        let probe = Box::new(MockProbe::new(|| {
            Box::pin(async { Err(CamelError::ProcessorError("simulated ws error".to_string())) })
        }));
        let check = WsHealthCheck::with_probe_for_tests(probe, Duration::from_millis(50));
        let result = check.check().await;
        assert_eq!(result.name, "ws");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert!(
            result
                .message
                .as_deref()
                .is_some_and(|m| m.contains("simulated ws error"))
        );
    }

    #[tokio::test]
    async fn ws_health_check_unhealthy_when_probe_times_out() {
        let probe = Box::new(MockProbe::new(|| {
            Box::pin(async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                Ok(())
            })
        }));
        let check = WsHealthCheck::with_probe_for_tests(probe, Duration::from_millis(5));
        let result = check.check().await;
        assert_eq!(result.name, "ws");
        assert_eq!(result.status, HealthStatus::Unhealthy);
        assert_eq!(
            result.message.as_deref(),
            Some("WebSocket listener probe timed out")
        );
    }
}