1use camel_api::CamelError;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::Duration;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::application::drain::drain_route;
12use crate::hot_reload::domain::ReloadAction;
13#[cfg(test)]
14use crate::lifecycle::adapters::route_controller::DefaultRouteController;
15use crate::lifecycle::application::route_definition::RouteDefinition;
16
/// Function-related state handed to `execute_reload_actions` for one reload pass.
///
/// When this context is supplied, the executor routes its work through the
/// invoker's prepare / finalize / rollback calls instead of the plain
/// runtime-command path.
pub struct FunctionReloadContext {
    /// Invoker queried for current/staged function refs and asked to prepare,
    /// finalize, roll back or discard staged functions.
    pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
    /// Generation number passed along with every invoker call and every
    /// generation-aware compile/prepare call made during this reload.
    pub generation: u64,
}
21
22fn compute_function_diff_for_route(
23 invoker: &std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
24 route_id: &str,
25 generation: u64,
26) -> camel_api::function::FunctionDiff {
27 let current_refs: std::collections::HashSet<_> = invoker
28 .function_refs_for_route(route_id)
29 .into_iter()
30 .collect();
31 let staged_defs = invoker.staged_defs_for_route(route_id, generation);
32 let staged_refs: std::collections::HashSet<_> = staged_defs
33 .iter()
34 .map(|(def, rid)| (def.id.clone(), rid.clone()))
35 .collect();
36
37 let removed: Vec<_> = current_refs.difference(&staged_refs).cloned().collect();
38 let added_refs: std::collections::HashSet<_> =
39 staged_refs.difference(¤t_refs).cloned().collect();
40 let added: Vec<_> = staged_defs
41 .into_iter()
42 .filter(|(def, rid)| added_refs.contains(&(def.id.clone(), rid.clone())))
43 .collect();
44 let unchanged: Vec<_> = current_refs
45 .intersection(&staged_refs)
46 .map(|(id, _)| id.clone())
47 .collect();
48
49 camel_api::function::FunctionDiff {
50 added,
51 removed,
52 unchanged,
53 }
54}
55
/// Process-wide counter supplying the numeric suffix of reload command ids.
static RELOAD_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Produces a command id shaped like `reload:<op>:<route_id>:<seq>`.
///
/// `<seq>` is the counter value before it is incremented, so consecutive calls
/// yield distinct, increasing suffixes.
fn next_reload_command_id(op: &str, route_id: &str) -> String {
    format!(
        "reload:{op}:{route_id}:{seq}",
        seq = RELOAD_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed)
    )
}
62
63fn is_invalid_stop_transition(err: &CamelError) -> bool {
64 err.to_string().contains("invalid transition")
65}
66
/// Decides whether a route has to be stopped before it is removed or replaced.
///
/// Returns `false` only for the statuses `"Registered"` and `"Stopped"`;
/// any other status, including an unknown one (`None`), yields `true`.
fn should_stop_before_mutation(runtime_status: Option<&str>) -> bool {
    match runtime_status {
        Some("Registered") | Some("Stopped") => false,
        Some(_) | None => true,
    }
}
70
/// Decides whether a restarted route should be started again afterwards.
///
/// Routes whose status was `"Registered"` or `"Stopped"` are left as they are
/// (`false`); every other status, including `None`, asks for a start (`true`).
fn should_start_after_restart(runtime_status: Option<&str>) -> bool {
    match runtime_status {
        Some("Registered") | Some("Stopped") => false,
        Some(_) | None => true,
    }
}
74
/// One failure recorded while executing a reload action.
#[derive(Debug)]
pub struct ReloadError {
    /// Id of the route the failing action targeted.
    pub route_id: String,
    /// Label of the action, optionally with the failing step in parentheses
    /// (for example `"Swap (compile)"` or `"Restart (stop)"`).
    pub action: String,
    /// The underlying error.
    pub error: CamelError,
}
84
/// Plans reload actions by comparing new definitions against the controller.
///
/// For each definition, in input order:
/// * unknown route id → `Add`;
/// * known id whose `from_uri` differs → `Restart`;
/// * known id, same `from_uri`, both source hashes present and equal → `Skip`;
/// * known id, same `from_uri`, otherwise → `Swap`.
///
/// A known id for which the controller reports no `from_uri` produces no
/// action. Afterwards one `Remove` is appended per controller route missing
/// from the new definitions (in hash-set iteration order).
#[cfg(test)]
fn compute_reload_actions(
    new_definitions: &[RouteDefinition],
    controller: &DefaultRouteController,
) -> Vec<ReloadAction> {
    use std::collections::HashSet;

    let active_ids: HashSet<String> = controller.route_ids().into_iter().collect();
    let mut new_ids: HashSet<String> = HashSet::new();
    let mut actions = Vec::new();

    for def in new_definitions {
        let route_id = def.route_id().to_string();
        new_ids.insert(route_id.clone());

        // Brand-new route: nothing to compare against.
        if !active_ids.contains(&route_id) {
            actions.push(ReloadAction::Add { route_id });
            continue;
        }

        // Known id but no source URI on record: no action is planned.
        let Some(from_uri) = controller.route_from_uri(&route_id) else {
            continue;
        };

        let planned = if from_uri != def.from_uri() {
            ReloadAction::Restart { route_id }
        } else {
            // Only two present-and-equal hashes count as "unchanged".
            let same_source = matches!(
                (controller.route_source_hash(&route_id), def.source_hash()),
                (Some(existing), Some(incoming)) if existing == incoming
            );
            if same_source {
                ReloadAction::Skip { route_id }
            } else {
                ReloadAction::Swap { route_id }
            }
        };
        actions.push(planned);
    }

    // Routes the controller knows that the new definitions no longer mention.
    actions.extend(
        active_ids
            .difference(&new_ids)
            .map(|stale| ReloadAction::Remove {
                route_id: stale.clone(),
            }),
    );

    actions
}
134
135pub(crate) fn compute_reload_actions_from_runtime_snapshot(
140 new_definitions: &[RouteDefinition],
141 runtime_route_ids: &[String],
142 runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
143) -> Vec<ReloadAction> {
144 let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
145 let mut new_ids = std::collections::HashSet::new();
146 let mut actions = Vec::new();
147
148 for def in new_definitions {
149 let route_id = def.route_id().to_string();
150 new_ids.insert(route_id.clone());
151
152 if active_ids.contains(&route_id) {
153 let existing_hash = runtime_source_hash(&route_id);
154 let new_hash = def.source_hash();
155 match (existing_hash, new_hash) {
156 (Some(h_existing), Some(h_new)) if h_existing == h_new => {
157 actions.push(ReloadAction::Skip { route_id });
158 }
159 _ => {
160 actions.push(ReloadAction::Restart { route_id });
161 }
162 }
163 } else {
164 actions.push(ReloadAction::Add { route_id });
165 }
166 }
167
168 for id in &active_ids {
169 if !new_ids.contains(id) {
170 actions.push(ReloadAction::Remove {
171 route_id: id.clone(),
172 });
173 }
174 }
175
176 actions
177}
178
/// Executes the given reload actions one after another and collects failures.
///
/// A failing action never aborts the run: the failure is pushed as a
/// [`ReloadError`] (with a label naming the action and, where applicable, the
/// failing step) and processing continues with the next action.
///
/// * `actions` – actions to perform, processed in order.
/// * `new_definitions` – pool of definitions; `Swap`, `Add` and `Restart`
///   remove the definition with the matching route id from this pool and
///   report an error when none is found.
/// * `controller` – runtime handle used for compiling, inserting, removing and
///   for issuing start/stop/remove runtime commands.
/// * `drain_timeout` – timeout forwarded to `drain_route` after a stop.
/// * `function_ctx` – when `Some`, each mutating action additionally goes
///   through the invoker's prepare / finalize / rollback calls with the
///   context's generation; when `None`, the plain runtime path is used.
///
/// Returns every error recorded; the vector is empty when all actions succeeded.
pub async fn execute_reload_actions(
    actions: Vec<ReloadAction>,
    mut new_definitions: Vec<RouteDefinition>,
    controller: &RuntimeExecutionHandle,
    drain_timeout: Duration,
    function_ctx: Option<&FunctionReloadContext>,
) -> Vec<ReloadError> {
    let mut errors = Vec::new();

    for action in actions {
        match action {
            // Swap: compile the new definition and replace the route's pipeline in place.
            ReloadAction::Swap { route_id } => {
                // Take the matching definition out of the pool.
                let def_pos = new_definitions
                    .iter()
                    .position(|d| d.route_id() == route_id);
                let def = match def_pos {
                    Some(pos) => new_definitions.remove(pos),
                    None => {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Swap".into(),
                            error: CamelError::RouteError(format!(
                                "No definition found for route '{}'",
                                route_id
                            )),
                        });
                        continue;
                    }
                };

                // Sampled before compiling; only used for the log message below.
                // A failed lookup is treated as zero in-flight exchanges.
                let in_flight = controller.in_flight_count(&route_id).await.unwrap_or(0);

                let pipeline = if let Some(ctx) = function_ctx {
                    controller
                        .compile_route_definition_with_generation(def, ctx.generation)
                        .await
                } else {
                    controller.compile_route_definition(def).await
                };

                match pipeline {
                    Ok(p) => {
                        // With a function context, prepare the function diff before swapping.
                        // NOTE(review): unlike the Add arm, a prepare failure here does not
                        // call `discard_staging` — confirm this is intentional.
                        let prepare_token = if let Some(ctx) = function_ctx {
                            let diff = compute_function_diff_for_route(
                                &ctx.invoker,
                                &route_id,
                                ctx.generation,
                            );
                            match ctx.invoker.prepare_reload(diff, ctx.generation).await {
                                Ok(token) => Some(token),
                                Err(e) => {
                                    errors.push(ReloadError {
                                        route_id: route_id.clone(),
                                        action: "Swap (prepare)".into(),
                                        error: CamelError::ProcessorError(format!("{e}")),
                                    });
                                    continue;
                                }
                            }
                        } else {
                            None
                        };

                        let result = controller.swap_route_pipeline(&route_id, p).await;
                        if let Err(e) = result {
                            // Swap failed: roll back the prepared functions (result ignored).
                            if let Some(ctx) = function_ctx
                                && let Some(ref token) = prepare_token
                            {
                                let _ = ctx
                                    .invoker
                                    .rollback_reload(token.clone(), ctx.generation)
                                    .await;
                            }
                            errors.push(ReloadError {
                                route_id,
                                action: "Swap".into(),
                                error: e,
                            });
                        } else {
                            // Swap succeeded: finalize with a freshly computed diff.
                            // A finalize failure is recorded but the swap stays in place.
                            if let Some(ctx) = function_ctx {
                                let diff = compute_function_diff_for_route(
                                    &ctx.invoker,
                                    &route_id,
                                    ctx.generation,
                                );
                                if let Err(e) =
                                    ctx.invoker.finalize_reload(&diff, ctx.generation).await
                                {
                                    errors.push(ReloadError {
                                        route_id: route_id.clone(),
                                        action: "Finalize".into(),
                                        error: CamelError::ProcessorError(format!("{e}")),
                                    });
                                }
                            }
                            if in_flight > 0 {
                                tracing::info!(
                                    route_id = %route_id,
                                    action = "swap",
                                    in_flight = in_flight,
                                    "hot-reload: swapped route pipeline ({} exchanges continuing with previous pipeline)",
                                    in_flight
                                );
                            } else {
                                tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
                            }
                        }
                    }
                    Err(e) => {
                        errors.push(ReloadError {
                            route_id,
                            action: "Swap (compile)".into(),
                            error: e,
                        });
                    }
                }
            }

            // Add: register a new route and start it.
            ReloadAction::Add { route_id } => {
                // Take the matching definition out of the pool.
                let def_pos = new_definitions
                    .iter()
                    .position(|d| d.route_id() == route_id);
                let def = match def_pos {
                    Some(pos) => new_definitions.remove(pos),
                    None => {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Add".into(),
                            error: CamelError::RouteError(format!(
                                "No definition found for route '{}'",
                                route_id
                            )),
                        });
                        continue;
                    }
                };

                if let Some(ctx) = function_ctx {
                    // Function-aware path: prepare route -> prepare functions ->
                    // insert route -> register aggregate -> finalize functions.
                    let prepared = match controller
                        .prepare_route_definition_with_generation(def, ctx.generation)
                        .await
                    {
                        Ok(p) => p,
                        Err(e) => {
                            // Nothing was prepared on the invoker yet; drop the staged data.
                            ctx.invoker.discard_staging(ctx.generation);
                            errors.push(ReloadError {
                                route_id,
                                action: "Add (prepare-route)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    let diff =
                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);

                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
                    {
                        Ok(token) => token,
                        Err(e) => {
                            ctx.invoker.discard_staging(ctx.generation);
                            errors.push(ReloadError {
                                route_id,
                                action: "Add (prepare)".into(),
                                error: CamelError::ProcessorError(format!("{e}")),
                            });
                            continue;
                        }
                    };

                    // From here on a failure rolls back the prepared functions
                    // (the rollback result itself is ignored).
                    if let Err(e) = controller.insert_prepared_route(prepared).await {
                        let _ = ctx
                            .invoker
                            .rollback_reload(prepare_token, ctx.generation)
                            .await;
                        errors.push(ReloadError {
                            route_id,
                            action: "Add (insert)".into(),
                            error: e,
                        });
                        continue;
                    }

                    if let Err(e) = controller.register_route_aggregate(route_id.clone()).await {
                        // Undo the insert as well before rolling back; both results are ignored.
                        let _ = controller
                            .remove_route_preserving_functions(route_id.clone())
                            .await;
                        let _ = ctx
                            .invoker
                            .rollback_reload(prepare_token, ctx.generation)
                            .await;
                        errors.push(ReloadError {
                            route_id,
                            action: "Add (aggregate)".into(),
                            error: e,
                        });
                        continue;
                    }

                    // Finalize with a freshly computed diff. A failure is recorded,
                    // but the route is still started below.
                    let diff =
                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Finalize".into(),
                            error: CamelError::ProcessorError(format!("{e}")),
                        });
                    }
                } else {
                    // Plain path: hand the definition straight to the controller.
                    if let Err(e) = controller.add_route_definition(def).await {
                        errors.push(ReloadError {
                            route_id,
                            action: "Add".into(),
                            error: e,
                        });
                        continue;
                    }
                }

                // Both paths end by starting the freshly added route.
                let start_result = controller
                    .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
                        route_id: route_id.clone(),
                        command_id: next_reload_command_id("add-start", &route_id),
                        causation_id: None,
                    })
                    .await;
                if let Err(e) = start_result {
                    errors.push(ReloadError {
                        route_id,
                        action: "Add (start)".into(),
                        error: e,
                    });
                } else {
                    tracing::info!(route_id = %route_id, "hot-reload: added and started route");
                }
            }

            // Remove: stop (if needed), drain, then remove the route.
            ReloadAction::Remove { route_id } => {
                if let Some(ctx) = function_ctx {
                    // The diff is computed up front, before the route is touched,
                    // and reused for the finalize call at the end.
                    let diff =
                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);

                    let runtime_status = match controller.runtime_route_status(&route_id).await {
                        Ok(status) => status,
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id: route_id.clone(),
                                action: "Remove (status)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    if should_stop_before_mutation(runtime_status.as_deref()) {
                        let stop_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("remove-stop", &route_id),
                                causation_id: None,
                            })
                            .await;
                        // "invalid transition" stop errors are tolerated; anything else aborts
                        // this action.
                        if let Err(e) = stop_result
                            && !is_invalid_stop_transition(&e)
                        {
                            errors.push(ReloadError {
                                route_id: route_id.clone(),
                                action: "Remove (stop)".into(),
                                error: e,
                            });
                            continue;
                        }

                        // Drain outcome is deliberately ignored.
                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
                    }

                    if let Err(e) = controller
                        .remove_route_preserving_functions(route_id.clone())
                        .await
                    {
                        errors.push(ReloadError {
                            route_id,
                            action: "Remove".into(),
                            error: e,
                        });
                        continue;
                    }

                    // A finalize failure is recorded; the route is already removed at this point.
                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Finalize".into(),
                            error: CamelError::ProcessorError(format!("{e}")),
                        });
                    }

                    tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
                } else {
                    // Plain path: same stop/drain sequence, removal via a runtime command.
                    let runtime_status = match controller.runtime_route_status(&route_id).await {
                        Ok(status) => status,
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id,
                                action: "Remove (status)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    if should_stop_before_mutation(runtime_status.as_deref()) {
                        let stop_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("remove-stop", &route_id),
                                causation_id: None,
                            })
                            .await;
                        // "invalid transition" stop errors are tolerated.
                        if let Err(e) = stop_result
                            && !is_invalid_stop_transition(&e)
                        {
                            errors.push(ReloadError {
                                route_id: route_id.clone(),
                                action: "Remove (stop)".into(),
                                error: e,
                            });
                            continue;
                        }

                        // Drain outcome is deliberately ignored.
                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
                    }

                    let remove_result = controller
                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
                            route_id: route_id.clone(),
                            command_id: next_reload_command_id("remove", &route_id),
                            causation_id: None,
                        })
                        .await;
                    match remove_result {
                        Ok(_) => {
                            tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
                        }
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id,
                                action: "Remove".into(),
                                error: e,
                            });
                        }
                    }
                }
            }

            // Restart: stop (if needed), drain, remove, re-add, and start again unless the
            // route was in the "Registered"/"Stopped" state beforehand.
            ReloadAction::Restart { route_id } => {
                // NOTE(review): the message says "from_uri changed", but
                // `compute_reload_actions_from_runtime_snapshot` also emits Restart on a
                // source-hash mismatch — consider rewording.
                tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");

                // Take the matching definition out of the pool.
                let def_pos = new_definitions
                    .iter()
                    .position(|d| d.route_id() == route_id);
                let def = match def_pos {
                    Some(pos) => new_definitions.remove(pos),
                    None => {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Restart".into(),
                            error: CamelError::RouteError(format!(
                                "No definition found for route '{}'",
                                route_id
                            )),
                        });
                        continue;
                    }
                };

                if let Some(ctx) = function_ctx {
                    // Function-aware path: the replacement route and its functions are
                    // prepared before the running route is touched.
                    let prepared = match controller
                        .prepare_route_definition_with_generation(def, ctx.generation)
                        .await
                    {
                        Ok(p) => p,
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (prepare-route)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    let diff =
                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
                    {
                        Ok(token) => token,
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (prepare)".into(),
                                error: CamelError::ProcessorError(format!("{e}")),
                            });
                            continue;
                        }
                    };

                    // Every failure from here until finalize rolls back the prepared
                    // functions (rollback result ignored).
                    let runtime_status = match controller.runtime_route_status(&route_id).await {
                        Ok(status) => status,
                        Err(e) => {
                            let _ = ctx
                                .invoker
                                .rollback_reload(prepare_token, ctx.generation)
                                .await;
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (status)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    if should_stop_before_mutation(runtime_status.as_deref()) {
                        let stop_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("restart-stop", &route_id),
                                causation_id: None,
                            })
                            .await;
                        // "invalid transition" stop errors are tolerated.
                        if let Err(e) = stop_result
                            && !is_invalid_stop_transition(&e)
                        {
                            let _ = ctx
                                .invoker
                                .rollback_reload(prepare_token, ctx.generation)
                                .await;
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (stop)".into(),
                                error: e,
                            });
                            continue;
                        }

                        // Drain outcome is deliberately ignored.
                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
                    }

                    if let Err(e) = controller
                        .remove_route_preserving_functions(route_id.clone())
                        .await
                    {
                        let _ = ctx
                            .invoker
                            .rollback_reload(prepare_token, ctx.generation)
                            .await;
                        errors.push(ReloadError {
                            route_id,
                            action: "Restart (remove)".into(),
                            error: e,
                        });
                        continue;
                    }

                    // NOTE(review): if the insert fails, the old route has already been
                    // removed and is not re-inserted here — confirm this is acceptable.
                    if let Err(e) = controller.insert_prepared_route(prepared).await {
                        let _ = ctx
                            .invoker
                            .rollback_reload(prepare_token, ctx.generation)
                            .await;
                        errors.push(ReloadError {
                            route_id,
                            action: "Restart (insert)".into(),
                            error: e,
                        });
                        continue;
                    }

                    // Finalize with a freshly computed diff; a failure is recorded but the
                    // restart continues.
                    let diff =
                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
                        errors.push(ReloadError {
                            route_id: route_id.clone(),
                            action: "Finalize".into(),
                            error: CamelError::ProcessorError(format!("{e}")),
                        });
                    }

                    // Start only if the status captured before the restart calls for it.
                    if should_start_after_restart(runtime_status.as_deref()) {
                        let start_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("restart-start", &route_id),
                                causation_id: None,
                            })
                            .await;
                        if let Err(e) = start_result {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (start)".into(),
                                error: e,
                            });
                        } else {
                            tracing::info!(
                                route_id = %route_id,
                                "hot-reload: route restarted successfully"
                            );
                        }
                    } else {
                        tracing::info!(
                            route_id = %route_id,
                            "hot-reload: restart applied while preserving stopped lifecycle state"
                        );
                    }
                } else {
                    // Plain path: status -> stop/drain -> RemoveRoute -> add definition -> start.
                    let runtime_status = match controller.runtime_route_status(&route_id).await {
                        Ok(status) => status,
                        Err(e) => {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (status)".into(),
                                error: e,
                            });
                            continue;
                        }
                    };

                    if should_stop_before_mutation(runtime_status.as_deref()) {
                        let stop_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("restart-stop", &route_id),
                                causation_id: None,
                            })
                            .await;
                        // "invalid transition" stop errors are tolerated.
                        if let Err(e) = stop_result
                            && !is_invalid_stop_transition(&e)
                        {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (stop)".into(),
                                error: e,
                            });
                            continue;
                        }

                        // Drain outcome is deliberately ignored.
                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
                    }

                    if let Err(e) = controller
                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
                            route_id: route_id.clone(),
                            command_id: next_reload_command_id("restart-remove", &route_id),
                            causation_id: None,
                        })
                        .await
                    {
                        errors.push(ReloadError {
                            route_id,
                            action: "Restart (remove)".into(),
                            error: e,
                        });
                        continue;
                    }

                    if let Err(e) = controller.add_route_definition(def).await {
                        errors.push(ReloadError {
                            route_id,
                            action: "Restart (add)".into(),
                            error: e,
                        });
                        continue;
                    }

                    // Start only if the status captured before the restart calls for it.
                    if should_start_after_restart(runtime_status.as_deref()) {
                        let start_result = controller
                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
                                route_id: route_id.clone(),
                                command_id: next_reload_command_id("restart-start", &route_id),
                                causation_id: None,
                            })
                            .await;
                        if let Err(e) = start_result {
                            errors.push(ReloadError {
                                route_id,
                                action: "Restart (start)".into(),
                                error: e,
                            });
                        } else {
                            tracing::info!(
                                route_id = %route_id,
                                "hot-reload: route restarted successfully"
                            );
                        }
                    } else {
                        tracing::info!(
                            route_id = %route_id,
                            "hot-reload: restart applied while preserving stopped lifecycle state"
                        );
                    }
                }
            }

            // Skip: nothing to do beyond a debug log.
            ReloadAction::Skip { route_id } => {
                tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
            }
        }
    }

    errors
}
796
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CamelContext;
    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
    use crate::shared::components::domain::Registry;
    use camel_component_timer::TimerComponent;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a controller backed by an empty registry and the no-op platform service.
    fn make_controller() -> DefaultRouteController {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        let platform = Arc::new(camel_api::NoopPlatformService::default());
        DefaultRouteController::new(registry, platform)
    }

    /// Runs `execute_reload_actions` without a function reload context.
    async fn apply_without_functions(
        actions: Vec<ReloadAction>,
        definitions: Vec<RouteDefinition>,
        handle: &RuntimeExecutionHandle,
        drain_timeout: Duration,
    ) -> Vec<ReloadError> {
        execute_reload_actions(actions, definitions, handle, drain_timeout, None).await
    }

    /// Executes one action against a fresh, unstarted context with no
    /// definitions and a 1 ms drain timeout.
    async fn apply_to_empty_context(action: ReloadAction) -> Vec<ReloadError> {
        let ctx = CamelContext::builder().build().await.unwrap();
        apply_without_functions(
            vec![action],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_millis(1),
        )
        .await
    }

    // ---- planning against the controller -------------------------------

    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let incoming = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];

        let planned = compute_reload_actions(&incoming, &controller);

        let expected = vec![ReloadAction::Add {
            route_id: "new-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[tokio::test]
    async fn test_removed_route_detected() {
        let mut controller = make_controller();
        controller
            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route"))
            .await
            .unwrap();

        let planned = compute_reload_actions(&[], &controller);

        let expected = vec![ReloadAction::Remove {
            route_id: "old-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[tokio::test]
    async fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        let existing = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(100);
        controller.add_route(existing).await.unwrap();

        // Same source URI, different hash.
        let incoming = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(200),
        ];
        let planned = compute_reload_actions(&incoming, &controller);

        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[tokio::test]
    async fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        let existing = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(existing).await.unwrap();

        let incoming =
            vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
        let planned = compute_reload_actions(&incoming, &controller);

        let expected = vec![ReloadAction::Restart {
            route_id: "my-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    // ---- planning against a runtime snapshot ---------------------------

    #[tokio::test]
    async fn test_runtime_snapshot_drives_remove_set() {
        // The controller knows two routes, but only one is part of the snapshot.
        let mut controller = make_controller();
        let in_snapshot = RouteDefinition::new("timer:tick", vec![]).with_route_id("runtime-route");
        controller.add_route(in_snapshot).await.unwrap();
        let not_in_snapshot =
            RouteDefinition::new("timer:ghost", vec![]).with_route_id("ghost-route");
        controller.add_route(not_in_snapshot).await.unwrap();

        let snapshot_ids = vec!["runtime-route".to_string()];
        let planned =
            compute_reload_actions_from_runtime_snapshot(&[], &snapshot_ids, &|_id: &str| None);

        let expected = vec![ReloadAction::Remove {
            route_id: "runtime-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[test]
    fn test_runtime_snapshot_existing_routes_map_to_restart() {
        let incoming = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("runtime-r1")
                .with_source_hash(10),
            RouteDefinition::new("timer:tock", vec![])
                .with_route_id("runtime-r2")
                .with_source_hash(20),
        ];
        let snapshot_ids = vec!["runtime-r1".to_string(), "runtime-r2".to_string()];
        // Hashes deliberately differ from the incoming definitions.
        let snapshot_hashes = std::collections::HashMap::from([
            ("runtime-r1".to_string(), 11u64),
            ("runtime-r2".to_string(), 22u64),
        ]);

        let planned =
            compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &|id: &str| {
                snapshot_hashes.get(id).copied()
            });

        let expected = vec![
            ReloadAction::Restart {
                route_id: "runtime-r1".into(),
            },
            ReloadAction::Restart {
                route_id: "runtime-r2".into(),
            },
        ];
        assert_eq!(planned, expected);
    }

    #[tokio::test]
    async fn test_same_hash_detected_as_skip() {
        let mut controller = make_controller();
        let existing = RouteDefinition::new("timer:tick", vec![])
            .with_route_id("my-route")
            .with_source_hash(42);
        controller.add_route(existing).await.unwrap();

        let incoming = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(42),
        ];
        let planned = compute_reload_actions(&incoming, &controller);

        let expected = vec![ReloadAction::Skip {
            route_id: "my-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[tokio::test]
    async fn test_none_hash_detected_as_swap() {
        // The existing route carries no source hash at all.
        let mut controller = make_controller();
        let existing = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
        controller.add_route(existing).await.unwrap();

        let incoming = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("my-route")
                .with_source_hash(99),
        ];
        let planned = compute_reload_actions(&incoming, &controller);

        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(planned, expected);
    }

    #[test]
    fn test_runtime_snapshot_same_hash_detected_as_skip() {
        let incoming = vec![
            RouteDefinition::new("timer:tick", vec![])
                .with_route_id("r1")
                .with_source_hash(42),
        ];
        let snapshot_ids = vec!["r1".to_string()];
        let snapshot_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);

        let planned =
            compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &|id: &str| {
                snapshot_hashes.get(id).copied()
            });

        let expected = vec![ReloadAction::Skip {
            route_id: "r1".into(),
        }];
        assert_eq!(planned, expected);
    }

    // ---- executing actions against a started context -------------------

    #[tokio::test]
    async fn test_execute_add_action_inserts_route() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let new_route = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
            .with_route_id("exec-add-test");
        let errors = apply_without_functions(
            vec![ReloadAction::Add {
                route_id: "exec-add-test".into(),
            }],
            vec![new_route],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let route_count = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(route_count, 1);

        ctx.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_execute_remove_action_deletes_route() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let existing =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
        ctx.add_route_definition(existing).await.unwrap();
        let count_before = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(count_before, 1);

        let errors = apply_without_functions(
            vec![ReloadAction::Remove {
                route_id: "exec-remove-test".into(),
            }],
            vec![],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let count_after = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(count_after, 0);

        ctx.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_execute_swap_action_replaces_pipeline() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let existing =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        ctx.add_route_definition(existing).await.unwrap();

        let replacement =
            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
        let errors = apply_without_functions(
            vec![ReloadAction::Swap {
                route_id: "exec-swap-test".into(),
            }],
            vec![replacement],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // A swap must leave exactly one route behind.
        let route_count = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(route_count, 1);

        ctx.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
        use camel_api::{RuntimeQuery, RuntimeQueryResult};

        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let original = RouteDefinition::new("timer:tick?period=100", vec![])
            .with_route_id("exec-restart-test");
        ctx.add_route_definition(original).await.unwrap();

        // Precondition: the route is merely registered.
        let status_before = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match status_before {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        let replacement = RouteDefinition::new("timer:tick?period=250", vec![])
            .with_route_id("exec-restart-test");
        let errors = apply_without_functions(
            vec![ReloadAction::Restart {
                route_id: "exec-restart-test".into(),
            }],
            vec![replacement],
            &ctx.runtime_execution_handle(),
            Duration::from_secs(10),
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // Postcondition: the restart did not start the route.
        let status_after = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        match status_after {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }

        let reported = ctx.runtime_route_status("exec-restart-test").await.unwrap();
        assert_eq!(reported, Some("Registered".to_string()));

        ctx.stop().await.unwrap();
    }

    // ---- error reporting on an empty context ---------------------------

    #[tokio::test]
    async fn test_execute_swap_action_missing_definition_returns_error() {
        let errors = apply_to_empty_context(ReloadAction::Swap {
            route_id: "missing-swap-def".into(),
        })
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Swap");
        assert_eq!(errors[0].route_id, "missing-swap-def");
    }

    #[tokio::test]
    async fn test_execute_add_action_missing_definition_returns_error() {
        let errors = apply_to_empty_context(ReloadAction::Add {
            route_id: "missing-add-def".into(),
        })
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Add");
        assert_eq!(errors[0].route_id, "missing-add-def");
    }

    #[tokio::test]
    async fn test_execute_remove_action_status_error_returns_error() {
        let errors = apply_to_empty_context(ReloadAction::Remove {
            route_id: "missing-remove-route".into(),
        })
        .await;

        assert_eq!(errors.len(), 1);
        // Only the prefix is pinned; the failing step is not.
        assert!(errors[0].action.starts_with("Remove"));
        assert_eq!(errors[0].route_id, "missing-remove-route");
    }

    #[tokio::test]
    async fn test_execute_restart_action_missing_definition_returns_error() {
        let errors = apply_to_empty_context(ReloadAction::Restart {
            route_id: "missing-restart-def".into(),
        })
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Restart");
        assert_eq!(errors[0].route_id, "missing-restart-def");
    }
}