1use std::sync::Arc;
17
18use camel_api::CamelError;
19use tokio::sync::Mutex;
20
21use crate::route::RouteDefinition;
22use crate::route_controller::RouteControllerInternal;
23
24#[derive(Debug, Clone, PartialEq)]
26pub enum ReloadAction {
27 Swap { route_id: String },
32 Restart { route_id: String },
34 Add { route_id: String },
36 Remove { route_id: String },
38}
39
40#[derive(Debug)]
44pub struct ReloadError {
45 pub route_id: String,
46 pub action: String,
47 pub error: CamelError,
48}
49
50pub fn compute_reload_actions(
52 new_definitions: &[RouteDefinition],
53 controller: &dyn RouteControllerInternal,
54) -> Vec<ReloadAction> {
55 let active_ids: std::collections::HashSet<String> =
56 controller.route_ids().into_iter().collect();
57 let mut new_ids = std::collections::HashSet::new();
58 let mut actions = Vec::new();
59
60 for def in new_definitions {
61 let route_id = def.route_id().to_string();
62 new_ids.insert(route_id.clone());
63
64 if active_ids.contains(&route_id) {
65 if let Some(from_uri) = controller.route_from_uri(&route_id) {
67 if from_uri != def.from_uri() {
68 actions.push(ReloadAction::Restart { route_id });
69 } else {
70 actions.push(ReloadAction::Swap { route_id });
72 }
73 }
74 } else {
75 actions.push(ReloadAction::Add { route_id });
76 }
77 }
78
79 for id in &active_ids {
81 if !new_ids.contains(id) {
82 actions.push(ReloadAction::Remove {
83 route_id: id.clone(),
84 });
85 }
86 }
87
88 actions
89}
90
91pub async fn execute_reload_actions(
98 actions: Vec<ReloadAction>,
99 mut new_definitions: Vec<RouteDefinition>,
100 controller: &Arc<Mutex<dyn RouteControllerInternal>>,
101) -> Vec<ReloadError> {
102 let mut errors = Vec::new();
103
104 for action in actions {
105 match action {
106 ReloadAction::Swap { route_id } => {
107 let def_pos = new_definitions
109 .iter()
110 .position(|d| d.route_id() == route_id);
111 let def = match def_pos {
112 Some(pos) => new_definitions.remove(pos),
113 None => {
114 errors.push(ReloadError {
115 route_id: route_id.clone(),
116 action: "Swap".into(),
117 error: CamelError::RouteError(format!(
118 "No definition found for route '{}'",
119 route_id
120 )),
121 });
122 continue;
123 }
124 };
125
126 let pipeline = controller.lock().await.compile_route_definition(def);
128 match pipeline {
129 Ok(p) => {
130 let result = controller.lock().await.swap_pipeline(&route_id, p);
131 if let Err(e) = result {
132 errors.push(ReloadError {
133 route_id,
134 action: "Swap".into(),
135 error: e,
136 });
137 } else {
138 tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
139 }
140 }
141 Err(e) => {
142 errors.push(ReloadError {
143 route_id,
144 action: "Swap (compile)".into(),
145 error: e,
146 });
147 }
148 }
149 }
150
151 ReloadAction::Add { route_id } => {
152 let def_pos = new_definitions
153 .iter()
154 .position(|d| d.route_id() == route_id);
155 let def = match def_pos {
156 Some(pos) => new_definitions.remove(pos),
157 None => {
158 errors.push(ReloadError {
159 route_id: route_id.clone(),
160 action: "Add".into(),
161 error: CamelError::RouteError(format!(
162 "No definition found for route '{}'",
163 route_id
164 )),
165 });
166 continue;
167 }
168 };
169
170 let add_result = controller.lock().await.add_route(def);
171 match add_result {
172 Ok(()) => {
173 let start_result =
175 controller.lock().await.start_route_reload(&route_id).await;
176 if let Err(e) = start_result {
177 errors.push(ReloadError {
178 route_id,
179 action: "Add (start)".into(),
180 error: e,
181 });
182 } else {
183 tracing::info!(route_id = %route_id, "hot-reload: added and started route");
184 }
185 }
186 Err(e) => {
187 errors.push(ReloadError {
188 route_id,
189 action: "Add".into(),
190 error: e,
191 });
192 }
193 }
194 }
195
196 ReloadAction::Remove { route_id } => {
197 let stop_result = controller.lock().await.stop_route_reload(&route_id).await;
199 if let Err(e) = stop_result {
200 errors.push(ReloadError {
201 route_id: route_id.clone(),
202 action: "Remove (stop)".into(),
203 error: e,
204 });
205 continue;
206 }
207
208 let remove_result = controller.lock().await.remove_route(&route_id);
209 match remove_result {
210 Ok(()) => {
211 tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
212 }
213 Err(e) => {
214 errors.push(ReloadError {
215 route_id,
216 action: "Remove".into(),
217 error: e,
218 });
219 }
220 }
221 }
222
223 ReloadAction::Restart { route_id } => {
224 tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");
225
226 let def_pos = new_definitions
227 .iter()
228 .position(|d| d.route_id() == route_id);
229 let def = match def_pos {
230 Some(pos) => new_definitions.remove(pos),
231 None => {
232 errors.push(ReloadError {
233 route_id: route_id.clone(),
234 action: "Restart".into(),
235 error: CamelError::RouteError(format!(
236 "No definition found for route '{}'",
237 route_id
238 )),
239 });
240 continue;
241 }
242 };
243
244 let stop_result = controller.lock().await.stop_route_reload(&route_id).await;
246 if let Err(e) = stop_result {
247 errors.push(ReloadError {
248 route_id,
249 action: "Restart (stop)".into(),
250 error: e,
251 });
252 continue;
253 }
254
255 if let Err(e) = controller.lock().await.remove_route(&route_id) {
256 errors.push(ReloadError {
257 route_id,
258 action: "Restart (remove)".into(),
259 error: e,
260 });
261 continue;
262 }
263
264 if let Err(e) = controller.lock().await.add_route(def) {
265 errors.push(ReloadError {
266 route_id,
267 action: "Restart (add)".into(),
268 error: e,
269 });
270 continue;
271 }
272
273 let start_result = controller.lock().await.start_route_reload(&route_id).await;
274 if let Err(e) = start_result {
275 errors.push(ReloadError {
276 route_id,
277 action: "Restart (start)".into(),
278 error: e,
279 });
280 } else {
281 tracing::info!(route_id = %route_id, "hot-reload: route restarted successfully");
282 }
283 }
284 }
285 }
286
287 errors
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293 use crate::registry::Registry;
294 use crate::route_controller::DefaultRouteController;
295 use camel_api::RouteController;
296 use std::sync::Arc;
297
298 fn make_controller() -> DefaultRouteController {
299 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
300 let mut controller = DefaultRouteController::new(registry);
301 let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
302 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
303 ));
304 controller.set_self_ref(controller_arc);
305 controller
306 }
307
308 #[test]
309 fn test_new_route_detected_as_add() {
310 let controller = make_controller();
311 let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];
312 let actions = compute_reload_actions(&defs, &controller);
313 assert_eq!(
314 actions,
315 vec![ReloadAction::Add {
316 route_id: "new-route".into()
317 }]
318 );
319 }
320
321 #[test]
322 fn test_removed_route_detected() {
323 let mut controller = make_controller();
324 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route");
325 controller.add_route(def).unwrap();
326
327 let actions = compute_reload_actions(&[], &controller);
328 assert_eq!(
329 actions,
330 vec![ReloadAction::Remove {
331 route_id: "old-route".into()
332 }]
333 );
334 }
335
336 #[test]
337 fn test_same_from_uri_detected_as_swap() {
338 let mut controller = make_controller();
339 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
340 controller.add_route(def).unwrap();
341
342 let new_defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route")];
343 let actions = compute_reload_actions(&new_defs, &controller);
344 assert_eq!(
345 actions,
346 vec![ReloadAction::Swap {
347 route_id: "my-route".into()
348 }]
349 );
350 }
351
352 #[test]
353 fn test_changed_from_uri_detected_as_restart() {
354 let mut controller = make_controller();
355 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
356 controller.add_route(def).unwrap();
357
358 let new_defs =
359 vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
360 let actions = compute_reload_actions(&new_defs, &controller);
361 assert_eq!(
362 actions,
363 vec![ReloadAction::Restart {
364 route_id: "my-route".into()
365 }]
366 );
367 }
368
369 #[tokio::test]
373 async fn test_execute_add_action_inserts_route() {
374 use crate::CamelContext;
375 use camel_component_timer::TimerComponent;
376
377 let mut ctx = CamelContext::new();
378 ctx.register_component(TimerComponent::new());
379 ctx.start().await.unwrap();
380
381 let def = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
382 .with_route_id("exec-add-test");
383 let actions = vec![ReloadAction::Add {
384 route_id: "exec-add-test".into(),
385 }];
386 let errors = execute_reload_actions(actions, vec![def], ctx.route_controller()).await;
387 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
388
389 assert_eq!(ctx.route_controller().lock().await.route_count(), 1);
390
391 ctx.stop().await.unwrap();
392 }
393
394 #[tokio::test]
395 async fn test_execute_remove_action_deletes_route() {
396 use crate::CamelContext;
397 use camel_component_timer::TimerComponent;
398
399 let mut ctx = CamelContext::new();
400 ctx.register_component(TimerComponent::new());
401 ctx.start().await.unwrap();
402
403 let def =
405 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
406 ctx.route_controller().lock().await.add_route(def).unwrap();
407 ctx.route_controller()
408 .lock()
409 .await
410 .start_route_reload("exec-remove-test")
411 .await
412 .unwrap();
413 assert_eq!(ctx.route_controller().lock().await.route_count(), 1);
414
415 let actions = vec![ReloadAction::Remove {
416 route_id: "exec-remove-test".into(),
417 }];
418 let errors = execute_reload_actions(actions, vec![], ctx.route_controller()).await;
419 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
420
421 assert_eq!(ctx.route_controller().lock().await.route_count(), 0);
422
423 ctx.stop().await.unwrap();
424 }
425
426 #[tokio::test]
427 async fn test_execute_swap_action_replaces_pipeline() {
428 use crate::CamelContext;
429 use camel_component_timer::TimerComponent;
430
431 let mut ctx = CamelContext::new();
432 ctx.register_component(TimerComponent::new());
433 ctx.start().await.unwrap();
434
435 let def =
437 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
438 ctx.route_controller().lock().await.add_route(def).unwrap();
439
440 let new_def =
442 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
443 let actions = vec![ReloadAction::Swap {
444 route_id: "exec-swap-test".into(),
445 }];
446 let errors = execute_reload_actions(actions, vec![new_def], ctx.route_controller()).await;
447 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
448
449 assert_eq!(ctx.route_controller().lock().await.route_count(), 1);
451
452 ctx.stop().await.unwrap();
453 }
454}