1use std::time::Duration;
7
8use camel_api::CamelError;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::domain::ReloadAction;
12#[cfg(test)]
13use crate::lifecycle::adapters::route_controller::DefaultRouteController;
14use crate::lifecycle::application::route_definition::RouteDefinition;
15
16use super::reload_actions;
17
/// Function-related context handed to the reload executor.
///
/// `execute_reload_actions` accepts this as `Option<&FunctionReloadContext>` and forwards it
/// unchanged to the `reload_actions::apply_*` helpers.
pub struct FunctionReloadContext {
    /// Shared, dynamically dispatched function invoker.
    pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
    /// NOTE(review): presumably identifies the reload generation this context belongs to;
    /// confirm against the code that constructs the context.
    pub generation: u64,
}
22
/// A failure recorded while applying a reload action.
///
/// Values of this type are collected into the `Vec<ReloadError>` returned by
/// `execute_reload_actions`.
#[derive(Debug)]
pub struct ReloadError {
    /// Id of the route the failed action targeted.
    pub route_id: String,
    /// Label of the action that failed. The tests in this file observe `"Swap"`, `"Add"`,
    /// `"Restart"` and a label starting with `"Remove"`.
    pub action: String,
    /// The underlying error.
    pub error: CamelError,
}
32
/// Test-only planner that diffs `new_definitions` against the routes held by `controller`.
///
/// Per new definition, in input order:
/// * id unknown to the controller → `Add`;
/// * id known but the controller has no `from` URI for it → no action is emitted;
/// * `from` URI differs → `Restart`;
/// * `from` URI equal and both source hashes present and equal → `Skip`;
/// * `from` URI equal otherwise → `Swap`.
///
/// Finally a `Remove` is appended for every controller route without a new definition
/// (in `HashSet` iteration order).
#[cfg(test)]
fn compute_reload_actions(
    new_definitions: &[RouteDefinition],
    controller: &DefaultRouteController,
) -> Vec<ReloadAction> {
    let registered: std::collections::HashSet<String> =
        controller.route_ids().into_iter().collect();
    let mut incoming = std::collections::HashSet::new();
    let mut plan = Vec::new();

    for definition in new_definitions {
        let route_id = definition.route_id().to_string();
        incoming.insert(route_id.clone());

        // Guard: routes the controller has never seen are plain additions.
        if !registered.contains(&route_id) {
            plan.push(ReloadAction::Add { route_id });
            continue;
        }

        // Guard: without a registered `from` URI there is nothing to compare against.
        let current_from_uri = match controller.route_from_uri(&route_id) {
            Some(uri) => uri,
            None => continue,
        };

        // A different consumer endpoint cannot be hot-swapped.
        if current_from_uri != definition.from_uri() {
            plan.push(ReloadAction::Restart { route_id });
            continue;
        }

        // Only a pair of present, equal hashes proves the route is unchanged.
        let unchanged = matches!(
            (
                controller.route_source_hash(&route_id),
                definition.source_hash()
            ),
            (Some(current), Some(candidate)) if current == candidate
        );
        plan.push(if unchanged {
            ReloadAction::Skip { route_id }
        } else {
            ReloadAction::Swap { route_id }
        });
    }

    plan.extend(
        registered
            .iter()
            .filter(|id| !incoming.contains(*id))
            .map(|id| ReloadAction::Remove {
                route_id: id.clone(),
            }),
    );

    plan
}
82
83pub(crate) fn compute_reload_actions_from_runtime_snapshot(
88 new_definitions: &[RouteDefinition],
89 runtime_route_ids: &[String],
90 runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
91) -> Vec<ReloadAction> {
92 let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
93 let mut new_ids = std::collections::HashSet::new();
94 let mut actions = Vec::new();
95
96 for def in new_definitions {
97 let route_id = def.route_id().to_string();
98 new_ids.insert(route_id.clone());
99
100 if active_ids.contains(&route_id) {
101 let existing_hash = runtime_source_hash(&route_id);
102 let new_hash = def.source_hash();
103 match (existing_hash, new_hash) {
104 (Some(h_existing), Some(h_new)) if h_existing == h_new => {
105 actions.push(ReloadAction::Skip { route_id });
106 }
107 _ => {
108 actions.push(ReloadAction::Restart { route_id });
109 }
110 }
111 } else {
112 actions.push(ReloadAction::Add { route_id });
113 }
114 }
115
116 for id in &active_ids {
117 if !new_ids.contains(id) {
118 actions.push(ReloadAction::Remove {
119 route_id: id.clone(),
120 });
121 }
122 }
123
124 actions
125}
126
127pub async fn execute_reload_actions(
134 actions: Vec<ReloadAction>,
135 mut new_definitions: Vec<RouteDefinition>,
136 controller: &RuntimeExecutionHandle,
137 drain_timeout: Duration,
138 function_ctx: Option<&FunctionReloadContext>,
139) -> Vec<ReloadError> {
140 let mut errors = Vec::new();
141
142 for action in actions {
143 match action {
144 ReloadAction::Swap { route_id } => {
145 reload_actions::apply_swap(
146 route_id,
147 &mut new_definitions,
148 controller,
149 drain_timeout,
150 function_ctx,
151 &mut errors,
152 )
153 .await;
154 }
155
156 ReloadAction::Add { route_id } => {
157 reload_actions::apply_add(
158 route_id,
159 &mut new_definitions,
160 controller,
161 function_ctx,
162 &mut errors,
163 )
164 .await;
165 }
166
167 ReloadAction::Remove { route_id } => {
168 reload_actions::apply_remove(
169 route_id,
170 controller,
171 drain_timeout,
172 function_ctx,
173 &mut errors,
174 )
175 .await;
176 }
177
178 ReloadAction::Restart { route_id } => {
179 reload_actions::apply_restart(
180 route_id,
181 &mut new_definitions,
182 controller,
183 drain_timeout,
184 function_ctx,
185 &mut errors,
186 )
187 .await;
188 }
189
190 ReloadAction::Skip { route_id } => {
191 tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
192 }
193 }
194 }
195
196 errors
197}
198
// Unit tests for the reload planners (`compute_reload_actions*`) and integration-style tests
// for `execute_reload_actions` against a real `CamelContext`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
    use crate::shared::components::domain::Registry;
    use std::sync::Arc;

    /// Builds a controller backed by a fresh registry and the no-op platform service.
    fn make_controller() -> DefaultRouteController {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        DefaultRouteController::new(
            registry,
            Arc::new(camel_api::NoopPlatformService::default()),
        )
    }

    /// A definition whose id the (empty) controller does not know yields `Add`.
    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];
        let actions = compute_reload_actions(&defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Add {
                route_id: "new-route".into()
            }]
        );
    }

    /// A registered route with no matching new definition yields `Remove`.
    #[tokio::test]
    async fn test_removed_route_detected() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route");
        controller.add_route(def).await.unwrap();

        let actions = compute_reload_actions(&[], &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Remove {
                route_id: "old-route".into()
            }]
        );
    }

    /// Same id and same `from` URI but different source hashes (100 vs 200) yields `Swap`.
    #[tokio::test]
    async fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(100);
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(200),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Swap {
                route_id: "my-route".into()
            }]
        );
    }

    /// Same id but a different `from` URI yields `Restart`.
    #[tokio::test]
    async fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(def).await.unwrap();

        let new_defs =
            vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "my-route".into()
            }]
        );
    }

    /// The snapshot planner only sees the ids it is given: the controller also holds
    /// "ghost-route", but only "runtime-route" is in the snapshot and gets removed.
    #[tokio::test]
    async fn test_runtime_snapshot_drives_remove_set() {
        let mut controller = make_controller();
        controller
            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("runtime-route"))
            .await
            .unwrap();
        controller
            .add_route(RouteDefinition::new("timer:ghost", vec![]).with_route_id("ghost-route"))
            .await
            .unwrap();

        // The controller is deliberately not passed to the planner below.
        let runtime_ids = vec!["runtime-route".to_string()];
        let actions =
            compute_reload_actions_from_runtime_snapshot(&[], &runtime_ids, &|_id: &str| None);
        assert_eq!(
            actions,
            vec![ReloadAction::Remove {
                route_id: "runtime-route".into()
            }]
        );
    }

    /// Snapshot planner: existing routes whose hashes differ (10 vs 11, 20 vs 22) map to
    /// `Restart`, in definition order.
    #[test]
    fn test_runtime_snapshot_existing_routes_map_to_restart() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("runtime-r1")
                .with_source_hash(10),
            RouteDefinition::new("timer:tock", vec![])
                .with_route_id("runtime-r2")
                .with_source_hash(20),
        ];
        let runtime_ids = vec!["runtime-r1".to_string(), "runtime-r2".to_string()];
        let runtime_hashes = std::collections::HashMap::from([
            ("runtime-r1".to_string(), 11u64),
            ("runtime-r2".to_string(), 22u64),
        ]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });
        assert_eq!(
            actions,
            vec![
                ReloadAction::Restart {
                    route_id: "runtime-r1".into()
                },
                ReloadAction::Restart {
                    route_id: "runtime-r2".into()
                }
            ]
        );
    }

    /// Same id, same `from` URI and the same source hash (42) yields `Skip`.
    #[tokio::test]
    async fn test_same_hash_detected_as_skip() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(42);
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(42),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Skip {
                route_id: "my-route".into()
            }]
        );
    }

    /// The registered definition is built without `with_source_hash`, while the new one
    /// carries hash 99; the expected action is `Swap`.
    #[tokio::test]
    async fn test_none_hash_detected_as_swap() {
        let mut controller = make_controller();
        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(def).await.unwrap();

        let new_defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(99),
        ];
        let actions = compute_reload_actions(&new_defs, &controller);
        assert_eq!(
            actions,
            vec![ReloadAction::Swap {
                route_id: "my-route".into()
            }]
        );
    }

    /// Snapshot planner: equal hashes on both sides (42) yield `Skip`.
    #[test]
    fn test_runtime_snapshot_same_hash_detected_as_skip() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("r1")
                .with_source_hash(42),
        ];
        let runtime_ids = vec!["r1".to_string()];
        let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });
        assert_eq!(
            actions,
            vec![ReloadAction::Skip {
                route_id: "r1".into()
            }]
        );
    }

    /// Snapshot planner: one input exercising every decision — `Skip` (same hash),
    /// `Restart` (different hash), `Add` (new id) and `Remove` (snapshot-only id).
    #[test]
    fn test_runtime_snapshot_mixed_actions_cover_all_decisions() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("existing-same")
                .with_source_hash(10),
            RouteDefinition::new("timer:tock", vec![])
                .with_route_id("existing-diff")
                .with_source_hash(20),
            RouteDefinition::new("timer:new", vec![])
                .with_route_id("brand-new")
                .with_source_hash(30),
        ];
        let runtime_ids = vec![
            "existing-same".to_string(),
            "existing-diff".to_string(),
            "orphan".to_string(),
        ];
        let runtime_hashes = std::collections::HashMap::from([
            ("existing-same".to_string(), 10u64),
            ("existing-diff".to_string(), 999u64),
            ("orphan".to_string(), 77u64),
        ]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });

        // Per-definition actions come first in definition order, then the removal.
        assert_eq!(
            actions,
            vec![
                ReloadAction::Skip {
                    route_id: "existing-same".into()
                },
                ReloadAction::Restart {
                    route_id: "existing-diff".into()
                },
                ReloadAction::Add {
                    route_id: "brand-new".into()
                },
                ReloadAction::Remove {
                    route_id: "orphan".into()
                }
            ]
        );
    }

    /// Snapshot planner: when the runtime lookup returns `None` for an existing route,
    /// the route is restarted.
    #[test]
    fn test_runtime_snapshot_missing_runtime_hash_for_existing_route_restarts() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("r1")
                .with_source_hash(42),
        ];
        let runtime_ids = vec!["r1".to_string()];

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);

        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "r1".into()
            }]
        );
    }

    /// Snapshot planner: a new definition built without `with_source_hash` is restarted
    /// even though the runtime has a hash for it.
    #[test]
    fn test_runtime_snapshot_missing_new_hash_for_existing_route_restarts() {
        let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("r1")];
        let runtime_ids = vec!["r1".to_string()];
        let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
                runtime_hashes.get(id).copied()
            });

        assert_eq!(
            actions,
            vec![ReloadAction::Restart {
                route_id: "r1".into()
            }]
        );
    }

    /// Snapshot planner: with an empty snapshot, a definition maps to `Add`.
    #[test]
    fn test_runtime_snapshot_new_only_route_maps_to_add() {
        let defs = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("new-only")
                .with_source_hash(1),
        ];
        let runtime_ids: Vec<String> = vec![];

        let actions =
            compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);

        assert_eq!(
            actions,
            vec![ReloadAction::Add {
                route_id: "new-only".into()
            }]
        );
    }

    /// Executing `Add` on a started context reports no errors and leaves one route
    /// in the controller.
    #[tokio::test]
    async fn test_execute_add_action_inserts_route() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let def = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
            .with_route_id("exec-add-test");
        let actions = vec![ReloadAction::Add {
            route_id: "exec-add-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![def],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        ctx.stop().await.unwrap();
    }

    /// Executing `Remove` for a previously added route reports no errors and drops the
    /// controller's route count from 1 to 0.
    #[tokio::test]
    async fn test_execute_remove_action_deletes_route() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Seed the context with the route that will be removed.
        let def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
        ctx.add_route_definition(def).await.unwrap();
        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        let actions = vec![ReloadAction::Remove {
            route_id: "exec-remove-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            0
        );

        ctx.stop().await.unwrap();
    }

    /// Executing `Swap` with a replacement definition reports no errors and keeps the
    /// controller's route count at 1.
    #[tokio::test]
    async fn test_execute_swap_action_replaces_pipeline() {
        use crate::CamelContext;
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Route that is registered before the swap.
        let def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        ctx.add_route_definition(def).await.unwrap();

        // Replacement with the same id and the same `from` URI.
        let new_def =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        let actions = vec![ReloadAction::Swap {
            route_id: "exec-swap-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![new_def],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // The route must still be present after the swap.
        assert_eq!(
            ctx.runtime_execution_handle()
                .controller_route_count_for_test()
                .await,
            1
        );

        ctx.stop().await.unwrap();
    }

    /// Executing `Restart` on a route whose status is "Registered" reports no errors and
    /// leaves the status at "Registered".
    #[tokio::test]
    async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
        use crate::CamelContext;
        use camel_api::{RuntimeQuery, RuntimeQueryResult};
        use camel_component_timer::TimerComponent;

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let initial = RouteDefinition::new("timer:tick?period=100", vec![])
            .with_route_id("exec-restart-test");
        ctx.add_route_definition(initial).await.unwrap();

        // Baseline: the status reported before the restart.
        let before = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match before {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        // Replacement uses a different `from` URI (period=250 instead of period=100).
        let replacement = RouteDefinition::new("timer:tick?period=250", vec![])
            .with_route_id("exec-restart-test");
        let actions = vec![ReloadAction::Restart {
            route_id: "exec-restart-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![replacement],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // The status must be unchanged after the restart.
        let after = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match after {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        // Cross-check through the context's own status accessor.
        assert_eq!(
            ctx.runtime_route_status("exec-restart-test").await.unwrap(),
            Some("Registered".to_string())
        );

        ctx.stop().await.unwrap();
    }

    /// `Swap` with no definitions supplied records exactly one error labelled "Swap".
    #[tokio::test]
    async fn test_execute_swap_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Swap {
                route_id: "missing-swap-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Swap");
        assert_eq!(errors[0].route_id, "missing-swap-def");
    }

    /// `Add` with no definitions supplied records exactly one error labelled "Add".
    #[tokio::test]
    async fn test_execute_add_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Add {
                route_id: "missing-add-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Add");
        assert_eq!(errors[0].route_id, "missing-add-def");
    }

    /// `Remove` for a route id that was never added records exactly one error whose
    /// action label starts with "Remove".
    #[tokio::test]
    async fn test_execute_remove_action_status_error_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Remove {
                route_id: "missing-remove-route".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert!(errors[0].action.starts_with("Remove"));
        assert_eq!(errors[0].route_id, "missing-remove-route");
    }

    /// `Restart` with no definitions supplied records exactly one error labelled "Restart".
    #[tokio::test]
    async fn test_execute_restart_action_missing_definition_returns_error() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Restart {
                route_id: "missing-restart-def".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Restart");
        assert_eq!(errors[0].route_id, "missing-restart-def");
    }

    /// `Skip` is a no-op for the executor: no errors are reported.
    #[tokio::test]
    async fn test_execute_skip_action_returns_no_errors() {
        use crate::CamelContext;

        let ctx = CamelContext::builder().build().await.unwrap();
        let errors = execute_reload_actions(
            vec![ReloadAction::Skip {
                route_id: "skip-only-route".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
            None,
        )
        .await;

        assert!(errors.is_empty());
    }

    /// The derived `Debug` output of `ReloadError` includes the route id and action label.
    #[test]
    fn reload_error_debug_format() {
        let err = ReloadError {
            route_id: "r1".into(),
            action: "Swap".into(),
            error: CamelError::RouteError("test error".into()),
        };
        let debug = format!("{:?}", err);
        assert!(debug.contains("r1"));
        assert!(debug.contains("Swap"));
    }
}