Skip to main content

camel_core/
reload.rs

//! Route reload coordinator.
//!
//! Compares a new set of route definitions against the currently running routes
//! and computes the minimal set of actions: SWAP, RESTART, ADD, or REMOVE.
//!
//! # Why no Skip action?
//!
//! We cannot reliably detect when a route is truly unchanged because:
//! 1. `BoxProcessor` (the pipeline) is type-erased and cannot be compared for equality
//! 2. Partial comparison (only metadata) risks false negatives—silently ignoring changes
//!
//! Since `Swap` is an atomic pointer swap via ArcSwap (nanoseconds), the cost of
//! "unnecessary" swaps is negligible. Simplicity and correctness outweigh the
//! theoretical benefit of Skip.
15
16use std::sync::Arc;
17
18use camel_api::CamelError;
19use tokio::sync::Mutex;
20
21use crate::route::RouteDefinition;
22use crate::route_controller::RouteControllerInternal;
23
/// Actions the coordinator can take per route.
///
/// Every variant carries the id of the route it applies to. The type is
/// `Eq + Hash` so actions can be deduplicated or collected into sets.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReloadAction {
    /// Pipeline may have changed — atomic swap (zero-downtime).
    ///
    /// This action is taken when the route exists and `from_uri` is unchanged.
    /// Even if the pipeline is identical, swapping is harmless (atomic pointer swap).
    Swap { route_id: String },
    /// Consumer (from_uri) changed — must stop and restart.
    Restart { route_id: String },
    /// New route — add and start.
    Add { route_id: String },
    /// Route removed from config — stop and delete.
    Remove { route_id: String },
}
39
40/// A non-fatal error during reload action execution.
41///
42/// The watcher logs these and continues watching for future changes.
43#[derive(Debug)]
44pub struct ReloadError {
45    pub route_id: String,
46    pub action: String,
47    pub error: CamelError,
48}
49
50/// Compute the diff between new definitions and active routes.
51pub fn compute_reload_actions(
52    new_definitions: &[RouteDefinition],
53    controller: &dyn RouteControllerInternal,
54) -> Vec<ReloadAction> {
55    let active_ids: std::collections::HashSet<String> =
56        controller.route_ids().into_iter().collect();
57    let mut new_ids = std::collections::HashSet::new();
58    let mut actions = Vec::new();
59
60    for def in new_definitions {
61        let route_id = def.route_id().to_string();
62        new_ids.insert(route_id.clone());
63
64        if active_ids.contains(&route_id) {
65            // Route exists — check what changed
66            if let Some(from_uri) = controller.route_from_uri(&route_id) {
67                if from_uri != def.from_uri() {
68                    actions.push(ReloadAction::Restart { route_id });
69                } else {
70                    // from_uri same — assume steps changed (we can't cheaply diff BoxProcessor)
71                    actions.push(ReloadAction::Swap { route_id });
72                }
73            }
74        } else {
75            actions.push(ReloadAction::Add { route_id });
76        }
77    }
78
79    // Routes in active but not in new definitions → remove
80    for id in &active_ids {
81        if !new_ids.contains(id) {
82            actions.push(ReloadAction::Remove {
83                route_id: id.clone(),
84            });
85        }
86    }
87
88    actions
89}
90
91/// Execute a list of reload actions against a live controller.
92///
93/// Non-fatal: errors for individual routes are collected and returned.
94/// The caller should log them as warnings and continue watching.
95///
96/// `new_definitions` is consumed — each definition is moved to the controller for Add/Swap/Restart.
97pub async fn execute_reload_actions(
98    actions: Vec<ReloadAction>,
99    mut new_definitions: Vec<RouteDefinition>,
100    controller: &Arc<Mutex<dyn RouteControllerInternal>>,
101) -> Vec<ReloadError> {
102    let mut errors = Vec::new();
103
104    for action in actions {
105        match action {
106            ReloadAction::Swap { route_id } => {
107                // Find and remove the matching definition by route_id
108                let def_pos = new_definitions
109                    .iter()
110                    .position(|d| d.route_id() == route_id);
111                let def = match def_pos {
112                    Some(pos) => new_definitions.remove(pos),
113                    None => {
114                        errors.push(ReloadError {
115                            route_id: route_id.clone(),
116                            action: "Swap".into(),
117                            error: CamelError::RouteError(format!(
118                                "No definition found for route '{}'",
119                                route_id
120                            )),
121                        });
122                        continue;
123                    }
124                };
125
126                // Compile new pipeline then swap — two separate lock acquisitions
127                let pipeline = controller.lock().await.compile_route_definition(def);
128                match pipeline {
129                    Ok(p) => {
130                        let result = controller.lock().await.swap_pipeline(&route_id, p);
131                        if let Err(e) = result {
132                            errors.push(ReloadError {
133                                route_id,
134                                action: "Swap".into(),
135                                error: e,
136                            });
137                        } else {
138                            tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
139                        }
140                    }
141                    Err(e) => {
142                        errors.push(ReloadError {
143                            route_id,
144                            action: "Swap (compile)".into(),
145                            error: e,
146                        });
147                    }
148                }
149            }
150
151            ReloadAction::Add { route_id } => {
152                let def_pos = new_definitions
153                    .iter()
154                    .position(|d| d.route_id() == route_id);
155                let def = match def_pos {
156                    Some(pos) => new_definitions.remove(pos),
157                    None => {
158                        errors.push(ReloadError {
159                            route_id: route_id.clone(),
160                            action: "Add".into(),
161                            error: CamelError::RouteError(format!(
162                                "No definition found for route '{}'",
163                                route_id
164                            )),
165                        });
166                        continue;
167                    }
168                };
169
170                let add_result = controller.lock().await.add_route(def);
171                match add_result {
172                    Ok(()) => {
173                        // Start the new route — lock held across await via tokio::sync::Mutex
174                        let start_result =
175                            controller.lock().await.start_route_reload(&route_id).await;
176                        if let Err(e) = start_result {
177                            errors.push(ReloadError {
178                                route_id,
179                                action: "Add (start)".into(),
180                                error: e,
181                            });
182                        } else {
183                            tracing::info!(route_id = %route_id, "hot-reload: added and started route");
184                        }
185                    }
186                    Err(e) => {
187                        errors.push(ReloadError {
188                            route_id,
189                            action: "Add".into(),
190                            error: e,
191                        });
192                    }
193                }
194            }
195
196            ReloadAction::Remove { route_id } => {
197                // Stop first, then remove
198                let stop_result = controller.lock().await.stop_route_reload(&route_id).await;
199                if let Err(e) = stop_result {
200                    errors.push(ReloadError {
201                        route_id: route_id.clone(),
202                        action: "Remove (stop)".into(),
203                        error: e,
204                    });
205                    continue;
206                }
207
208                let remove_result = controller.lock().await.remove_route(&route_id);
209                match remove_result {
210                    Ok(()) => {
211                        tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
212                    }
213                    Err(e) => {
214                        errors.push(ReloadError {
215                            route_id,
216                            action: "Remove".into(),
217                            error: e,
218                        });
219                    }
220                }
221            }
222
223            ReloadAction::Restart { route_id } => {
224                tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");
225
226                let def_pos = new_definitions
227                    .iter()
228                    .position(|d| d.route_id() == route_id);
229                let def = match def_pos {
230                    Some(pos) => new_definitions.remove(pos),
231                    None => {
232                        errors.push(ReloadError {
233                            route_id: route_id.clone(),
234                            action: "Restart".into(),
235                            error: CamelError::RouteError(format!(
236                                "No definition found for route '{}'",
237                                route_id
238                            )),
239                        });
240                        continue;
241                    }
242                };
243
244                // Stop → remove → add → start
245                let stop_result = controller.lock().await.stop_route_reload(&route_id).await;
246                if let Err(e) = stop_result {
247                    errors.push(ReloadError {
248                        route_id,
249                        action: "Restart (stop)".into(),
250                        error: e,
251                    });
252                    continue;
253                }
254
255                if let Err(e) = controller.lock().await.remove_route(&route_id) {
256                    errors.push(ReloadError {
257                        route_id,
258                        action: "Restart (remove)".into(),
259                        error: e,
260                    });
261                    continue;
262                }
263
264                if let Err(e) = controller.lock().await.add_route(def) {
265                    errors.push(ReloadError {
266                        route_id,
267                        action: "Restart (add)".into(),
268                        error: e,
269                    });
270                    continue;
271                }
272
273                let start_result = controller.lock().await.start_route_reload(&route_id).await;
274                if let Err(e) = start_result {
275                    errors.push(ReloadError {
276                        route_id,
277                        action: "Restart (start)".into(),
278                        error: e,
279                    });
280                } else {
281                    tracing::info!(route_id = %route_id, "hot-reload: route restarted successfully");
282                }
283            }
284        }
285    }
286
287    errors
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::Registry;
    use crate::route_controller::DefaultRouteController;
    use crate::CamelContext;
    use camel_api::RouteController;
    use camel_component_timer::TimerComponent;
    use std::sync::Arc;

    /// Shorthand for a step-less route definition with an explicit id.
    fn route(from_uri: &str, route_id: &str) -> RouteDefinition {
        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
    }

    /// A fresh, empty registry behind the std mutex the controller expects.
    fn empty_registry() -> Arc<std::sync::Mutex<Registry>> {
        Arc::new(std::sync::Mutex::new(Registry::new()))
    }

    /// Controller backed by an empty registry, with a self-reference installed.
    fn make_controller() -> DefaultRouteController {
        let mut controller = DefaultRouteController::new(empty_registry());
        let self_ref: Arc<Mutex<dyn RouteController>> =
            Arc::new(Mutex::new(DefaultRouteController::new(empty_registry())));
        controller.set_self_ref(self_ref);
        controller
    }

    // ---- compute_reload_actions tests ----

    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let defs = vec![route("timer:tick", "new-route")];

        let actions = compute_reload_actions(&defs, &controller);

        let expected = vec![ReloadAction::Add {
            route_id: "new-route".into(),
        }];
        assert_eq!(actions, expected);
    }

    #[test]
    fn test_removed_route_detected() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "old-route")).unwrap();

        let actions = compute_reload_actions(&[], &controller);

        let expected = vec![ReloadAction::Remove {
            route_id: "old-route".into(),
        }];
        assert_eq!(actions, expected);
    }

    #[test]
    fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "my-route")).unwrap();

        let new_defs = vec![route("timer:tick", "my-route")];
        let actions = compute_reload_actions(&new_defs, &controller);

        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(actions, expected);
    }

    #[test]
    fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        controller.add_route(route("timer:tick", "my-route")).unwrap();

        let new_defs = vec![route("timer:tock?period=500", "my-route")];
        let actions = compute_reload_actions(&new_defs, &controller);

        let expected = vec![ReloadAction::Restart {
            route_id: "my-route".into(),
        }];
        assert_eq!(actions, expected);
    }

    // ---- execute_reload_actions tests ----
    // These use a full CamelContext with real components so that start/stop work.

    #[tokio::test]
    async fn test_execute_add_action_inserts_route() {
        const ROUTE_ID: &str = "exec-add-test";

        let mut ctx = CamelContext::new();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let definitions = vec![route("timer:tick?period=50&repeatCount=1", ROUTE_ID)];
        let actions = vec![ReloadAction::Add {
            route_id: ROUTE_ID.into(),
        }];
        let errors = execute_reload_actions(actions, definitions, ctx.route_controller()).await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let route_count = ctx.route_controller().lock().await.route_count();
        assert_eq!(route_count, 1);

        ctx.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_execute_remove_action_deletes_route() {
        const ROUTE_ID: &str = "exec-remove-test";

        let mut ctx = CamelContext::new();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Register the route directly on the controller and start it, so there
        // is a running route for the Remove action to stop.
        let def = route("timer:tick?period=100", ROUTE_ID);
        ctx.route_controller().lock().await.add_route(def).unwrap();
        ctx.route_controller()
            .lock()
            .await
            .start_route_reload(ROUTE_ID)
            .await
            .unwrap();
        let count_before = ctx.route_controller().lock().await.route_count();
        assert_eq!(count_before, 1);

        let actions = vec![ReloadAction::Remove {
            route_id: ROUTE_ID.into(),
        }];
        let errors = execute_reload_actions(actions, vec![], ctx.route_controller()).await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let count_after = ctx.route_controller().lock().await.route_count();
        assert_eq!(count_after, 0);

        ctx.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_execute_swap_action_replaces_pipeline() {
        const ROUTE_ID: &str = "exec-swap-test";
        const FROM_URI: &str = "timer:tick?period=100";

        let mut ctx = CamelContext::new();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Register the route directly on the controller.
        let def = route(FROM_URI, ROUTE_ID);
        ctx.route_controller().lock().await.add_route(def).unwrap();

        // Same from_uri, so this exercises the compile + swap_pipeline path.
        let replacement = vec![route(FROM_URI, ROUTE_ID)];
        let actions = vec![ReloadAction::Swap {
            route_id: ROUTE_ID.into(),
        }];
        let errors = execute_reload_actions(actions, replacement, ctx.route_controller()).await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // The route must still be registered after the swap.
        let route_count = ctx.route_controller().lock().await.route_count();
        assert_eq!(route_count, 1);

        ctx.stop().await.unwrap();
    }
}
454}