Skip to main content

camel_api/
step_lifecycle.rs

1use crate::CamelError;
2use async_trait::async_trait;
3
/// The cause of a stateful pipeline step's shutdown.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StepShutdownReason {
    /// Shutdown triggered because the route is stopping (`stop_route`).
    RouteStop,
    /// Shutdown triggered because hot reload is replacing the pipeline
    /// (Restart path).
    HotSwap,
}
12
13/// Lifecycle hook for **stateful** pipeline steps that own background work
14/// (timers, buckets, gap-detectors, queues) beyond a single `process()` call.
15///
16/// Stateless processors do NOT implement this trait. The runtime collects
17/// `Arc<dyn StepLifecycle>` at compile time and drains them in route order
18/// during `stop_route` and hot-swap. See ADR-0022.
19///
20/// **Why `&self`, not `&mut self`?** `Lifecycle` uses `&mut self` for exclusive
21/// start/stop of services. `StepLifecycle` is dispatched through
22/// `Arc<dyn StepLifecycle>` carried inside `ArcSwap` pipeline snapshots, so it
23/// MUST be `&self` (shared-reference, interior-mutability) for `Arc` cloning and
24/// concurrent snapshots to work. See ADR-0022.
25///
26/// `shutdown` MUST be idempotent. By the time it is called, intake is cancelled
27/// and the pipeline task has been joined, so no `process()` is in flight.
28/// `Err` is best-effort: the runtime logs and continues (it does NOT fail
29/// `stop_route`), mirroring `CamelContext::stop` service handling.
30#[async_trait]
31pub trait StepLifecycle: std::fmt::Debug + Send + Sync + 'static {
32    /// Stable name for logging/diagnostics.
33    fn name(&self) -> &'static str;
34
35    async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError>;
36}
37
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Test double that records each shutdown reason it receives, in call order.
    #[derive(Debug)]
    struct FakeStep {
        shutdowns: Mutex<Vec<StepShutdownReason>>,
    }

    #[async_trait]
    impl StepLifecycle for FakeStep {
        fn name(&self) -> &'static str {
            "fake"
        }

        async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError> {
            // Scope the guard so the lock is released before returning.
            {
                let mut recorded = self.shutdowns.lock().unwrap();
                recorded.push(reason);
            }
            Ok(())
        }
    }

    /// Calls made through `Arc<dyn StepLifecycle>` (the shape stored in ArcSwap
    /// snapshots) must reach the concrete implementation.
    #[tokio::test]
    async fn dyn_dispatch_works() {
        let expected = [StepShutdownReason::RouteStop, StepShutdownReason::HotSwap];

        // The concrete handle is kept for the assertion; the trait-object
        // handle is the one actually used for dispatch.
        let concrete = Arc::new(FakeStep {
            shutdowns: Mutex::new(Vec::new()),
        });
        let erased: Arc<dyn StepLifecycle> = concrete.clone();

        for reason in expected {
            erased.shutdown(reason).await.unwrap();
        }

        let seen = concrete.shutdowns.lock().unwrap();
        assert_eq!(seen.as_slice(), expected.as_slice());
    }

    /// A failing `shutdown` hands its error back to the caller as an `Err`.
    #[tokio::test]
    async fn shutdown_err_is_not_fatal() {
        /// Step whose shutdown always fails.
        #[derive(Debug)]
        struct FailingStep;

        #[async_trait]
        impl StepLifecycle for FailingStep {
            fn name(&self) -> &'static str {
                "fail"
            }

            async fn shutdown(&self, _reason: StepShutdownReason) -> Result<(), CamelError> {
                Err(CamelError::ProcessorError("boom".into()))
            }
        }

        let failing: Arc<dyn StepLifecycle> = Arc::new(FailingStep);
        let outcome = failing.shutdown(StepShutdownReason::RouteStop).await;
        assert!(matches!(outcome, Err(_)));
        // The typical drain loop keeps going after such an error (log + skip);
        // see Task 5. Only the error surfacing is pinned here.
    }
}