camel-core 0.5.7

Core engine for rust-camel
Documentation
use std::time::{Duration, Instant};

use camel_api::CamelError;
use tracing::{debug, info, warn};

use crate::context::RuntimeExecutionHandle;

const DRAIN_POLL_INTERVAL_MS: u64 = 50;

pub(crate) enum DrainResult {
    Drained,
    Timeout,
}

pub(crate) async fn drain_route(
    route_id: &str,
    action: &str,
    controller: &RuntimeExecutionHandle,
    timeout: Duration,
) -> DrainResult {
    let timeout_ms = timeout.as_millis() as u64;

    let initial = match controller.in_flight_count(route_id).await {
        Ok(0) => {
            debug!(
                route_id = %route_id,
                action = %action,
                "hot-reload: no in-flight exchanges, proceeding"
            );
            return DrainResult::Drained;
        }
        Ok(n) => n,
        Err(CamelError::RouteError(_)) => {
            debug!(
                route_id = %route_id,
                action = %action,
                "hot-reload: route not found during drain, skipping"
            );
            return DrainResult::Drained;
        }
        Err(e) => {
            debug!(
                route_id = %route_id,
                action = %action,
                error = %e,
                "hot-reload: error querying in-flight count, skipping drain"
            );
            return DrainResult::Drained;
        }
    };

    info!(
        route_id = %route_id,
        action = %action,
        in_flight = initial,
        "hot-reload: consumer stopped, draining pipeline"
    );

    let start = Instant::now();

    loop {
        tokio::time::sleep(Duration::from_millis(DRAIN_POLL_INTERVAL_MS)).await;

        match controller.in_flight_count(route_id).await {
            Ok(0) => {
                let waited = start.elapsed().as_millis() as u64;
                info!(
                    route_id = %route_id,
                    action = %action,
                    waited_ms = waited,
                    in_flight = 0,
                    "hot-reload: route drained"
                );
                return DrainResult::Drained;
            }
            Ok(n) => {
                if start.elapsed() >= timeout {
                    let waited = start.elapsed().as_millis() as u64;
                    warn!(
                        route_id = %route_id,
                        action = %action,
                        waited_ms = waited,
                        timeout_ms = timeout_ms,
                        remaining = n,
                        "hot-reload: drain timeout expired, forcing stop"
                    );
                    return DrainResult::Timeout;
                }
            }
            Err(_) => {
                debug!(
                    route_id = %route_id,
                    action = %action,
                    "hot-reload: route not found during drain, skipping"
                );
                return DrainResult::Drained;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn drain_result_variants_exist() {
        let _ = DrainResult::Drained;
        let _ = DrainResult::Timeout;
    }

    #[test]
    fn poll_interval_is_50ms() {
        assert_eq!(DRAIN_POLL_INTERVAL_MS, 50);
    }

    #[tokio::test]
    #[ignore = "RuntimeExecutionHandle::new_for_test is not available; covered by integration tests in Task 6"]
    async fn drain_returns_immediately_when_zero_in_flight() {
        panic!("requires RuntimeExecutionHandle::new_for_test");
    }

    #[tokio::test]
    #[ignore = "RuntimeExecutionHandle::new_for_test is not available; covered by integration tests in Task 6"]
    async fn drain_returns_drained_for_nonexistent_route_regardless_of_timeout() {
        panic!("requires RuntimeExecutionHandle::new_for_test");
    }
}