1use std::time::Duration;
7
8use camel_api::CamelError;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::domain::ReloadAction;
12#[cfg(test)]
13use crate::lifecycle::adapters::route_controller::DefaultRouteController;
14use crate::lifecycle::application::route_definition::RouteDefinition;
15
16use super::reload_actions;
17
/// Function-related context that accompanies a reload.
///
/// `execute_reload_actions` receives this as an optional reference and forwards it unchanged to
/// the `reload_actions::apply_*` helpers; nothing in this file reads the fields directly.
pub struct FunctionReloadContext {
    /// Shared, dynamically dispatched function invoker.
    pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
    /// Generation number carried alongside the invoker.
    // NOTE(review): presumably identifies the reload cycle this context belongs to — confirm
    // against the `reload_actions` helpers that consume it.
    pub generation: u64,
}
22
/// A failure recorded while applying a single reload action.
///
/// `execute_reload_actions` returns one of these per failed action instead of aborting.
#[derive(Debug)]
pub struct ReloadError {
    /// Id of the route the failed action targeted.
    pub route_id: String,
    /// Label of the action that failed. The tests in this file observe `"Swap"`, `"Add"` and
    /// `"Restart"`, plus a label starting with `"Remove"` for removals.
    pub action: String,
    /// The underlying error.
    pub error: CamelError,
}
32
/// Test-only planner that diffs `new_definitions` against the routes held by `controller`.
///
/// Decision per definition:
/// - id not known to the controller -> `Add`
/// - id known, source URI differs -> `Restart`
/// - id known, same URI, both source hashes present and equal -> `Skip`
/// - id known, same URI, any other hash combination -> `Swap`
///
/// Every controller route id that no definition mentions gets a trailing `Remove`. Those
/// removals come out of a `HashSet`, so their relative order is unspecified.
#[cfg(test)]
fn compute_reload_actions(
    new_definitions: &[RouteDefinition],
    controller: &DefaultRouteController,
) -> Vec<ReloadAction> {
    use std::collections::HashSet;

    let running: HashSet<String> = controller.route_ids().into_iter().collect();
    let mut declared: HashSet<String> = HashSet::new();
    let mut plan = Vec::new();

    for definition in new_definitions {
        let route_id = definition.route_id().to_string();
        declared.insert(route_id.clone());

        // Unknown id: nothing to compare against, the route is simply new.
        if !running.contains(&route_id) {
            plan.push(ReloadAction::Add { route_id });
            continue;
        }

        // A known id whose source URI cannot be looked up produces no action at all.
        if let Some(current_uri) = controller.route_from_uri(&route_id) {
            let action = if current_uri != definition.from_uri() {
                ReloadAction::Restart { route_id }
            } else {
                // Hashes are only consulted once the URI is known to be unchanged.
                let unchanged = match (
                    controller.route_source_hash(&route_id),
                    definition.source_hash(),
                ) {
                    (Some(running_hash), Some(incoming_hash)) => running_hash == incoming_hash,
                    _ => false,
                };
                if unchanged {
                    ReloadAction::Skip { route_id }
                } else {
                    ReloadAction::Swap { route_id }
                }
            };
            plan.push(action);
        }
    }

    // Anything the controller still holds but the new definitions dropped must go.
    plan.extend(
        running
            .iter()
            .filter(|id| !declared.contains(*id))
            .map(|id| ReloadAction::Remove {
                route_id: id.clone(),
            }),
    );

    plan
}
82
83pub(crate) fn compute_reload_actions_from_runtime_snapshot(
88 new_definitions: &[RouteDefinition],
89 runtime_route_ids: &[String],
90 runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
91) -> Vec<ReloadAction> {
92 let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
93 let mut new_ids = std::collections::HashSet::new();
94 let mut actions = Vec::new();
95
96 for def in new_definitions {
97 let route_id = def.route_id().to_string();
98 new_ids.insert(route_id.clone());
99
100 if active_ids.contains(&route_id) {
101 let existing_hash = runtime_source_hash(&route_id);
102 let new_hash = def.source_hash();
103 match (existing_hash, new_hash) {
104 (Some(h_existing), Some(h_new)) if h_existing == h_new => {
105 actions.push(ReloadAction::Skip { route_id });
106 }
107 _ => {
108 actions.push(ReloadAction::Restart { route_id });
109 }
110 }
111 } else {
112 actions.push(ReloadAction::Add { route_id });
113 }
114 }
115
116 for id in &active_ids {
117 if !new_ids.contains(id) {
118 actions.push(ReloadAction::Remove {
119 route_id: id.clone(),
120 });
121 }
122 }
123
124 actions
125}
126
127pub async fn execute_reload_actions(
134 actions: Vec<ReloadAction>,
135 mut new_definitions: Vec<RouteDefinition>,
136 controller: &RuntimeExecutionHandle,
137 drain_timeout: Duration,
138 function_ctx: Option<&FunctionReloadContext>,
139) -> Vec<ReloadError> {
140 let mut errors = Vec::new();
141
142 for action in actions {
143 match action {
144 ReloadAction::Swap { route_id } => {
145 reload_actions::apply_swap(
146 route_id,
147 &mut new_definitions,
148 controller,
149 function_ctx,
150 &mut errors,
151 )
152 .await;
153 }
154
155 ReloadAction::Add { route_id } => {
156 reload_actions::apply_add(
157 route_id,
158 &mut new_definitions,
159 controller,
160 function_ctx,
161 &mut errors,
162 )
163 .await;
164 }
165
166 ReloadAction::Remove { route_id } => {
167 reload_actions::apply_remove(
168 route_id,
169 controller,
170 drain_timeout,
171 function_ctx,
172 &mut errors,
173 )
174 .await;
175 }
176
177 ReloadAction::Restart { route_id } => {
178 reload_actions::apply_restart(
179 route_id,
180 &mut new_definitions,
181 controller,
182 drain_timeout,
183 function_ctx,
184 &mut errors,
185 )
186 .await;
187 }
188
189 ReloadAction::Skip { route_id } => {
190 tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
191 }
192 }
193 }
194
195 errors
196}
197
// Unit and integration-style tests for the reload planner and executor.
//
// The first group checks the pure planning functions (`compute_reload_actions` and
// `compute_reload_actions_from_runtime_snapshot`); the second group drives
// `execute_reload_actions` against a real `CamelContext`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
    use crate::shared::components::domain::Registry;
    use std::sync::Arc;

    // Builds a controller backed by a fresh registry and the no-op platform service.
    fn make_controller() -> DefaultRouteController {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        DefaultRouteController::new(
            registry,
            Arc::new(camel_api::NoopPlatformService::default()),
        )
    }

    // ---- Planning against a controller -------------------------------------------------

    // A definition whose id the controller does not know is planned as `Add`.
    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];
        let actions = compute_reload_actions(&defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Add {
                route_id: "new-route".into()
            }]
        );
    }

    // A controller route missing from the new definitions is planned as `Remove`.
    #[tokio::test]
    async fn test_removed_route_detected() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route");
        controller.add_route(def).await.unwrap();

        let actions = compute_reload_actions(&[], &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Remove {
                route_id: "old-route".into()
            }]
        );
    }

    // Same source URI but a different source hash is planned as `Swap`.
    #[tokio::test]
    async fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(100);
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(200),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Swap {
                route_id: "my-route".into()
            }]
        );
    }

    // A changed source URI is planned as `Restart`, regardless of hashes.
    #[tokio::test]
    async fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(def).await.unwrap();

        let new_defs =
            vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "my-route".into()
            }]
        );
    }

    // ---- Planning against a runtime snapshot -------------------------------------------

    // The snapshot planner only looks at the ids it is given: `ghost-route` exists in the
    // controller but is absent from `runtime_ids`, so no `Remove` is planned for it.
    #[tokio::test]
    async fn test_runtime_snapshot_drives_remove_set() {
        let mut controller = make_controller();
        controller
            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("runtime-route"))
            .await
            .unwrap();
        controller
            .add_route(RouteDefinition::new("timer:ghost", vec![]).with_route_id("ghost-route"))
            .await
            .unwrap();

        let runtime_ids = vec!["runtime-route".to_string()];
        let actions =
            compute_reload_actions_from_runtime_snapshot(&[], &runtime_ids, &|_id: &str| None);
        assert_eq!(
            actions,
            vec![ReloadAction::Remove {
                route_id: "runtime-route".into()
            }]
        );
    }

    // Existing routes whose runtime hash differs from the new hash are planned as `Restart`.
    #[test]
    fn test_runtime_snapshot_existing_routes_map_to_restart() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("runtime-r1")
                .with_source_hash(10),
            RouteDefinition::new("timer:tock", vec![])
                .with_route_id("runtime-r2")
                .with_source_hash(20),
        ];
        let runtime_ids = vec!["runtime-r1".to_string(), "runtime-r2".to_string()];
        let runtime_hashes = std::collections::HashMap::from([
            ("runtime-r1".to_string(), 11u64),
            ("runtime-r2".to_string(), 22u64),
        ]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });
        assert_eq!(
            actions,
            vec![
                ReloadAction::Restart {
                    route_id: "runtime-r1".into()
                },
                ReloadAction::Restart {
                    route_id: "runtime-r2".into()
                }
            ]
        );
    }

    // Controller planner: identical URI and identical hash is planned as `Skip`.
    #[tokio::test]
    async fn test_same_hash_detected_as_skip() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(42);
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(42),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Skip {
                route_id: "my-route".into()
            }]
        );
    }

    // Controller planner: the registered route was added without a hash, so the comparison
    // cannot prove equality and the route is planned as `Swap`.
    #[tokio::test]
    async fn test_none_hash_detected_as_swap() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(99),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Swap {
                route_id: "my-route".into()
            }]
        );
    }

    // Snapshot planner: matching hashes are planned as `Skip`.
    #[test]
    fn test_runtime_snapshot_same_hash_detected_as_skip() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("r1")
                .with_source_hash(42),
        ];
        let runtime_ids = vec!["r1".to_string()];
        let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });
        assert_eq!(
            actions,
            vec![ReloadAction::Skip {
                route_id: "r1".into()
            }]
        );
    }

    // Snapshot planner: one input that exercises Skip, Restart, Add and Remove together.
    // Only a single route is removed, so the expected order is fully determined.
    #[test]
    fn test_runtime_snapshot_mixed_actions_cover_all_decisions() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("existing-same")
                .with_source_hash(10),
            RouteDefinition::new("timer:tock", vec![])
                .with_route_id("existing-diff")
                .with_source_hash(20),
            RouteDefinition::new("timer:new", vec![])
                .with_route_id("brand-new")
                .with_source_hash(30),
        ];
        let runtime_ids = vec![
            "existing-same".to_string(),
            "existing-diff".to_string(),
            "orphan".to_string(),
        ];
        let runtime_hashes = std::collections::HashMap::from([
            ("existing-same".to_string(), 10u64),
            ("existing-diff".to_string(), 999u64),
            ("orphan".to_string(), 77u64),
        ]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });

        assert_eq!(
            actions,
            vec![
                ReloadAction::Skip {
                    route_id: "existing-same".into()
                },
                ReloadAction::Restart {
                    route_id: "existing-diff".into()
                },
                ReloadAction::Add {
                    route_id: "brand-new".into()
                },
                ReloadAction::Remove {
                    route_id: "orphan".into()
                }
            ]
        );
    }

    // Snapshot planner: no runtime hash for an existing route is planned as `Restart`.
    #[test]
    fn test_runtime_snapshot_missing_runtime_hash_for_existing_route_restarts() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("r1")
                .with_source_hash(42),
        ];
        let runtime_ids = vec!["r1".to_string()];

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);

        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "r1".into()
            }]
        );
    }

    // Snapshot planner: a new definition built without a hash is planned as `Restart`.
    #[test]
    fn test_runtime_snapshot_missing_new_hash_for_existing_route_restarts() {
        let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("r1")];
        let runtime_ids = vec!["r1".to_string()];
        let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });

        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "r1".into()
            }]
        );
    }

    // Snapshot planner: with an empty snapshot every definition is planned as `Add`.
    #[test]
    fn test_runtime_snapshot_new_only_route_maps_to_add() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("new-only")
                .with_source_hash(1),
        ];
        let runtime_ids: Vec<String> = vec![];

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);

        assert_eq!(
            actions,
            vec![ReloadAction::Add {
                route_id: "new-only".into()
            }]
        );
    }

    // ---- Executing actions against a real context --------------------------------------

    // Executing `Add` with a matching definition leaves exactly one route in the controller.
    #[tokio::test]
    async fn test_execute_add_action_inserts_route() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let def = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
            .with_route_id("exec-add-test");
        let actions = vec![ReloadAction::Add {
            route_id: "exec-add-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![def],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        ctx.stop().await.unwrap();
    }

    // Executing `Remove` for a previously added route brings the route count from 1 to 0.
    #[tokio::test]
    async fn test_execute_remove_action_deletes_route() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
        ctx.add_route_definition(def).await.unwrap();
        // Precondition: the route is present before the removal runs.
        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        let actions = vec![ReloadAction::Remove {
            route_id: "exec-remove-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            0
        );

        ctx.stop().await.unwrap();
    }

    // Executing `Swap` for an existing route succeeds and keeps the route count at 1.
    #[tokio::test]
    async fn test_execute_swap_action_replaces_pipeline() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        ctx.add_route_definition(def).await.unwrap();

        // Replacement definition with the same id and the same source URI.
        let new_def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        let actions = vec![ReloadAction::Swap {
            route_id: "exec-swap-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![new_def],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        ctx.stop().await.unwrap();
    }

    // Executing `Restart` must not change the reported lifecycle status: the route is
    // asserted to be "Registered" both before and after the action.
    #[tokio::test]
    async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
        use crate::CamelContext;
        use camel_api::{RuntimeQuery, RuntimeQueryResult};
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let initial = RouteDefinition::new("timer:tick?period=100", vec![])
            .with_route_id("exec-restart-test");
        ctx.add_route_definition(initial).await.unwrap();

        // Status as seen through the runtime query API before the restart.
        let before = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match before {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        // Same id, different source URI (period changed from 100 to 250).
        let replacement = RouteDefinition::new("timer:tick?period=250", vec![])
            .with_route_id("exec-restart-test");
        let actions = vec![ReloadAction::Restart {
            route_id: "exec-restart-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![replacement],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // Status after the restart, checked through both the query API and the context helper.
        let after = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match after {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        assert_eq!(
            ctx.runtime_route_status("exec-restart-test").await.unwrap(),
            Some("Registered".to_string())
        );

        ctx.stop().await.unwrap();
    }

    // ---- Error reporting ---------------------------------------------------------------

    // `Swap` without a matching definition is reported as one error labelled "Swap".
    #[tokio::test]
    async fn test_execute_swap_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Swap {
                route_id: "missing-swap-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Swap");
        assert_eq!(errors[0].route_id, "missing-swap-def");
    }

    // `Add` without a matching definition is reported as one error labelled "Add".
    #[tokio::test]
    async fn test_execute_add_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Add {
                route_id: "missing-add-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Add");
        assert_eq!(errors[0].route_id, "missing-add-def");
    }

    // `Remove` for a route that was never added is reported as one error whose label
    // starts with "Remove" (only the prefix is asserted).
    #[tokio::test]
    async fn test_execute_remove_action_status_error_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Remove {
                route_id: "missing-remove-route".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert!(errors[0].action.starts_with("Remove"));
        assert_eq!(errors[0].route_id, "missing-remove-route");
    }

    // `Restart` without a matching definition is reported as one error labelled "Restart".
    #[tokio::test]
    async fn test_execute_restart_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Restart {
                route_id: "missing-restart-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Restart");
        assert_eq!(errors[0].route_id, "missing-restart-def");
    }

    // `Skip` produces no errors even though the route id is unknown to the context.
    #[tokio::test]
    async fn test_execute_skip_action_returns_no_errors() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Skip {
                route_id: "skip-only-route".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert!(errors.is_empty());
    }

    // The derived `Debug` output of `ReloadError` includes the route id and action label.
    #[test]
    fn reload_error_debug_format() {
        let err = ReloadError {
            route_id: "r1".into(),
            action: "Swap".into(),
            error: CamelError::RouteError("test error".into()),
        };
        let debug = format!("{:?}", err);
        assert!(debug.contains("r1"));
        assert!(debug.contains("Swap"));
    }
}