1use camel_api::CamelError;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::Duration;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::application::drain::drain_route;
12use crate::hot_reload::domain::ReloadAction;
13#[cfg(test)]
14use crate::lifecycle::adapters::route_controller::DefaultRouteController;
15use crate::lifecycle::application::route_definition::RouteDefinition;
16
17pub struct FunctionReloadContext {
18 pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
19 pub generation: u64,
20}
21
22fn compute_function_diff_for_route(
23 invoker: &std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
24 route_id: &str,
25 generation: u64,
26) -> camel_api::function::FunctionDiff {
27 let current_refs: std::collections::HashSet<_> = invoker
28 .function_refs_for_route(route_id)
29 .into_iter()
30 .collect();
31 let staged_defs = invoker.staged_defs_for_route(route_id, generation);
32 let staged_refs: std::collections::HashSet<_> = staged_defs
33 .iter()
34 .map(|(def, rid)| (def.id.clone(), rid.clone()))
35 .collect();
36
37 let removed: Vec<_> = current_refs.difference(&staged_refs).cloned().collect();
38 let added_refs: std::collections::HashSet<_> =
39 staged_refs.difference(¤t_refs).cloned().collect();
40 let added: Vec<_> = staged_defs
41 .into_iter()
42 .filter(|(def, rid)| added_refs.contains(&(def.id.clone(), rid.clone())))
43 .collect();
44 let unchanged: Vec<_> = current_refs
45 .intersection(&staged_refs)
46 .map(|(id, _)| id.clone())
47 .collect();
48
49 camel_api::function::FunctionDiff {
50 added,
51 removed,
52 unchanged,
53 }
54}
55
56static RELOAD_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
57
58fn next_reload_command_id(op: &str, route_id: &str) -> String {
59 let seq = RELOAD_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
60 format!("reload:{op}:{route_id}:{seq}")
61}
62
63fn is_invalid_stop_transition(err: &CamelError) -> bool {
64 err.to_string().contains("invalid transition")
65}
66
67fn should_stop_before_mutation(runtime_status: Option<&str>) -> bool {
68 !matches!(runtime_status, Some("Registered" | "Stopped"))
69}
70
71fn should_start_after_restart(runtime_status: Option<&str>) -> bool {
72 !matches!(runtime_status, Some("Registered" | "Stopped"))
73}
74
75#[derive(Debug)]
79pub struct ReloadError {
80 pub route_id: String,
81 pub action: String,
82 pub error: CamelError,
83}
84
85#[cfg(test)]
87fn compute_reload_actions(
88 new_definitions: &[RouteDefinition],
89 controller: &DefaultRouteController,
90) -> Vec<ReloadAction> {
91 let active_ids: std::collections::HashSet<String> =
92 controller.route_ids().into_iter().collect();
93 let mut new_ids = std::collections::HashSet::new();
94 let mut actions = Vec::new();
95
96 for def in new_definitions {
97 let route_id = def.route_id().to_string();
98 new_ids.insert(route_id.clone());
99
100 if active_ids.contains(&route_id) {
101 if let Some(from_uri) = controller.route_from_uri(&route_id) {
103 if from_uri != def.from_uri() {
104 actions.push(ReloadAction::Restart { route_id });
105 } else {
106 let existing_hash = controller.route_source_hash(&route_id);
107 let new_hash = def.source_hash();
108 match (existing_hash, new_hash) {
109 (Some(h_existing), Some(h_new)) if h_existing == h_new => {
110 actions.push(ReloadAction::Skip { route_id });
111 }
112 _ => {
113 actions.push(ReloadAction::Swap { route_id });
114 }
115 }
116 }
117 }
118 } else {
119 actions.push(ReloadAction::Add { route_id });
120 }
121 }
122
123 for id in &active_ids {
125 if !new_ids.contains(id) {
126 actions.push(ReloadAction::Remove {
127 route_id: id.clone(),
128 });
129 }
130 }
131
132 actions
133}
134
135pub(crate) fn compute_reload_actions_from_runtime_snapshot(
140 new_definitions: &[RouteDefinition],
141 runtime_route_ids: &[String],
142 runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
143) -> Vec<ReloadAction> {
144 let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
145 let mut new_ids = std::collections::HashSet::new();
146 let mut actions = Vec::new();
147
148 for def in new_definitions {
149 let route_id = def.route_id().to_string();
150 new_ids.insert(route_id.clone());
151
152 if active_ids.contains(&route_id) {
153 let existing_hash = runtime_source_hash(&route_id);
154 let new_hash = def.source_hash();
155 match (existing_hash, new_hash) {
156 (Some(h_existing), Some(h_new)) if h_existing == h_new => {
157 actions.push(ReloadAction::Skip { route_id });
158 }
159 _ => {
160 actions.push(ReloadAction::Restart { route_id });
161 }
162 }
163 } else {
164 actions.push(ReloadAction::Add { route_id });
165 }
166 }
167
168 for id in &active_ids {
169 if !new_ids.contains(id) {
170 actions.push(ReloadAction::Remove {
171 route_id: id.clone(),
172 });
173 }
174 }
175
176 actions
177}
178
179pub async fn execute_reload_actions(
186 actions: Vec<ReloadAction>,
187 mut new_definitions: Vec<RouteDefinition>,
188 controller: &RuntimeExecutionHandle,
189 drain_timeout: Duration,
190 function_ctx: Option<&FunctionReloadContext>,
191) -> Vec<ReloadError> {
192 let mut errors = Vec::new();
193
194 for action in actions {
195 match action {
196 ReloadAction::Swap { route_id } => {
197 let def_pos = new_definitions
198 .iter()
199 .position(|d| d.route_id() == route_id);
200 let def = match def_pos {
201 Some(pos) => new_definitions.remove(pos),
202 None => {
203 errors.push(ReloadError {
204 route_id: route_id.clone(),
205 action: "Swap".into(),
206 error: CamelError::RouteError(format!(
207 "No definition found for route '{}'",
208 route_id
209 )),
210 });
211 continue;
212 }
213 };
214
215 let in_flight = controller.in_flight_count(&route_id).await.unwrap_or(0);
216
217 let pipeline = if let Some(ctx) = function_ctx {
218 controller
219 .compile_route_definition_with_generation(def, ctx.generation)
220 .await
221 } else {
222 controller.compile_route_definition(def).await
223 };
224
225 match pipeline {
226 Ok(p) => {
227 let prepare_token = if let Some(ctx) = function_ctx {
228 let diff = compute_function_diff_for_route(
229 &ctx.invoker,
230 &route_id,
231 ctx.generation,
232 );
233 match ctx.invoker.prepare_reload(diff, ctx.generation).await {
234 Ok(token) => Some(token),
235 Err(e) => {
236 errors.push(ReloadError {
237 route_id: route_id.clone(),
238 action: "Swap (prepare)".into(),
239 error: CamelError::ProcessorError(format!("{e}")),
240 });
241 continue;
242 }
243 }
244 } else {
245 None
246 };
247
248 let result = controller.swap_route_pipeline(&route_id, p).await;
249 if let Err(e) = result {
250 if let Some(ctx) = function_ctx
251 && let Some(ref token) = prepare_token
252 {
253 let _ = ctx
254 .invoker
255 .rollback_reload(token.clone(), ctx.generation)
256 .await;
257 }
258 errors.push(ReloadError {
259 route_id,
260 action: "Swap".into(),
261 error: e,
262 });
263 } else {
264 if let Some(ctx) = function_ctx {
265 let diff = compute_function_diff_for_route(
266 &ctx.invoker,
267 &route_id,
268 ctx.generation,
269 );
270 if let Err(e) =
271 ctx.invoker.finalize_reload(&diff, ctx.generation).await
272 {
273 errors.push(ReloadError {
274 route_id: route_id.clone(),
275 action: "Finalize".into(),
276 error: CamelError::ProcessorError(format!("{e}")),
277 });
278 }
279 }
280 if in_flight > 0 {
281 tracing::info!(
282 route_id = %route_id,
283 action = "swap",
284 in_flight = in_flight,
285 "hot-reload: swapped route pipeline ({} exchanges continuing with previous pipeline)",
286 in_flight
287 );
288 } else {
289 tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
290 }
291 }
292 }
293 Err(e) => {
294 errors.push(ReloadError {
295 route_id,
296 action: "Swap (compile)".into(),
297 error: e,
298 });
299 }
300 }
301 }
302
303 ReloadAction::Add { route_id } => {
304 let def_pos = new_definitions
305 .iter()
306 .position(|d| d.route_id() == route_id);
307 let def = match def_pos {
308 Some(pos) => new_definitions.remove(pos),
309 None => {
310 errors.push(ReloadError {
311 route_id: route_id.clone(),
312 action: "Add".into(),
313 error: CamelError::RouteError(format!(
314 "No definition found for route '{}'",
315 route_id
316 )),
317 });
318 continue;
319 }
320 };
321
322 if let Some(ctx) = function_ctx {
323 let prepared = match controller
324 .prepare_route_definition_with_generation(def, ctx.generation)
325 .await
326 {
327 Ok(p) => p,
328 Err(e) => {
329 ctx.invoker.discard_staging(ctx.generation);
330 errors.push(ReloadError {
331 route_id,
332 action: "Add (prepare-route)".into(),
333 error: e,
334 });
335 continue;
336 }
337 };
338
339 let diff =
340 compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
341
342 let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
343 {
344 Ok(token) => token,
345 Err(e) => {
346 ctx.invoker.discard_staging(ctx.generation);
347 errors.push(ReloadError {
348 route_id,
349 action: "Add (prepare)".into(),
350 error: CamelError::ProcessorError(format!("{e}")),
351 });
352 continue;
353 }
354 };
355
356 if let Err(e) = controller.insert_prepared_route(prepared).await {
357 let _ = ctx
358 .invoker
359 .rollback_reload(prepare_token, ctx.generation)
360 .await;
361 errors.push(ReloadError {
362 route_id,
363 action: "Add (insert)".into(),
364 error: e,
365 });
366 continue;
367 }
368
369 if let Err(e) = controller.register_route_aggregate(route_id.clone()).await {
370 let _ = controller
371 .remove_route_preserving_functions(route_id.clone())
372 .await;
373 let _ = ctx
374 .invoker
375 .rollback_reload(prepare_token, ctx.generation)
376 .await;
377 errors.push(ReloadError {
378 route_id,
379 action: "Add (aggregate)".into(),
380 error: e,
381 });
382 continue;
383 }
384
385 let diff =
386 compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
387 if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
388 errors.push(ReloadError {
389 route_id: route_id.clone(),
390 action: "Finalize".into(),
391 error: CamelError::ProcessorError(format!("{e}")),
392 });
393 }
394 } else {
395 if let Err(e) = controller.add_route_definition(def).await {
396 errors.push(ReloadError {
397 route_id,
398 action: "Add".into(),
399 error: e,
400 });
401 continue;
402 }
403 }
404
405 let start_result = controller
406 .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
407 route_id: route_id.clone(),
408 command_id: next_reload_command_id("add-start", &route_id),
409 causation_id: None,
410 })
411 .await;
412 if let Err(e) = start_result {
413 errors.push(ReloadError {
414 route_id,
415 action: "Add (start)".into(),
416 error: e,
417 });
418 } else {
419 tracing::info!(route_id = %route_id, "hot-reload: added and started route");
420 }
421 }
422
423 ReloadAction::Remove { route_id } => {
424 if let Some(ctx) = function_ctx {
425 let diff =
426 compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
427
428 let runtime_status = match controller.runtime_route_status(&route_id).await {
429 Ok(status) => status,
430 Err(e) => {
431 errors.push(ReloadError {
432 route_id: route_id.clone(),
433 action: "Remove (status)".into(),
434 error: e,
435 });
436 continue;
437 }
438 };
439
440 if should_stop_before_mutation(runtime_status.as_deref()) {
441 let stop_result = controller
442 .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
443 route_id: route_id.clone(),
444 command_id: next_reload_command_id("remove-stop", &route_id),
445 causation_id: None,
446 })
447 .await;
448 if let Err(e) = stop_result
449 && !is_invalid_stop_transition(&e)
450 {
451 errors.push(ReloadError {
452 route_id: route_id.clone(),
453 action: "Remove (stop)".into(),
454 error: e,
455 });
456 continue;
457 }
458
459 let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
460 }
461
462 if let Err(e) = controller
463 .remove_route_preserving_functions(route_id.clone())
464 .await
465 {
466 errors.push(ReloadError {
467 route_id,
468 action: "Remove".into(),
469 error: e,
470 });
471 continue;
472 }
473
474 if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
475 errors.push(ReloadError {
476 route_id: route_id.clone(),
477 action: "Finalize".into(),
478 error: CamelError::ProcessorError(format!("{e}")),
479 });
480 }
481
482 tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
483 } else {
484 let runtime_status = match controller.runtime_route_status(&route_id).await {
485 Ok(status) => status,
486 Err(e) => {
487 errors.push(ReloadError {
488 route_id,
489 action: "Remove (status)".into(),
490 error: e,
491 });
492 continue;
493 }
494 };
495
496 if should_stop_before_mutation(runtime_status.as_deref()) {
497 let stop_result = controller
498 .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
499 route_id: route_id.clone(),
500 command_id: next_reload_command_id("remove-stop", &route_id),
501 causation_id: None,
502 })
503 .await;
504 if let Err(e) = stop_result
505 && !is_invalid_stop_transition(&e)
506 {
507 errors.push(ReloadError {
508 route_id: route_id.clone(),
509 action: "Remove (stop)".into(),
510 error: e,
511 });
512 continue;
513 }
514
515 let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
516 }
517
518 let remove_result = controller
519 .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
520 route_id: route_id.clone(),
521 command_id: next_reload_command_id("remove", &route_id),
522 causation_id: None,
523 })
524 .await;
525 match remove_result {
526 Ok(_) => {
527 tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
528 }
529 Err(e) => {
530 errors.push(ReloadError {
531 route_id,
532 action: "Remove".into(),
533 error: e,
534 });
535 }
536 }
537 }
538 }
539
540 ReloadAction::Restart { route_id } => {
541 tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");
542
543 let def_pos = new_definitions
544 .iter()
545 .position(|d| d.route_id() == route_id);
546 let def = match def_pos {
547 Some(pos) => new_definitions.remove(pos),
548 None => {
549 errors.push(ReloadError {
550 route_id: route_id.clone(),
551 action: "Restart".into(),
552 error: CamelError::RouteError(format!(
553 "No definition found for route '{}'",
554 route_id
555 )),
556 });
557 continue;
558 }
559 };
560
561 if let Some(ctx) = function_ctx {
562 let prepared = match controller
563 .prepare_route_definition_with_generation(def, ctx.generation)
564 .await
565 {
566 Ok(p) => p,
567 Err(e) => {
568 errors.push(ReloadError {
569 route_id,
570 action: "Restart (prepare-route)".into(),
571 error: e,
572 });
573 continue;
574 }
575 };
576
577 let diff =
578 compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
579 let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
580 {
581 Ok(token) => token,
582 Err(e) => {
583 errors.push(ReloadError {
584 route_id,
585 action: "Restart (prepare)".into(),
586 error: CamelError::ProcessorError(format!("{e}")),
587 });
588 continue;
589 }
590 };
591
592 let runtime_status = match controller.runtime_route_status(&route_id).await {
593 Ok(status) => status,
594 Err(e) => {
595 let _ = ctx
596 .invoker
597 .rollback_reload(prepare_token, ctx.generation)
598 .await;
599 errors.push(ReloadError {
600 route_id,
601 action: "Restart (status)".into(),
602 error: e,
603 });
604 continue;
605 }
606 };
607
608 if should_stop_before_mutation(runtime_status.as_deref()) {
609 let stop_result = controller
610 .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
611 route_id: route_id.clone(),
612 command_id: next_reload_command_id("restart-stop", &route_id),
613 causation_id: None,
614 })
615 .await;
616 if let Err(e) = stop_result
617 && !is_invalid_stop_transition(&e)
618 {
619 let _ = ctx
620 .invoker
621 .rollback_reload(prepare_token, ctx.generation)
622 .await;
623 errors.push(ReloadError {
624 route_id,
625 action: "Restart (stop)".into(),
626 error: e,
627 });
628 continue;
629 }
630
631 let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
632 }
633
634 if let Err(e) = controller
635 .remove_route_preserving_functions(route_id.clone())
636 .await
637 {
638 let _ = ctx
639 .invoker
640 .rollback_reload(prepare_token, ctx.generation)
641 .await;
642 errors.push(ReloadError {
643 route_id,
644 action: "Restart (remove)".into(),
645 error: e,
646 });
647 continue;
648 }
649
650 if let Err(e) = controller.insert_prepared_route(prepared).await {
651 let _ = ctx
652 .invoker
653 .rollback_reload(prepare_token, ctx.generation)
654 .await;
655 errors.push(ReloadError {
656 route_id,
657 action: "Restart (insert)".into(),
658 error: e,
659 });
660 continue;
661 }
662
663 let diff =
664 compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
665 if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
666 errors.push(ReloadError {
667 route_id: route_id.clone(),
668 action: "Finalize".into(),
669 error: CamelError::ProcessorError(format!("{e}")),
670 });
671 }
672
673 if should_start_after_restart(runtime_status.as_deref()) {
674 let start_result = controller
675 .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
676 route_id: route_id.clone(),
677 command_id: next_reload_command_id("restart-start", &route_id),
678 causation_id: None,
679 })
680 .await;
681 if let Err(e) = start_result {
682 errors.push(ReloadError {
683 route_id,
684 action: "Restart (start)".into(),
685 error: e,
686 });
687 } else {
688 tracing::info!(
689 route_id = %route_id,
690 "hot-reload: route restarted successfully"
691 );
692 }
693 } else {
694 tracing::info!(
695 route_id = %route_id,
696 "hot-reload: restart applied while preserving stopped lifecycle state"
697 );
698 }
699 } else {
700 let runtime_status = match controller.runtime_route_status(&route_id).await {
701 Ok(status) => status,
702 Err(e) => {
703 errors.push(ReloadError {
704 route_id,
705 action: "Restart (status)".into(),
706 error: e,
707 });
708 continue;
709 }
710 };
711
712 if should_stop_before_mutation(runtime_status.as_deref()) {
713 let stop_result = controller
714 .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
715 route_id: route_id.clone(),
716 command_id: next_reload_command_id("restart-stop", &route_id),
717 causation_id: None,
718 })
719 .await;
720 if let Err(e) = stop_result
721 && !is_invalid_stop_transition(&e)
722 {
723 errors.push(ReloadError {
724 route_id,
725 action: "Restart (stop)".into(),
726 error: e,
727 });
728 continue;
729 }
730
731 let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
732 }
733
734 if let Err(e) = controller
735 .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
736 route_id: route_id.clone(),
737 command_id: next_reload_command_id("restart-remove", &route_id),
738 causation_id: None,
739 })
740 .await
741 {
742 errors.push(ReloadError {
743 route_id,
744 action: "Restart (remove)".into(),
745 error: e,
746 });
747 continue;
748 }
749
750 if let Err(e) = controller.add_route_definition(def).await {
751 errors.push(ReloadError {
752 route_id,
753 action: "Restart (add)".into(),
754 error: e,
755 });
756 continue;
757 }
758
759 if should_start_after_restart(runtime_status.as_deref()) {
760 let start_result = controller
761 .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
762 route_id: route_id.clone(),
763 command_id: next_reload_command_id("restart-start", &route_id),
764 causation_id: None,
765 })
766 .await;
767 if let Err(e) = start_result {
768 errors.push(ReloadError {
769 route_id,
770 action: "Restart (start)".into(),
771 error: e,
772 });
773 } else {
774 tracing::info!(
775 route_id = %route_id,
776 "hot-reload: route restarted successfully"
777 );
778 }
779 } else {
780 tracing::info!(
781 route_id = %route_id,
782 "hot-reload: restart applied while preserving stopped lifecycle state"
783 );
784 }
785 }
786 }
787
788 ReloadAction::Skip { route_id } => {
789 tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
790 }
791 }
792 }
793
794 errors
795}
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use crate::lifecycle::adapters::route_controller::DefaultRouteController;
801 use crate::shared::components::domain::Registry;
802 use camel_api::function::{FunctionDiff, FunctionId};
803 use camel_api::{Exchange, ExchangePatch, FunctionInvocationError, FunctionInvoker};
804 use std::sync::Arc;
805 use std::time::Duration;
806
807 fn make_controller() -> DefaultRouteController {
808 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
809 DefaultRouteController::new(
810 registry,
811 Arc::new(camel_api::NoopPlatformService::default()),
812 )
813 }
814
815 #[test]
816 fn helper_next_reload_command_id_increments_and_keeps_prefix() {
817 let one = next_reload_command_id("restart-stop", "r1");
818 let two = next_reload_command_id("restart-stop", "r1");
819 assert!(one.starts_with("reload:restart-stop:r1:"));
820 assert!(two.starts_with("reload:restart-stop:r1:"));
821 assert_ne!(one, two);
822 }
823
824 #[test]
825 fn helper_should_stop_before_mutation_respects_status() {
826 assert!(!should_stop_before_mutation(Some("Registered")));
827 assert!(!should_stop_before_mutation(Some("Stopped")));
828 assert!(should_stop_before_mutation(Some("Started")));
829 assert!(should_stop_before_mutation(None));
830 }
831
832 #[test]
833 fn helper_should_start_after_restart_respects_status() {
834 assert!(!should_start_after_restart(Some("Registered")));
835 assert!(!should_start_after_restart(Some("Stopped")));
836 assert!(should_start_after_restart(Some("Started")));
837 assert!(should_start_after_restart(None));
838 }
839
840 #[test]
841 fn helper_invalid_stop_transition_detects_marker() {
842 let err = CamelError::RouteError("invalid transition: Started -> Started".into());
843 assert!(is_invalid_stop_transition(&err));
844
845 let other = CamelError::RouteError("route missing".into());
846 assert!(!is_invalid_stop_transition(&other));
847 }
848
849 #[test]
850 fn test_new_route_detected_as_add() {
851 let controller = make_controller();
852 let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];
853 let actions = compute_reload_actions(&defs, &controller);
854 assert_eq!(
855 actions,
856 vec![ReloadAction::Add {
857 route_id: "new-route".into()
858 }]
859 );
860 }
861
862 #[tokio::test]
863 async fn test_removed_route_detected() {
864 let mut controller = make_controller();
865 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route");
866 controller.add_route(def).await.unwrap();
867
868 let actions = compute_reload_actions(&[], &controller);
869 assert_eq!(
870 actions,
871 vec![ReloadAction::Remove {
872 route_id: "old-route".into()
873 }]
874 );
875 }
876
877 #[tokio::test]
878 async fn test_same_from_uri_detected_as_swap() {
879 let mut controller = make_controller();
880 let def = RouteDefinition::new("timer:tick", vec![])
881 .with_route_id("my-route")
882 .with_source_hash(100);
883 controller.add_route(def).await.unwrap();
884
885 let new_defs = vec![
886 RouteDefinition::new("timer:tick", vec![])
887 .with_route_id("my-route")
888 .with_source_hash(200),
889 ];
890 let actions = compute_reload_actions(&new_defs, &controller);
891 assert_eq!(
892 actions,
893 vec![ReloadAction::Swap {
894 route_id: "my-route".into()
895 }]
896 );
897 }
898
899 #[tokio::test]
900 async fn test_changed_from_uri_detected_as_restart() {
901 let mut controller = make_controller();
902 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
903 controller.add_route(def).await.unwrap();
904
905 let new_defs =
906 vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
907 let actions = compute_reload_actions(&new_defs, &controller);
908 assert_eq!(
909 actions,
910 vec![ReloadAction::Restart {
911 route_id: "my-route".into()
912 }]
913 );
914 }
915
916 #[tokio::test]
917 async fn test_runtime_snapshot_drives_remove_set() {
918 let mut controller = make_controller();
919 controller
920 .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("runtime-route"))
921 .await
922 .unwrap();
923 controller
924 .add_route(RouteDefinition::new("timer:ghost", vec![]).with_route_id("ghost-route"))
925 .await
926 .unwrap();
927
928 let runtime_ids = vec!["runtime-route".to_string()];
929 let actions =
930 compute_reload_actions_from_runtime_snapshot(&[], &runtime_ids, &|_id: &str| None);
931 assert_eq!(
932 actions,
933 vec![ReloadAction::Remove {
934 route_id: "runtime-route".into()
935 }]
936 );
937 }
938
939 #[test]
940 fn test_runtime_snapshot_existing_routes_map_to_restart() {
941 let defs = vec![
942 RouteDefinition::new("timer:tick", vec![])
943 .with_route_id("runtime-r1")
944 .with_source_hash(10),
945 RouteDefinition::new("timer:tock", vec![])
946 .with_route_id("runtime-r2")
947 .with_source_hash(20),
948 ];
949 let runtime_ids = vec!["runtime-r1".to_string(), "runtime-r2".to_string()];
950 let runtime_hashes = std::collections::HashMap::from([
951 ("runtime-r1".to_string(), 11u64),
952 ("runtime-r2".to_string(), 22u64),
953 ]);
954
955 let actions =
956 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
957 runtime_hashes.get(id).copied()
958 });
959 assert_eq!(
960 actions,
961 vec![
962 ReloadAction::Restart {
963 route_id: "runtime-r1".into()
964 },
965 ReloadAction::Restart {
966 route_id: "runtime-r2".into()
967 }
968 ]
969 );
970 }
971
972 #[tokio::test]
973 async fn test_same_hash_detected_as_skip() {
974 let mut controller = make_controller();
975 let def = RouteDefinition::new("timer:tick", vec![])
976 .with_route_id("my-route")
977 .with_source_hash(42);
978 controller.add_route(def).await.unwrap();
979
980 let new_defs = vec![
981 RouteDefinition::new("timer:tick", vec![])
982 .with_route_id("my-route")
983 .with_source_hash(42),
984 ];
985 let actions = compute_reload_actions(&new_defs, &controller);
986 assert_eq!(
987 actions,
988 vec![ReloadAction::Skip {
989 route_id: "my-route".into()
990 }]
991 );
992 }
993
994 #[tokio::test]
995 async fn test_none_hash_detected_as_swap() {
996 let mut controller = make_controller();
997 let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
998 controller.add_route(def).await.unwrap();
999
1000 let new_defs = vec![
1001 RouteDefinition::new("timer:tick", vec![])
1002 .with_route_id("my-route")
1003 .with_source_hash(99),
1004 ];
1005 let actions = compute_reload_actions(&new_defs, &controller);
1006 assert_eq!(
1007 actions,
1008 vec![ReloadAction::Swap {
1009 route_id: "my-route".into()
1010 }]
1011 );
1012 }
1013
1014 #[test]
1015 fn test_runtime_snapshot_same_hash_detected_as_skip() {
1016 let defs = vec![
1017 RouteDefinition::new("timer:tick", vec![])
1018 .with_route_id("r1")
1019 .with_source_hash(42),
1020 ];
1021 let runtime_ids = vec!["r1".to_string()];
1022 let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);
1023
1024 let actions =
1025 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
1026 runtime_hashes.get(id).copied()
1027 });
1028 assert_eq!(
1029 actions,
1030 vec![ReloadAction::Skip {
1031 route_id: "r1".into()
1032 }]
1033 );
1034 }
1035
1036 #[test]
1037 fn test_runtime_snapshot_mixed_actions_cover_all_decisions() {
1038 let defs = vec![
1039 RouteDefinition::new("timer:tick", vec![])
1040 .with_route_id("existing-same")
1041 .with_source_hash(10),
1042 RouteDefinition::new("timer:tock", vec![])
1043 .with_route_id("existing-diff")
1044 .with_source_hash(20),
1045 RouteDefinition::new("timer:new", vec![])
1046 .with_route_id("brand-new")
1047 .with_source_hash(30),
1048 ];
1049 let runtime_ids = vec![
1050 "existing-same".to_string(),
1051 "existing-diff".to_string(),
1052 "orphan".to_string(),
1053 ];
1054 let runtime_hashes = std::collections::HashMap::from([
1055 ("existing-same".to_string(), 10u64),
1056 ("existing-diff".to_string(), 999u64),
1057 ("orphan".to_string(), 77u64),
1058 ]);
1059
1060 let actions =
1061 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
1062 runtime_hashes.get(id).copied()
1063 });
1064
1065 assert_eq!(
1066 actions,
1067 vec![
1068 ReloadAction::Skip {
1069 route_id: "existing-same".into()
1070 },
1071 ReloadAction::Restart {
1072 route_id: "existing-diff".into()
1073 },
1074 ReloadAction::Add {
1075 route_id: "brand-new".into()
1076 },
1077 ReloadAction::Remove {
1078 route_id: "orphan".into()
1079 }
1080 ]
1081 );
1082 }
1083
1084 #[test]
1085 fn test_runtime_snapshot_missing_runtime_hash_for_existing_route_restarts() {
1086 let defs = vec![
1087 RouteDefinition::new("timer:tick", vec![])
1088 .with_route_id("r1")
1089 .with_source_hash(42),
1090 ];
1091 let runtime_ids = vec!["r1".to_string()];
1092
1093 let actions =
1094 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);
1095
1096 assert_eq!(
1097 actions,
1098 vec![ReloadAction::Restart {
1099 route_id: "r1".into()
1100 }]
1101 );
1102 }
1103
1104 #[test]
1105 fn test_runtime_snapshot_missing_new_hash_for_existing_route_restarts() {
1106 let defs = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("r1")];
1107 let runtime_ids = vec!["r1".to_string()];
1108 let runtime_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);
1109
1110 let actions =
1111 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|id: &str| {
1112 runtime_hashes.get(id).copied()
1113 });
1114
1115 assert_eq!(
1116 actions,
1117 vec![ReloadAction::Restart {
1118 route_id: "r1".into()
1119 }]
1120 );
1121 }
1122
1123 #[test]
1124 fn test_runtime_snapshot_new_only_route_maps_to_add() {
1125 let defs = vec![
1126 RouteDefinition::new("timer:tick", vec![])
1127 .with_route_id("new-only")
1128 .with_source_hash(1),
1129 ];
1130 let runtime_ids: Vec<String> = vec![];
1131
1132 let actions =
1133 compute_reload_actions_from_runtime_snapshot(&defs, &runtime_ids, &|_id: &str| None);
1134
1135 assert_eq!(
1136 actions,
1137 vec![ReloadAction::Add {
1138 route_id: "new-only".into()
1139 }]
1140 );
1141 }
1142
1143 #[tokio::test]
1147 async fn test_execute_add_action_inserts_route() {
1148 use crate::CamelContext;
1149 use camel_component_timer::TimerComponent;
1150
1151 let mut ctx = CamelContext::builder().build().await.unwrap();
1152 ctx.register_component(TimerComponent::new());
1153 ctx.start().await.unwrap();
1154
1155 let def = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
1156 .with_route_id("exec-add-test");
1157 let actions = vec![ReloadAction::Add {
1158 route_id: "exec-add-test".into(),
1159 }];
1160 let errors = execute_reload_actions(
1161 actions,
1162 vec![def],
1163 &ctx.runtime_execution_handle(),
1164 Duration::from_secs(10),
1165 None,
1166 )
1167 .await;
1168 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1169
1170 assert_eq!(
1171 ctx.runtime_execution_handle()
1172 .controller_route_count_for_test()
1173 .await,
1174 1
1175 );
1176
1177 ctx.stop().await.unwrap();
1178 }
1179
1180 #[tokio::test]
1181 async fn test_execute_remove_action_deletes_route() {
1182 use crate::CamelContext;
1183 use camel_component_timer::TimerComponent;
1184
1185 let mut ctx = CamelContext::builder().build().await.unwrap();
1186 ctx.register_component(TimerComponent::new());
1187 ctx.start().await.unwrap();
1188
1189 let def =
1191 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
1192 ctx.add_route_definition(def).await.unwrap();
1193 assert_eq!(
1194 ctx.runtime_execution_handle()
1195 .controller_route_count_for_test()
1196 .await,
1197 1
1198 );
1199
1200 let actions = vec![ReloadAction::Remove {
1201 route_id: "exec-remove-test".into(),
1202 }];
1203 let errors = execute_reload_actions(
1204 actions,
1205 vec![],
1206 &ctx.runtime_execution_handle(),
1207 Duration::from_secs(10),
1208 None,
1209 )
1210 .await;
1211 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1212
1213 assert_eq!(
1214 ctx.runtime_execution_handle()
1215 .controller_route_count_for_test()
1216 .await,
1217 0
1218 );
1219
1220 ctx.stop().await.unwrap();
1221 }
1222
1223 #[tokio::test]
1224 async fn test_execute_swap_action_replaces_pipeline() {
1225 use crate::CamelContext;
1226 use camel_component_timer::TimerComponent;
1227
1228 let mut ctx = CamelContext::builder().build().await.unwrap();
1229 ctx.register_component(TimerComponent::new());
1230 ctx.start().await.unwrap();
1231
1232 let def =
1234 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
1235 ctx.add_route_definition(def).await.unwrap();
1236
1237 let new_def =
1239 RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
1240 let actions = vec![ReloadAction::Swap {
1241 route_id: "exec-swap-test".into(),
1242 }];
1243 let errors = execute_reload_actions(
1244 actions,
1245 vec![new_def],
1246 &ctx.runtime_execution_handle(),
1247 Duration::from_secs(10),
1248 None,
1249 )
1250 .await;
1251 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1252
1253 assert_eq!(
1255 ctx.runtime_execution_handle()
1256 .controller_route_count_for_test()
1257 .await,
1258 1
1259 );
1260
1261 ctx.stop().await.unwrap();
1262 }
1263
1264 #[tokio::test]
1265 async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
1266 use crate::CamelContext;
1267 use camel_api::{RuntimeQuery, RuntimeQueryResult};
1268 use camel_component_timer::TimerComponent;
1269
1270 let mut ctx = CamelContext::builder().build().await.unwrap();
1271 ctx.register_component(TimerComponent::new());
1272 ctx.start().await.unwrap();
1273
1274 let initial = RouteDefinition::new("timer:tick?period=100", vec![])
1276 .with_route_id("exec-restart-test");
1277 ctx.add_route_definition(initial).await.unwrap();
1278
1279 let before = ctx
1281 .runtime()
1282 .ask(RuntimeQuery::GetRouteStatus {
1283 route_id: "exec-restart-test".into(),
1284 })
1285 .await
1286 .unwrap();
1287 match before {
1288 RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1289 other => panic!("unexpected query result: {other:?}"),
1290 }
1291
1292 let replacement = RouteDefinition::new("timer:tick?period=250", vec![])
1293 .with_route_id("exec-restart-test");
1294 let actions = vec![ReloadAction::Restart {
1295 route_id: "exec-restart-test".into(),
1296 }];
1297 let errors = execute_reload_actions(
1298 actions,
1299 vec![replacement],
1300 &ctx.runtime_execution_handle(),
1301 Duration::from_secs(10),
1302 None,
1303 )
1304 .await;
1305 assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1306
1307 let after = ctx
1310 .runtime()
1311 .ask(RuntimeQuery::GetRouteStatus {
1312 route_id: "exec-restart-test".into(),
1313 })
1314 .await
1315 .unwrap();
1316 match after {
1317 RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1318 other => panic!("unexpected query result: {other:?}"),
1319 }
1320
1321 assert_eq!(
1322 ctx.runtime_route_status("exec-restart-test").await.unwrap(),
1323 Some("Registered".to_string())
1324 );
1325
1326 ctx.stop().await.unwrap();
1327 }
1328
1329 #[tokio::test]
1330 async fn test_execute_swap_action_missing_definition_returns_error() {
1331 use crate::CamelContext;
1332
1333 let ctx = CamelContext::builder().build().await.unwrap();
1334 let errors = execute_reload_actions(
1335 vec![ReloadAction::Swap {
1336 route_id: "missing-swap-def".into(),
1337 }],
1338 vec![],
1339 &ctx.runtime_execution_handle(),
1340 Duration::from_millis(1),
1341 None,
1342 )
1343 .await;
1344
1345 assert_eq!(errors.len(), 1);
1346 assert_eq!(errors[0].action, "Swap");
1347 assert_eq!(errors[0].route_id, "missing-swap-def");
1348 }
1349
1350 #[tokio::test]
1351 async fn test_execute_add_action_missing_definition_returns_error() {
1352 use crate::CamelContext;
1353
1354 let ctx = CamelContext::builder().build().await.unwrap();
1355 let errors = execute_reload_actions(
1356 vec![ReloadAction::Add {
1357 route_id: "missing-add-def".into(),
1358 }],
1359 vec![],
1360 &ctx.runtime_execution_handle(),
1361 Duration::from_millis(1),
1362 None,
1363 )
1364 .await;
1365
1366 assert_eq!(errors.len(), 1);
1367 assert_eq!(errors[0].action, "Add");
1368 assert_eq!(errors[0].route_id, "missing-add-def");
1369 }
1370
1371 #[tokio::test]
1372 async fn test_execute_remove_action_status_error_returns_error() {
1373 use crate::CamelContext;
1374
1375 let ctx = CamelContext::builder().build().await.unwrap();
1376 let errors = execute_reload_actions(
1377 vec![ReloadAction::Remove {
1378 route_id: "missing-remove-route".into(),
1379 }],
1380 vec![],
1381 &ctx.runtime_execution_handle(),
1382 Duration::from_millis(1),
1383 None,
1384 )
1385 .await;
1386
1387 assert_eq!(errors.len(), 1);
1388 assert!(errors[0].action.starts_with("Remove"));
1389 assert_eq!(errors[0].route_id, "missing-remove-route");
1390 }
1391
1392 #[tokio::test]
1393 async fn test_execute_restart_action_missing_definition_returns_error() {
1394 use crate::CamelContext;
1395
1396 let ctx = CamelContext::builder().build().await.unwrap();
1397 let errors = execute_reload_actions(
1398 vec![ReloadAction::Restart {
1399 route_id: "missing-restart-def".into(),
1400 }],
1401 vec![],
1402 &ctx.runtime_execution_handle(),
1403 Duration::from_millis(1),
1404 None,
1405 )
1406 .await;
1407
1408 assert_eq!(errors.len(), 1);
1409 assert_eq!(errors[0].action, "Restart");
1410 assert_eq!(errors[0].route_id, "missing-restart-def");
1411 }
1412
1413 #[tokio::test]
1414 async fn test_execute_skip_action_returns_no_errors() {
1415 use crate::CamelContext;
1416
1417 let ctx = CamelContext::builder().build().await.unwrap();
1418 let errors = execute_reload_actions(
1419 vec![ReloadAction::Skip {
1420 route_id: "skip-only-route".into(),
1421 }],
1422 vec![],
1423 &ctx.runtime_execution_handle(),
1424 Duration::from_millis(1),
1425 None,
1426 )
1427 .await;
1428
1429 assert!(errors.is_empty());
1430 }
1431
1432 #[test]
1433 fn compute_function_diff_all_added() {
1434 use camel_api::function::PrepareToken;
1435 use camel_api::{FunctionDefinition, FunctionInvokerSync};
1436 use std::sync::Mutex;
1437
1438 struct TestInvoker {
1439 staged: Mutex<Vec<(FunctionDefinition, Option<String>)>>,
1440 }
1441 impl FunctionInvokerSync for TestInvoker {
1442 fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}
1443 fn discard_staging(&self, _generation: u64) {}
1444 fn begin_reload(&self) -> u64 {
1445 0
1446 }
1447 fn function_refs_for_route(
1448 &self,
1449 _route_id: &str,
1450 ) -> Vec<(FunctionId, Option<String>)> {
1451 vec![]
1452 }
1453 fn staged_refs_for_route(
1454 &self,
1455 _route_id: &str,
1456 _generation: u64,
1457 ) -> Vec<(FunctionId, Option<String>)> {
1458 vec![]
1459 }
1460 fn staged_defs_for_route(
1461 &self,
1462 _route_id: &str,
1463 _generation: u64,
1464 ) -> Vec<(FunctionDefinition, Option<String>)> {
1465 self.staged.lock().unwrap().clone()
1466 }
1467 }
1468 #[async_trait::async_trait]
1469 impl FunctionInvoker for TestInvoker {
1470 async fn register(
1471 &self,
1472 _def: FunctionDefinition,
1473 _route_id: Option<&str>,
1474 ) -> Result<(), FunctionInvocationError> {
1475 Ok(())
1476 }
1477 async fn unregister(
1478 &self,
1479 _id: &FunctionId,
1480 _route_id: Option<&str>,
1481 ) -> Result<(), FunctionInvocationError> {
1482 Ok(())
1483 }
1484 async fn invoke(
1485 &self,
1486 _id: &FunctionId,
1487 _exchange: &Exchange,
1488 ) -> Result<ExchangePatch, FunctionInvocationError> {
1489 Ok(ExchangePatch::default())
1490 }
1491 async fn prepare_reload(
1492 &self,
1493 _diff: FunctionDiff,
1494 _generation: u64,
1495 ) -> Result<PrepareToken, FunctionInvocationError> {
1496 Ok(PrepareToken::default())
1497 }
1498 async fn finalize_reload(
1499 &self,
1500 _diff: &FunctionDiff,
1501 _generation: u64,
1502 ) -> Result<(), FunctionInvocationError> {
1503 Ok(())
1504 }
1505 async fn rollback_reload(
1506 &self,
1507 _token: PrepareToken,
1508 _generation: u64,
1509 ) -> Result<(), FunctionInvocationError> {
1510 Ok(())
1511 }
1512 async fn commit_reload(
1513 &self,
1514 _diff: FunctionDiff,
1515 _generation: u64,
1516 ) -> Result<(), FunctionInvocationError> {
1517 Ok(())
1518 }
1519 async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
1520 Ok(())
1521 }
1522 }
1523
1524 let staged_def = FunctionDefinition {
1525 id: FunctionId::compute("deno", "fn1", 5000),
1526 runtime: "deno".into(),
1527 source: "fn1".into(),
1528 timeout_ms: 5000,
1529 route_id: Some("route-a".into()),
1530 step_index: Some(0),
1531 };
1532 let invoker: Arc<dyn FunctionInvoker> = Arc::new(TestInvoker {
1533 staged: Mutex::new(vec![(staged_def.clone(), Some("route-a".into()))]),
1534 });
1535
1536 let diff = compute_function_diff_for_route(&invoker, "route-a", 0);
1537 assert_eq!(diff.added.len(), 1);
1538 assert_eq!(diff.removed.len(), 0);
1539 assert_eq!(diff.unchanged.len(), 0);
1540 }
1541
1542 #[test]
1543 fn compute_function_diff_all_removed() {
1544 use camel_api::function::PrepareToken;
1545 use camel_api::{FunctionDefinition, FunctionInvokerSync};
1546
1547 struct TestInvoker;
1548 impl FunctionInvokerSync for TestInvoker {
1549 fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}
1550 fn discard_staging(&self, _generation: u64) {}
1551 fn begin_reload(&self) -> u64 {
1552 0
1553 }
1554 fn function_refs_for_route(
1555 &self,
1556 _route_id: &str,
1557 ) -> Vec<(FunctionId, Option<String>)> {
1558 vec![(
1559 FunctionId::compute("deno", "old-fn", 5000),
1560 Some("route-b".into()),
1561 )]
1562 }
1563 fn staged_refs_for_route(
1564 &self,
1565 _route_id: &str,
1566 _generation: u64,
1567 ) -> Vec<(FunctionId, Option<String>)> {
1568 vec![]
1569 }
1570 fn staged_defs_for_route(
1571 &self,
1572 _route_id: &str,
1573 _generation: u64,
1574 ) -> Vec<(FunctionDefinition, Option<String>)> {
1575 vec![]
1576 }
1577 }
1578 #[async_trait::async_trait]
1579 impl FunctionInvoker for TestInvoker {
1580 async fn register(
1581 &self,
1582 _def: FunctionDefinition,
1583 _route_id: Option<&str>,
1584 ) -> Result<(), FunctionInvocationError> {
1585 Ok(())
1586 }
1587 async fn unregister(
1588 &self,
1589 _id: &FunctionId,
1590 _route_id: Option<&str>,
1591 ) -> Result<(), FunctionInvocationError> {
1592 Ok(())
1593 }
1594 async fn invoke(
1595 &self,
1596 _id: &FunctionId,
1597 _exchange: &Exchange,
1598 ) -> Result<ExchangePatch, FunctionInvocationError> {
1599 Ok(ExchangePatch::default())
1600 }
1601 async fn prepare_reload(
1602 &self,
1603 _diff: FunctionDiff,
1604 _generation: u64,
1605 ) -> Result<PrepareToken, FunctionInvocationError> {
1606 Ok(PrepareToken::default())
1607 }
1608 async fn finalize_reload(
1609 &self,
1610 _diff: &FunctionDiff,
1611 _generation: u64,
1612 ) -> Result<(), FunctionInvocationError> {
1613 Ok(())
1614 }
1615 async fn rollback_reload(
1616 &self,
1617 _token: PrepareToken,
1618 _generation: u64,
1619 ) -> Result<(), FunctionInvocationError> {
1620 Ok(())
1621 }
1622 async fn commit_reload(
1623 &self,
1624 _diff: FunctionDiff,
1625 _generation: u64,
1626 ) -> Result<(), FunctionInvocationError> {
1627 Ok(())
1628 }
1629 async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
1630 Ok(())
1631 }
1632 }
1633
1634 let invoker: Arc<dyn FunctionInvoker> = Arc::new(TestInvoker);
1635 let diff = compute_function_diff_for_route(&invoker, "route-b", 0);
1636 assert_eq!(diff.added.len(), 0);
1637 assert_eq!(diff.removed.len(), 1);
1638 assert_eq!(diff.unchanged.len(), 0);
1639 }
1640
1641 #[test]
1642 fn compute_function_diff_unchanged() {
1643 use camel_api::function::PrepareToken;
1644 use camel_api::{FunctionDefinition, FunctionInvokerSync};
1645
1646 let fn_id = FunctionId::compute("deno", "same-fn", 5000);
1647 let pair = (fn_id.clone(), Some("route-c".into()));
1648
1649 struct TestInvoker {
1650 pair: (FunctionId, Option<String>),
1651 }
1652 impl FunctionInvokerSync for TestInvoker {
1653 fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}
1654 fn discard_staging(&self, _generation: u64) {}
1655 fn begin_reload(&self) -> u64 {
1656 0
1657 }
1658 fn function_refs_for_route(
1659 &self,
1660 _route_id: &str,
1661 ) -> Vec<(FunctionId, Option<String>)> {
1662 vec![self.pair.clone()]
1663 }
1664 fn staged_refs_for_route(
1665 &self,
1666 _route_id: &str,
1667 _generation: u64,
1668 ) -> Vec<(FunctionId, Option<String>)> {
1669 vec![self.pair.clone()]
1670 }
1671 fn staged_defs_for_route(
1672 &self,
1673 _route_id: &str,
1674 _generation: u64,
1675 ) -> Vec<(FunctionDefinition, Option<String>)> {
1676 vec![(
1677 FunctionDefinition {
1678 id: self.pair.0.clone(),
1679 runtime: "deno".into(),
1680 source: "same".into(),
1681 timeout_ms: 5000,
1682 route_id: Some("route-c".into()),
1683 step_index: Some(0),
1684 },
1685 self.pair.1.clone(),
1686 )]
1687 }
1688 }
1689 #[async_trait::async_trait]
1690 impl FunctionInvoker for TestInvoker {
1691 async fn register(
1692 &self,
1693 _def: FunctionDefinition,
1694 _route_id: Option<&str>,
1695 ) -> Result<(), FunctionInvocationError> {
1696 Ok(())
1697 }
1698 async fn unregister(
1699 &self,
1700 _id: &FunctionId,
1701 _route_id: Option<&str>,
1702 ) -> Result<(), FunctionInvocationError> {
1703 Ok(())
1704 }
1705 async fn invoke(
1706 &self,
1707 _id: &FunctionId,
1708 _exchange: &Exchange,
1709 ) -> Result<ExchangePatch, FunctionInvocationError> {
1710 Ok(ExchangePatch::default())
1711 }
1712 async fn prepare_reload(
1713 &self,
1714 _diff: FunctionDiff,
1715 _generation: u64,
1716 ) -> Result<PrepareToken, FunctionInvocationError> {
1717 Ok(PrepareToken::default())
1718 }
1719 async fn finalize_reload(
1720 &self,
1721 _diff: &FunctionDiff,
1722 _generation: u64,
1723 ) -> Result<(), FunctionInvocationError> {
1724 Ok(())
1725 }
1726 async fn rollback_reload(
1727 &self,
1728 _token: PrepareToken,
1729 _generation: u64,
1730 ) -> Result<(), FunctionInvocationError> {
1731 Ok(())
1732 }
1733 async fn commit_reload(
1734 &self,
1735 _diff: FunctionDiff,
1736 _generation: u64,
1737 ) -> Result<(), FunctionInvocationError> {
1738 Ok(())
1739 }
1740 async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
1741 Ok(())
1742 }
1743 }
1744
1745 let invoker: Arc<dyn FunctionInvoker> = Arc::new(TestInvoker { pair });
1746 let diff = compute_function_diff_for_route(&invoker, "route-c", 0);
1747 assert_eq!(diff.added.len(), 0);
1748 assert_eq!(diff.removed.len(), 0);
1749 assert_eq!(diff.unchanged.len(), 1);
1750 }
1751
1752 #[test]
1753 fn reload_error_debug_format() {
1754 let err = ReloadError {
1755 route_id: "r1".into(),
1756 action: "Swap".into(),
1757 error: CamelError::RouteError("test error".into()),
1758 };
1759 let debug = format!("{:?}", err);
1760 assert!(debug.contains("r1"));
1761 assert!(debug.contains("Swap"));
1762 }
1763}