Skip to main content

camel_core/
reload.rs

1//! Route reload coordinator.
2//!
3//! Compares a new set of route definitions against the currently running routes
4//! and computes the minimal set of actions: SWAP, RESTART, ADD, or REMOVE.
5//!
6//! # Why no Skip action?
7//!
8//! We cannot reliably detect when a route is truly unchanged because:
9//! 1. `BoxProcessor` (the pipeline) is type-erased and cannot be compared for equality
10//! 2. Partial comparison (only metadata) risks false negatives—silently ignoring changes
11//!
12//! Since `Swap` is an atomic pointer swap via ArcSwap (nanoseconds), the cost of
13//! "unnecessary" swaps is negligible. Simplicity and correctness outweigh the
14//! theoretical benefit of Skip.
15
16use crate::route::RouteDefinition;
17use crate::route_controller::DefaultRouteController;
18
/// Actions the coordinator can take per route.
///
/// Each variant carries only the ID of the route it applies to. `Eq` and
/// `Hash` are derived (every field is a `String`) so that actions can be
/// compared exactly and stored in hashed collections such as `HashSet`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadAction {
    /// Pipeline may have changed — atomic swap (zero-downtime).
    ///
    /// This action is taken when the route exists and `from_uri` is unchanged.
    /// Even if the pipeline is identical, swapping is harmless (atomic pointer swap).
    Swap { route_id: String },
    /// Consumer (`from_uri`) changed — must stop and restart.
    Restart { route_id: String },
    /// New route — add and start.
    Add { route_id: String },
    /// Route removed from config — stop and delete.
    Remove { route_id: String },
}
34
35/// Compute the diff between new definitions and active routes.
36pub fn compute_reload_actions(
37    new_definitions: &[RouteDefinition],
38    controller: &DefaultRouteController,
39) -> Vec<ReloadAction> {
40    let active_ids: std::collections::HashSet<String> =
41        controller.route_ids().into_iter().collect();
42    let mut new_ids = std::collections::HashSet::new();
43    let mut actions = Vec::new();
44
45    for def in new_definitions {
46        let route_id = def.route_id().to_string();
47        new_ids.insert(route_id.clone());
48
49        if active_ids.contains(&route_id) {
50            // Route exists — check what changed
51            if let Some(from_uri) = controller.route_from_uri(&route_id) {
52                if from_uri != def.from_uri() {
53                    actions.push(ReloadAction::Restart { route_id });
54                } else {
55                    // from_uri same — assume steps changed (we can't cheaply diff BoxProcessor)
56                    actions.push(ReloadAction::Swap { route_id });
57                }
58            }
59        } else {
60            actions.push(ReloadAction::Add { route_id });
61        }
62    }
63
64    // Routes in active but not in new definitions → remove
65    for id in &active_ids {
66        if !new_ids.contains(id) {
67            actions.push(ReloadAction::Remove {
68                route_id: id.clone(),
69            });
70        }
71    }
72
73    actions
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::Registry;
    use camel_api::RouteController;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Builds a fresh, empty registry handle for a controller.
    fn empty_registry() -> Arc<std::sync::Mutex<Registry>> {
        Arc::new(std::sync::Mutex::new(Registry::new()))
    }

    /// Builds a step-less route definition with the given consumer URI and ID.
    fn route(from_uri: &str, id: &str) -> RouteDefinition {
        RouteDefinition::new(from_uri, vec![]).with_route_id(id)
    }

    /// Builds a controller with no routes whose self-reference points at a
    /// separate, equally empty controller instance.
    fn make_controller() -> DefaultRouteController {
        let mut controller = DefaultRouteController::new(empty_registry());
        let self_ref: Arc<Mutex<dyn RouteController>> =
            Arc::new(Mutex::new(DefaultRouteController::new(empty_registry())));
        controller.set_self_ref(self_ref);
        controller
    }

    /// A definition whose ID is unknown to the controller yields `Add`.
    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let incoming = vec![route("timer:tick", "new-route")];

        let expected = vec![ReloadAction::Add {
            route_id: "new-route".into(),
        }];
        assert_eq!(compute_reload_actions(&incoming, &controller), expected);
    }

    /// An active route missing from the new definitions yields `Remove`.
    #[test]
    fn test_removed_route_detected() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "old-route")).unwrap();

        let expected = vec![ReloadAction::Remove {
            route_id: "old-route".into(),
        }];
        assert_eq!(compute_reload_actions(&[], &controller), expected);
    }

    /// An active route redefined with the same `from_uri` yields `Swap`.
    #[test]
    fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "my-route")).unwrap();

        let incoming = vec![route("timer:tick", "my-route")];
        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&incoming, &controller), expected);
    }

    /// An active route redefined with a different `from_uri` yields `Restart`.
    #[test]
    fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "my-route")).unwrap();

        let incoming = vec![route("timer:tock?period=500", "my-route")];
        let expected = vec![ReloadAction::Restart {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&incoming, &controller), expected);
    }
}
154}