camel_api/step_lifecycle.rs
1use crate::CamelError;
2use async_trait::async_trait;
3
4/// Why a stateful pipeline step is being shut down.
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub enum StepShutdownReason {
7 /// The route is stopping (`stop_route`).
8 RouteStop,
9 /// The pipeline is being replaced via hot reload (Restart path).
10 HotSwap,
11}
12
13/// Lifecycle hook for **stateful** pipeline steps that own background work
14/// (timers, buckets, gap-detectors, queues) beyond a single `process()` call.
15///
16/// Stateless processors do NOT implement this trait. The runtime collects
17/// `Arc<dyn StepLifecycle>` at compile time and drains them in route order
18/// during `stop_route` and hot-swap. See ADR-0022.
19///
20/// **Why `&self`, not `&mut self`?** `Lifecycle` uses `&mut self` for exclusive
21/// start/stop of services. `StepLifecycle` is dispatched through
22/// `Arc<dyn StepLifecycle>` carried inside `ArcSwap` pipeline snapshots, so it
23/// MUST be `&self` (shared-reference, interior-mutability) for `Arc` cloning and
24/// concurrent snapshots to work. See ADR-0022.
25///
26/// `shutdown` MUST be idempotent. By the time it is called, intake is cancelled
27/// and the pipeline task has been joined, so no `process()` is in flight.
28/// `Err` is best-effort: the runtime logs and continues (it does NOT fail
29/// `stop_route`), mirroring `CamelContext::stop` service handling.
30#[async_trait]
31pub trait StepLifecycle: std::fmt::Debug + Send + Sync + 'static {
32 /// Stable name for logging/diagnostics.
33 fn name(&self) -> &'static str;
34
35 async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError>;
36}
37
38#[cfg(test)]
39mod tests {
40 use super::*;
41 use std::sync::{Arc, Mutex};
42
43 #[derive(Debug)]
44 struct FakeStep {
45 shutdowns: Mutex<Vec<StepShutdownReason>>,
46 }
47
48 #[async_trait]
49 impl StepLifecycle for FakeStep {
50 fn name(&self) -> &'static str {
51 "fake"
52 }
53 async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError> {
54 self.shutdowns.lock().unwrap().push(reason);
55 Ok(())
56 }
57 }
58
59 #[tokio::test]
60 async fn dyn_dispatch_works() {
61 // Keep concrete handle for assertion, dispatch through Arc<dyn StepLifecycle>
62 // (the ArcSwap-snapshot shape).
63 let inner = Arc::new(FakeStep {
64 shutdowns: Mutex::new(vec![]),
65 });
66 let step: Arc<dyn StepLifecycle> = inner.clone();
67 step.shutdown(StepShutdownReason::RouteStop).await.unwrap();
68 step.shutdown(StepShutdownReason::HotSwap).await.unwrap();
69 assert_eq!(
70 *inner.shutdowns.lock().unwrap(),
71 vec![StepShutdownReason::RouteStop, StepShutdownReason::HotSwap,]
72 );
73 }
74
75 #[tokio::test]
76 async fn shutdown_err_is_not_fatal() {
77 #[derive(Debug)]
78 struct FailingStep;
79 #[async_trait]
80 impl StepLifecycle for FailingStep {
81 fn name(&self) -> &'static str {
82 "fail"
83 }
84 async fn shutdown(&self, _: StepShutdownReason) -> Result<(), CamelError> {
85 Err(CamelError::ProcessorError("boom".into()))
86 }
87 }
88 let step: Arc<dyn StepLifecycle> = Arc::new(FailingStep);
89 let result = step.shutdown(StepShutdownReason::RouteStop).await;
90 assert!(result.is_err());
91 // But typical drain loop continues (log + skip), see Task 5.
92 }
93}