Skip to main content

camel_core/hot_reload/application/
reload.rs

1//! Route reload coordinator.
2//!
3//! Compares a new set of route definitions against the currently running routes
4//! and computes the minimal set of actions: SWAP, RESTART, ADD, REMOVE, or SKIP.
5
6use camel_api::CamelError;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::Duration;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::application::drain::drain_route;
12use crate::hot_reload::domain::ReloadAction;
13#[cfg(test)]
14use crate::lifecycle::adapters::route_controller::DefaultRouteController;
15use crate::lifecycle::application::route_definition::RouteDefinition;
16
/// Function-reload state handed to [`execute_reload_actions`] when route
/// changes have to be coordinated with a function invoker.
pub struct FunctionReloadContext {
    /// Invoker used to diff, prepare, roll back and finalize function reloads
    /// for the routes being reloaded.
    pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
    /// Generation number forwarded to every generation-aware invoker and
    /// controller call made during this reload.
    pub generation: u64,
}
21
22fn compute_function_diff_for_route(
23    invoker: &std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
24    route_id: &str,
25    generation: u64,
26) -> camel_api::function::FunctionDiff {
27    let current_refs: std::collections::HashSet<_> = invoker
28        .function_refs_for_route(route_id)
29        .into_iter()
30        .collect();
31    let staged_defs = invoker.staged_defs_for_route(route_id, generation);
32    let staged_refs: std::collections::HashSet<_> = staged_defs
33        .iter()
34        .map(|(def, rid)| (def.id.clone(), rid.clone()))
35        .collect();
36
37    let removed: Vec<_> = current_refs.difference(&staged_refs).cloned().collect();
38    let added_refs: std::collections::HashSet<_> =
39        staged_refs.difference(&current_refs).cloned().collect();
40    let added: Vec<_> = staged_defs
41        .into_iter()
42        .filter(|(def, rid)| added_refs.contains(&(def.id.clone(), rid.clone())))
43        .collect();
44    let unchanged: Vec<_> = current_refs
45        .intersection(&staged_refs)
46        .map(|(id, _)| id.clone())
47        .collect();
48
49    camel_api::function::FunctionDiff {
50        added,
51        removed,
52        unchanged,
53    }
54}
55
/// Process-wide counter that supplies the numeric suffix of reload command ids.
static RELOAD_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Produce a command id of the form `reload:<op>:<route_id>:<seq>`.
///
/// `seq` is the value of [`RELOAD_COMMAND_SEQ`] before it is incremented, so
/// each call yields a different suffix.
fn next_reload_command_id(op: &str, route_id: &str) -> String {
    format!(
        "reload:{op}:{route_id}:{seq}",
        seq = RELOAD_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed)
    )
}
62
63fn is_invalid_stop_transition(err: &CamelError) -> bool {
64    err.to_string().contains("invalid transition")
65}
66
/// Decide whether a route must be stopped before it is removed or replaced.
///
/// Returns `false` only when the status is exactly `"Registered"` or
/// `"Stopped"`; any other status, or no status at all, returns `true`.
fn should_stop_before_mutation(runtime_status: Option<&str>) -> bool {
    match runtime_status {
        Some("Registered") | Some("Stopped") => false,
        Some(_) | None => true,
    }
}
70
/// Decide whether a route should be started again once a restart has been applied.
///
/// Given the status observed before the restart, returns `false` only for
/// `"Registered"` or `"Stopped"`; any other status, or no status, returns `true`.
fn should_start_after_restart(runtime_status: Option<&str>) -> bool {
    let Some(status) = runtime_status else {
        return true;
    };
    status != "Registered" && status != "Stopped"
}
74
/// A non-fatal error during reload action execution.
///
/// The watcher logs these and continues watching for future changes.
#[derive(Debug)]
pub struct ReloadError {
    /// ID of the route whose reload action failed.
    pub route_id: String,
    /// Label of the action (and, where applicable, the step) that failed,
    /// e.g. `"Swap"`, `"Add (prepare)"` or `"Restart (stop)"`.
    pub action: String,
    /// The underlying error reported for that step.
    pub error: CamelError,
}
84
/// Diff `new_definitions` against the routes known to `controller`.
///
/// For each new definition:
/// * unknown to the controller → `Add`;
/// * known, with a different `from_uri` → `Restart`;
/// * known, same `from_uri`, both source hashes present and equal → `Skip`;
/// * known, same `from_uri`, otherwise → `Swap`.
///
/// A known route for which the controller reports no `from_uri` produces no
/// action. Routes known to the controller but absent from `new_definitions`
/// are appended as `Remove`, in the iteration order of the active-id set.
#[cfg(test)]
fn compute_reload_actions(
    new_definitions: &[RouteDefinition],
    controller: &DefaultRouteController,
) -> Vec<ReloadAction> {
    use std::collections::HashSet;

    let active_ids: HashSet<String> = controller.route_ids().into_iter().collect();
    let mut new_ids = HashSet::new();
    let mut actions = Vec::new();

    for def in new_definitions {
        let route_id = def.route_id().to_string();
        new_ids.insert(route_id.clone());

        // Not running yet: plain addition.
        if !active_ids.contains(&route_id) {
            actions.push(ReloadAction::Add { route_id });
            continue;
        }

        // Running, but the controller cannot report its source URI: nothing is emitted.
        let Some(from_uri) = controller.route_from_uri(&route_id) else {
            continue;
        };

        // A changed source endpoint needs a restart rather than a swap.
        if from_uri != def.from_uri() {
            actions.push(ReloadAction::Restart { route_id });
            continue;
        }

        let existing_hash = controller.route_source_hash(&route_id);
        let new_hash = def.source_hash();
        let action = match (existing_hash, new_hash) {
            (Some(h_existing), Some(h_new)) if h_existing == h_new => {
                ReloadAction::Skip { route_id }
            }
            _ => ReloadAction::Swap { route_id },
        };
        actions.push(action);
    }

    // Anything still active but no longer defined gets removed.
    actions.extend(
        active_ids
            .iter()
            .filter(|id| !new_ids.contains(*id))
            .map(|id| ReloadAction::Remove {
                route_id: id.clone(),
            }),
    );

    actions
}
134
135/// Compute reload actions using runtime projection route IDs as primary source.
136///
137/// This variant is used by the file watcher hard-cut path where runtime projection
138/// is authoritative for route existence.
139pub(crate) fn compute_reload_actions_from_runtime_snapshot(
140    new_definitions: &[RouteDefinition],
141    runtime_route_ids: &[String],
142    runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
143) -> Vec<ReloadAction> {
144    let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
145    let mut new_ids = std::collections::HashSet::new();
146    let mut actions = Vec::new();
147
148    for def in new_definitions {
149        let route_id = def.route_id().to_string();
150        new_ids.insert(route_id.clone());
151
152        if active_ids.contains(&route_id) {
153            let existing_hash = runtime_source_hash(&route_id);
154            let new_hash = def.source_hash();
155            match (existing_hash, new_hash) {
156                (Some(h_existing), Some(h_new)) if h_existing == h_new => {
157                    actions.push(ReloadAction::Skip { route_id });
158                }
159                _ => {
160                    actions.push(ReloadAction::Restart { route_id });
161                }
162            }
163        } else {
164            actions.push(ReloadAction::Add { route_id });
165        }
166    }
167
168    for id in &active_ids {
169        if !new_ids.contains(id) {
170            actions.push(ReloadAction::Remove {
171                route_id: id.clone(),
172            });
173        }
174    }
175
176    actions
177}
178
179/// Execute a list of reload actions against a live controller.
180///
181/// Non-fatal: errors for individual routes are collected and returned.
182/// The caller should log them as warnings and continue watching.
183///
184/// `new_definitions` is consumed — each definition is moved to the controller for Add/Swap/Restart.
185pub async fn execute_reload_actions(
186    actions: Vec<ReloadAction>,
187    mut new_definitions: Vec<RouteDefinition>,
188    controller: &RuntimeExecutionHandle,
189    drain_timeout: Duration,
190    function_ctx: Option<&FunctionReloadContext>,
191) -> Vec<ReloadError> {
192    let mut errors = Vec::new();
193
194    for action in actions {
195        match action {
196            ReloadAction::Swap { route_id } => {
197                let def_pos = new_definitions
198                    .iter()
199                    .position(|d| d.route_id() == route_id);
200                let def = match def_pos {
201                    Some(pos) => new_definitions.remove(pos),
202                    None => {
203                        errors.push(ReloadError {
204                            route_id: route_id.clone(),
205                            action: "Swap".into(),
206                            error: CamelError::RouteError(format!(
207                                "No definition found for route '{}'",
208                                route_id
209                            )),
210                        });
211                        continue;
212                    }
213                };
214
215                let in_flight = controller.in_flight_count(&route_id).await.unwrap_or(0);
216
217                let pipeline = if let Some(ctx) = function_ctx {
218                    controller
219                        .compile_route_definition_with_generation(def, ctx.generation)
220                        .await
221                } else {
222                    controller.compile_route_definition(def).await
223                };
224
225                match pipeline {
226                    Ok(p) => {
227                        let prepare_token = if let Some(ctx) = function_ctx {
228                            let diff = compute_function_diff_for_route(
229                                &ctx.invoker,
230                                &route_id,
231                                ctx.generation,
232                            );
233                            match ctx.invoker.prepare_reload(diff, ctx.generation).await {
234                                Ok(token) => Some(token),
235                                Err(e) => {
236                                    errors.push(ReloadError {
237                                        route_id: route_id.clone(),
238                                        action: "Swap (prepare)".into(),
239                                        error: CamelError::ProcessorError(format!("{e}")),
240                                    });
241                                    continue;
242                                }
243                            }
244                        } else {
245                            None
246                        };
247
248                        let result = controller.swap_route_pipeline(&route_id, p).await;
249                        if let Err(e) = result {
250                            if let Some(ctx) = function_ctx
251                                && let Some(ref token) = prepare_token
252                            {
253                                let _ = ctx
254                                    .invoker
255                                    .rollback_reload(token.clone(), ctx.generation)
256                                    .await;
257                            }
258                            errors.push(ReloadError {
259                                route_id,
260                                action: "Swap".into(),
261                                error: e,
262                            });
263                        } else {
264                            if let Some(ctx) = function_ctx {
265                                let diff = compute_function_diff_for_route(
266                                    &ctx.invoker,
267                                    &route_id,
268                                    ctx.generation,
269                                );
270                                if let Err(e) =
271                                    ctx.invoker.finalize_reload(&diff, ctx.generation).await
272                                {
273                                    errors.push(ReloadError {
274                                        route_id: route_id.clone(),
275                                        action: "Finalize".into(),
276                                        error: CamelError::ProcessorError(format!("{e}")),
277                                    });
278                                }
279                            }
280                            if in_flight > 0 {
281                                tracing::info!(
282                                    route_id = %route_id,
283                                    action = "swap",
284                                    in_flight = in_flight,
285                                    "hot-reload: swapped route pipeline ({} exchanges continuing with previous pipeline)",
286                                    in_flight
287                                );
288                            } else {
289                                tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
290                            }
291                        }
292                    }
293                    Err(e) => {
294                        errors.push(ReloadError {
295                            route_id,
296                            action: "Swap (compile)".into(),
297                            error: e,
298                        });
299                    }
300                }
301            }
302
303            ReloadAction::Add { route_id } => {
304                let def_pos = new_definitions
305                    .iter()
306                    .position(|d| d.route_id() == route_id);
307                let def = match def_pos {
308                    Some(pos) => new_definitions.remove(pos),
309                    None => {
310                        errors.push(ReloadError {
311                            route_id: route_id.clone(),
312                            action: "Add".into(),
313                            error: CamelError::RouteError(format!(
314                                "No definition found for route '{}'",
315                                route_id
316                            )),
317                        });
318                        continue;
319                    }
320                };
321
322                if let Some(ctx) = function_ctx {
323                    let prepared = match controller
324                        .prepare_route_definition_with_generation(def, ctx.generation)
325                        .await
326                    {
327                        Ok(p) => p,
328                        Err(e) => {
329                            ctx.invoker.discard_staging(ctx.generation);
330                            errors.push(ReloadError {
331                                route_id,
332                                action: "Add (prepare-route)".into(),
333                                error: e,
334                            });
335                            continue;
336                        }
337                    };
338
339                    let diff =
340                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
341
342                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
343                    {
344                        Ok(token) => token,
345                        Err(e) => {
346                            ctx.invoker.discard_staging(ctx.generation);
347                            errors.push(ReloadError {
348                                route_id,
349                                action: "Add (prepare)".into(),
350                                error: CamelError::ProcessorError(format!("{e}")),
351                            });
352                            continue;
353                        }
354                    };
355
356                    if let Err(e) = controller.insert_prepared_route(prepared).await {
357                        let _ = ctx
358                            .invoker
359                            .rollback_reload(prepare_token, ctx.generation)
360                            .await;
361                        errors.push(ReloadError {
362                            route_id,
363                            action: "Add (insert)".into(),
364                            error: e,
365                        });
366                        continue;
367                    }
368
369                    if let Err(e) = controller.register_route_aggregate(route_id.clone()).await {
370                        let _ = controller
371                            .remove_route_preserving_functions(route_id.clone())
372                            .await;
373                        let _ = ctx
374                            .invoker
375                            .rollback_reload(prepare_token, ctx.generation)
376                            .await;
377                        errors.push(ReloadError {
378                            route_id,
379                            action: "Add (aggregate)".into(),
380                            error: e,
381                        });
382                        continue;
383                    }
384
385                    let diff =
386                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
387                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
388                        errors.push(ReloadError {
389                            route_id: route_id.clone(),
390                            action: "Finalize".into(),
391                            error: CamelError::ProcessorError(format!("{e}")),
392                        });
393                    }
394                } else {
395                    if let Err(e) = controller.add_route_definition(def).await {
396                        errors.push(ReloadError {
397                            route_id,
398                            action: "Add".into(),
399                            error: e,
400                        });
401                        continue;
402                    }
403                }
404
405                let start_result = controller
406                    .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
407                        route_id: route_id.clone(),
408                        command_id: next_reload_command_id("add-start", &route_id),
409                        causation_id: None,
410                    })
411                    .await;
412                if let Err(e) = start_result {
413                    errors.push(ReloadError {
414                        route_id,
415                        action: "Add (start)".into(),
416                        error: e,
417                    });
418                } else {
419                    tracing::info!(route_id = %route_id, "hot-reload: added and started route");
420                }
421            }
422
423            ReloadAction::Remove { route_id } => {
424                if let Some(ctx) = function_ctx {
425                    let diff =
426                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
427
428                    let runtime_status = match controller.runtime_route_status(&route_id).await {
429                        Ok(status) => status,
430                        Err(e) => {
431                            errors.push(ReloadError {
432                                route_id: route_id.clone(),
433                                action: "Remove (status)".into(),
434                                error: e,
435                            });
436                            continue;
437                        }
438                    };
439
440                    if should_stop_before_mutation(runtime_status.as_deref()) {
441                        let stop_result = controller
442                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
443                                route_id: route_id.clone(),
444                                command_id: next_reload_command_id("remove-stop", &route_id),
445                                causation_id: None,
446                            })
447                            .await;
448                        if let Err(e) = stop_result
449                            && !is_invalid_stop_transition(&e)
450                        {
451                            errors.push(ReloadError {
452                                route_id: route_id.clone(),
453                                action: "Remove (stop)".into(),
454                                error: e,
455                            });
456                            continue;
457                        }
458
459                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
460                    }
461
462                    if let Err(e) = controller
463                        .remove_route_preserving_functions(route_id.clone())
464                        .await
465                    {
466                        errors.push(ReloadError {
467                            route_id,
468                            action: "Remove".into(),
469                            error: e,
470                        });
471                        continue;
472                    }
473
474                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
475                        errors.push(ReloadError {
476                            route_id: route_id.clone(),
477                            action: "Finalize".into(),
478                            error: CamelError::ProcessorError(format!("{e}")),
479                        });
480                    }
481
482                    tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
483                } else {
484                    let runtime_status = match controller.runtime_route_status(&route_id).await {
485                        Ok(status) => status,
486                        Err(e) => {
487                            errors.push(ReloadError {
488                                route_id,
489                                action: "Remove (status)".into(),
490                                error: e,
491                            });
492                            continue;
493                        }
494                    };
495
496                    if should_stop_before_mutation(runtime_status.as_deref()) {
497                        let stop_result = controller
498                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
499                                route_id: route_id.clone(),
500                                command_id: next_reload_command_id("remove-stop", &route_id),
501                                causation_id: None,
502                            })
503                            .await;
504                        if let Err(e) = stop_result
505                            && !is_invalid_stop_transition(&e)
506                        {
507                            errors.push(ReloadError {
508                                route_id: route_id.clone(),
509                                action: "Remove (stop)".into(),
510                                error: e,
511                            });
512                            continue;
513                        }
514
515                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
516                    }
517
518                    let remove_result = controller
519                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
520                            route_id: route_id.clone(),
521                            command_id: next_reload_command_id("remove", &route_id),
522                            causation_id: None,
523                        })
524                        .await;
525                    match remove_result {
526                        Ok(_) => {
527                            tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
528                        }
529                        Err(e) => {
530                            errors.push(ReloadError {
531                                route_id,
532                                action: "Remove".into(),
533                                error: e,
534                            });
535                        }
536                    }
537                }
538            }
539
540            ReloadAction::Restart { route_id } => {
541                tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");
542
543                let def_pos = new_definitions
544                    .iter()
545                    .position(|d| d.route_id() == route_id);
546                let def = match def_pos {
547                    Some(pos) => new_definitions.remove(pos),
548                    None => {
549                        errors.push(ReloadError {
550                            route_id: route_id.clone(),
551                            action: "Restart".into(),
552                            error: CamelError::RouteError(format!(
553                                "No definition found for route '{}'",
554                                route_id
555                            )),
556                        });
557                        continue;
558                    }
559                };
560
561                if let Some(ctx) = function_ctx {
562                    let prepared = match controller
563                        .prepare_route_definition_with_generation(def, ctx.generation)
564                        .await
565                    {
566                        Ok(p) => p,
567                        Err(e) => {
568                            errors.push(ReloadError {
569                                route_id,
570                                action: "Restart (prepare-route)".into(),
571                                error: e,
572                            });
573                            continue;
574                        }
575                    };
576
577                    let diff =
578                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
579                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
580                    {
581                        Ok(token) => token,
582                        Err(e) => {
583                            errors.push(ReloadError {
584                                route_id,
585                                action: "Restart (prepare)".into(),
586                                error: CamelError::ProcessorError(format!("{e}")),
587                            });
588                            continue;
589                        }
590                    };
591
592                    let runtime_status = match controller.runtime_route_status(&route_id).await {
593                        Ok(status) => status,
594                        Err(e) => {
595                            let _ = ctx
596                                .invoker
597                                .rollback_reload(prepare_token, ctx.generation)
598                                .await;
599                            errors.push(ReloadError {
600                                route_id,
601                                action: "Restart (status)".into(),
602                                error: e,
603                            });
604                            continue;
605                        }
606                    };
607
608                    if should_stop_before_mutation(runtime_status.as_deref()) {
609                        let stop_result = controller
610                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
611                                route_id: route_id.clone(),
612                                command_id: next_reload_command_id("restart-stop", &route_id),
613                                causation_id: None,
614                            })
615                            .await;
616                        if let Err(e) = stop_result
617                            && !is_invalid_stop_transition(&e)
618                        {
619                            let _ = ctx
620                                .invoker
621                                .rollback_reload(prepare_token, ctx.generation)
622                                .await;
623                            errors.push(ReloadError {
624                                route_id,
625                                action: "Restart (stop)".into(),
626                                error: e,
627                            });
628                            continue;
629                        }
630
631                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
632                    }
633
634                    if let Err(e) = controller
635                        .remove_route_preserving_functions(route_id.clone())
636                        .await
637                    {
638                        let _ = ctx
639                            .invoker
640                            .rollback_reload(prepare_token, ctx.generation)
641                            .await;
642                        errors.push(ReloadError {
643                            route_id,
644                            action: "Restart (remove)".into(),
645                            error: e,
646                        });
647                        continue;
648                    }
649
650                    if let Err(e) = controller.insert_prepared_route(prepared).await {
651                        let _ = ctx
652                            .invoker
653                            .rollback_reload(prepare_token, ctx.generation)
654                            .await;
655                        errors.push(ReloadError {
656                            route_id,
657                            action: "Restart (insert)".into(),
658                            error: e,
659                        });
660                        continue;
661                    }
662
663                    let diff =
664                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
665                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
666                        errors.push(ReloadError {
667                            route_id: route_id.clone(),
668                            action: "Finalize".into(),
669                            error: CamelError::ProcessorError(format!("{e}")),
670                        });
671                    }
672
673                    if should_start_after_restart(runtime_status.as_deref()) {
674                        let start_result = controller
675                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
676                                route_id: route_id.clone(),
677                                command_id: next_reload_command_id("restart-start", &route_id),
678                                causation_id: None,
679                            })
680                            .await;
681                        if let Err(e) = start_result {
682                            errors.push(ReloadError {
683                                route_id,
684                                action: "Restart (start)".into(),
685                                error: e,
686                            });
687                        } else {
688                            tracing::info!(
689                                route_id = %route_id,
690                                "hot-reload: route restarted successfully"
691                            );
692                        }
693                    } else {
694                        tracing::info!(
695                            route_id = %route_id,
696                            "hot-reload: restart applied while preserving stopped lifecycle state"
697                        );
698                    }
699                } else {
700                    let runtime_status = match controller.runtime_route_status(&route_id).await {
701                        Ok(status) => status,
702                        Err(e) => {
703                            errors.push(ReloadError {
704                                route_id,
705                                action: "Restart (status)".into(),
706                                error: e,
707                            });
708                            continue;
709                        }
710                    };
711
712                    if should_stop_before_mutation(runtime_status.as_deref()) {
713                        let stop_result = controller
714                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
715                                route_id: route_id.clone(),
716                                command_id: next_reload_command_id("restart-stop", &route_id),
717                                causation_id: None,
718                            })
719                            .await;
720                        if let Err(e) = stop_result
721                            && !is_invalid_stop_transition(&e)
722                        {
723                            errors.push(ReloadError {
724                                route_id,
725                                action: "Restart (stop)".into(),
726                                error: e,
727                            });
728                            continue;
729                        }
730
731                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
732                    }
733
734                    if let Err(e) = controller
735                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
736                            route_id: route_id.clone(),
737                            command_id: next_reload_command_id("restart-remove", &route_id),
738                            causation_id: None,
739                        })
740                        .await
741                    {
742                        errors.push(ReloadError {
743                            route_id,
744                            action: "Restart (remove)".into(),
745                            error: e,
746                        });
747                        continue;
748                    }
749
750                    if let Err(e) = controller.add_route_definition(def).await {
751                        errors.push(ReloadError {
752                            route_id,
753                            action: "Restart (add)".into(),
754                            error: e,
755                        });
756                        continue;
757                    }
758
759                    if should_start_after_restart(runtime_status.as_deref()) {
760                        let start_result = controller
761                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
762                                route_id: route_id.clone(),
763                                command_id: next_reload_command_id("restart-start", &route_id),
764                                causation_id: None,
765                            })
766                            .await;
767                        if let Err(e) = start_result {
768                            errors.push(ReloadError {
769                                route_id,
770                                action: "Restart (start)".into(),
771                                error: e,
772                            });
773                        } else {
774                            tracing::info!(
775                                route_id = %route_id,
776                                "hot-reload: route restarted successfully"
777                            );
778                        }
779                    } else {
780                        tracing::info!(
781                            route_id = %route_id,
782                            "hot-reload: restart applied while preserving stopped lifecycle state"
783                        );
784                    }
785                }
786            }
787
788            ReloadAction::Skip { route_id } => {
789                tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
790            }
791        }
792    }
793
794    errors
795}
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
801    use crate::shared::components::domain::Registry;
802    use camel_api::function::{FunctionDiff, FunctionId};
803    use camel_api::{Exchange, ExchangePatch, FunctionInvocationError, FunctionInvoker};
804    use std::sync::Arc;
805    use std::time::Duration;
806
807    fn make_controller() -> DefaultRouteController {
808        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
809        DefaultRouteController::new(
810            registry,
811            Arc::new(camel_api::NoopPlatformService::default()),
812        )
813    }
814
/// Two successive command ids for the same operation and route share the
/// `reload:<op>:<route>:` prefix but are never equal.
#[test]
fn helper_next_reload_command_id_increments_and_keeps_prefix() {
    const EXPECTED_PREFIX: &str = "reload:restart-stop:r1:";

    let first = next_reload_command_id("restart-stop", "r1");
    let second = next_reload_command_id("restart-stop", "r1");

    for id in [&first, &second] {
        assert!(id.starts_with(EXPECTED_PREFIX));
    }
    assert_ne!(first, second);
}
823
/// Only `Registered` and `Stopped` routes skip the stop step; `Started` and an
/// unknown (`None`) status both require stopping first.
#[test]
fn helper_should_stop_before_mutation_respects_status() {
    let cases: [(Option<&str>, bool); 4] = [
        (Some("Registered"), false),
        (Some("Stopped"), false),
        (Some("Started"), true),
        (None, true),
    ];

    for (status, expected) in cases {
        assert_eq!(should_stop_before_mutation(status), expected);
    }
}
831
/// A route is started again after a restart only when it was `Started` or its
/// status was unknown (`None`); `Registered` and `Stopped` stay as they were.
#[test]
fn helper_should_start_after_restart_respects_status() {
    let cases: [(Option<&str>, bool); 4] = [
        (Some("Registered"), false),
        (Some("Stopped"), false),
        (Some("Started"), true),
        (None, true),
    ];

    for (status, expected) in cases {
        assert_eq!(should_start_after_restart(status), expected);
    }
}
839
/// A route error whose message carries the "invalid transition" marker is
/// recognised; an unrelated route error is not.
#[test]
fn helper_invalid_stop_transition_detects_marker() {
    let with_marker = CamelError::RouteError("invalid transition: Started -> Started".into());
    let without_marker = CamelError::RouteError("route missing".into());

    assert!(is_invalid_stop_transition(&with_marker));
    assert!(!is_invalid_stop_transition(&without_marker));
}
848
/// A definition whose id is unknown to an empty controller yields a single
/// `Add` action.
#[test]
fn test_new_route_detected_as_add() {
    let controller = make_controller();
    let incoming = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("new-route")];

    let expected = vec![ReloadAction::Add {
        route_id: "new-route".into(),
    }];
    assert_eq!(compute_reload_actions(&incoming, &controller), expected);
}
861
862    #[tokio::test]
863    async fn test_removed_route_detected() {
864        let mut controller = make_controller();
865        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("old-route");
866        controller.add_route(def).await.unwrap();
867
868        let actions = compute_reload_actions(&[], &controller);
869        assert_eq!(
870            actions,
871            vec![ReloadAction::Remove {
872                route_id: "old-route".into()
873            }]
874        );
875    }
876
877    #[tokio::test]
878    async fn test_same_from_uri_detected_as_swap() {
879        let mut controller = make_controller();
880        let def = RouteDefinition::new("timer:tick", vec![])
881            .with_route_id("my-route")
882            .with_source_hash(100);
883        controller.add_route(def).await.unwrap();
884
885        let new_defs = vec![
886            RouteDefinition::new("timer:tick", vec![])
887                .with_route_id("my-route")
888                .with_source_hash(200),
889        ];
890        let actions = compute_reload_actions(&new_defs, &controller);
891        assert_eq!(
892            actions,
893            vec![ReloadAction::Swap {
894                route_id: "my-route".into()
895            }]
896        );
897    }
898
899    #[tokio::test]
900    async fn test_changed_from_uri_detected_as_restart() {
901        let mut controller = make_controller();
902        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
903        controller.add_route(def).await.unwrap();
904
905        let new_defs =
906            vec![RouteDefinition::new("timer:tock?period=500", vec![]).with_route_id("my-route")];
907        let actions = compute_reload_actions(&new_defs, &controller);
908        assert_eq!(
909            actions,
910            vec![ReloadAction::Restart {
911                route_id: "my-route".into()
912            }]
913        );
914    }
915
916    #[tokio::test]
917    async fn test_runtime_snapshot_drives_remove_set() {
918        let mut controller = make_controller();
919        controller
920            .add_route(RouteDefinition::new("timer:tick", vec![]).with_route_id("runtime-route"))
921            .await
922            .unwrap();
923        controller
924            .add_route(RouteDefinition::new("timer:ghost", vec![]).with_route_id("ghost-route"))
925            .await
926            .unwrap();
927
928        let runtime_ids = vec!["runtime-route".to_string()];
929        let actions =
930            compute_reload_actions_from_runtime_snapshot(&[], &runtime_ids, &|_id: &str| None);
931        assert_eq!(
932            actions,
933            vec![ReloadAction::Remove {
934                route_id: "runtime-route".into()
935            }]
936        );
937    }
938
/// In snapshot mode, routes that exist at runtime with a different source hash
/// map to `Restart`, in definition order.
#[test]
fn test_runtime_snapshot_existing_routes_map_to_restart() {
    let incoming = vec![
        RouteDefinition::new("timer:tick", vec![])
            .with_route_id("runtime-r1")
            .with_source_hash(10),
        RouteDefinition::new("timer:tock", vec![])
            .with_route_id("runtime-r2")
            .with_source_hash(20),
    ];
    let snapshot_ids = vec![String::from("runtime-r1"), String::from("runtime-r2")];

    let mut hashes_by_route = std::collections::HashMap::new();
    hashes_by_route.insert(String::from("runtime-r1"), 11u64);
    hashes_by_route.insert(String::from("runtime-r2"), 22u64);
    let lookup = |id: &str| hashes_by_route.get(id).copied();

    let actions = compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &lookup);

    let expected = vec![
        ReloadAction::Restart {
            route_id: "runtime-r1".into(),
        },
        ReloadAction::Restart {
            route_id: "runtime-r2".into(),
        },
    ];
    assert_eq!(actions, expected);
}
971
972    #[tokio::test]
973    async fn test_same_hash_detected_as_skip() {
974        let mut controller = make_controller();
975        let def = RouteDefinition::new("timer:tick", vec![])
976            .with_route_id("my-route")
977            .with_source_hash(42);
978        controller.add_route(def).await.unwrap();
979
980        let new_defs = vec![
981            RouteDefinition::new("timer:tick", vec![])
982                .with_route_id("my-route")
983                .with_source_hash(42),
984        ];
985        let actions = compute_reload_actions(&new_defs, &controller);
986        assert_eq!(
987            actions,
988            vec![ReloadAction::Skip {
989                route_id: "my-route".into()
990            }]
991        );
992    }
993
994    #[tokio::test]
995    async fn test_none_hash_detected_as_swap() {
996        let mut controller = make_controller();
997        let def = RouteDefinition::new("timer:tick", vec![]).with_route_id("my-route");
998        controller.add_route(def).await.unwrap();
999
1000        let new_defs = vec![
1001            RouteDefinition::new("timer:tick", vec![])
1002                .with_route_id("my-route")
1003                .with_source_hash(99),
1004        ];
1005        let actions = compute_reload_actions(&new_defs, &controller);
1006        assert_eq!(
1007            actions,
1008            vec![ReloadAction::Swap {
1009                route_id: "my-route".into()
1010            }]
1011        );
1012    }
1013
/// In snapshot mode, a matching runtime hash for an existing route yields `Skip`.
#[test]
fn test_runtime_snapshot_same_hash_detected_as_skip() {
    let incoming = vec![
        RouteDefinition::new("timer:tick", vec![])
            .with_route_id("r1")
            .with_source_hash(42),
    ];
    let snapshot_ids = vec![String::from("r1")];

    let mut hashes_by_route = std::collections::HashMap::new();
    hashes_by_route.insert(String::from("r1"), 42u64);
    let lookup = |id: &str| hashes_by_route.get(id).copied();

    let actions = compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &lookup);

    let expected = vec![ReloadAction::Skip {
        route_id: "r1".into(),
    }];
    assert_eq!(actions, expected);
}
1035
/// One snapshot exercising every snapshot-mode decision at once: matching hash
/// gives `Skip`, differing hash gives `Restart`, an unknown id gives `Add`, and
/// a runtime id with no definition gives `Remove`, emitted after the
/// per-definition actions.
#[test]
fn test_runtime_snapshot_mixed_actions_cover_all_decisions() {
    let incoming = vec![
        RouteDefinition::new("timer:tick", vec![])
            .with_route_id("existing-same")
            .with_source_hash(10),
        RouteDefinition::new("timer:tock", vec![])
            .with_route_id("existing-diff")
            .with_source_hash(20),
        RouteDefinition::new("timer:new", vec![])
            .with_route_id("brand-new")
            .with_source_hash(30),
    ];
    let snapshot_ids = vec![
        String::from("existing-same"),
        String::from("existing-diff"),
        String::from("orphan"),
    ];

    let mut hashes_by_route = std::collections::HashMap::new();
    hashes_by_route.insert(String::from("existing-same"), 10u64);
    hashes_by_route.insert(String::from("existing-diff"), 999u64);
    hashes_by_route.insert(String::from("orphan"), 77u64);
    let lookup = |id: &str| hashes_by_route.get(id).copied();

    let actions = compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &lookup);

    let expected = vec![
        ReloadAction::Skip {
            route_id: "existing-same".into(),
        },
        ReloadAction::Restart {
            route_id: "existing-diff".into(),
        },
        ReloadAction::Add {
            route_id: "brand-new".into(),
        },
        ReloadAction::Remove {
            route_id: "orphan".into(),
        },
    ];
    assert_eq!(actions, expected);
}
1083
/// An existing route whose runtime hash lookup returns `None` cannot be proven
/// unchanged, so it maps to `Restart`.
#[test]
fn test_runtime_snapshot_missing_runtime_hash_for_existing_route_restarts() {
    let incoming = vec![
        RouteDefinition::new("timer:tick", vec![])
            .with_route_id("r1")
            .with_source_hash(42),
    ];
    let snapshot_ids = vec![String::from("r1")];

    let actions =
        compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &|_id: &str| None);

    let expected = vec![ReloadAction::Restart {
        route_id: "r1".into(),
    }];
    assert_eq!(actions, expected);
}
1103
/// An existing route whose new definition carries no source hash maps to
/// `Restart`, even though the runtime side has a hash.
#[test]
fn test_runtime_snapshot_missing_new_hash_for_existing_route_restarts() {
    let incoming = vec![RouteDefinition::new("timer:tick", vec![]).with_route_id("r1")];
    let snapshot_ids = vec![String::from("r1")];

    let mut hashes_by_route = std::collections::HashMap::new();
    hashes_by_route.insert(String::from("r1"), 42u64);
    let lookup = |id: &str| hashes_by_route.get(id).copied();

    let actions = compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &lookup);

    let expected = vec![ReloadAction::Restart {
        route_id: "r1".into(),
    }];
    assert_eq!(actions, expected);
}
1122
/// With an empty runtime snapshot, every incoming definition maps to `Add`.
#[test]
fn test_runtime_snapshot_new_only_route_maps_to_add() {
    let incoming = vec![
        RouteDefinition::new("timer:tick", vec![])
            .with_route_id("new-only")
            .with_source_hash(1),
    ];
    let snapshot_ids: Vec<String> = Vec::new();

    let actions =
        compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &|_id: &str| None);

    let expected = vec![ReloadAction::Add {
        route_id: "new-only".into(),
    }];
    assert_eq!(actions, expected);
}
1142
1143    // ---- execute_reload_actions tests ----
1144    // These use a full CamelContext with real components so that start/stop work.
1145
1146    #[tokio::test]
1147    async fn test_execute_add_action_inserts_route() {
1148        use crate::CamelContext;
1149        use camel_component_timer::TimerComponent;
1150
1151        let mut ctx = CamelContext::builder().build().await.unwrap();
1152        ctx.register_component(TimerComponent::new());
1153        ctx.start().await.unwrap();
1154
1155        let def = RouteDefinition::new("timer:tick?period=50&repeatCount=1", vec![])
1156            .with_route_id("exec-add-test");
1157        let actions = vec![ReloadAction::Add {
1158            route_id: "exec-add-test".into(),
1159        }];
1160        let errors = execute_reload_actions(
1161            actions,
1162            vec![def],
1163            &ctx.runtime_execution_handle(),
1164            Duration::from_secs(10),
1165            None,
1166        )
1167        .await;
1168        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1169
1170        assert_eq!(
1171            ctx.runtime_execution_handle()
1172                .controller_route_count_for_test()
1173                .await,
1174            1
1175        );
1176
1177        ctx.stop().await.unwrap();
1178    }
1179
1180    #[tokio::test]
1181    async fn test_execute_remove_action_deletes_route() {
1182        use crate::CamelContext;
1183        use camel_component_timer::TimerComponent;
1184
1185        let mut ctx = CamelContext::builder().build().await.unwrap();
1186        ctx.register_component(TimerComponent::new());
1187        ctx.start().await.unwrap();
1188
1189        // Add route through context so runtime aggregate/projection are seeded.
1190        let def =
1191            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-remove-test");
1192        ctx.add_route_definition(def).await.unwrap();
1193        assert_eq!(
1194            ctx.runtime_execution_handle()
1195                .controller_route_count_for_test()
1196                .await,
1197            1
1198        );
1199
1200        let actions = vec![ReloadAction::Remove {
1201            route_id: "exec-remove-test".into(),
1202        }];
1203        let errors = execute_reload_actions(
1204            actions,
1205            vec![],
1206            &ctx.runtime_execution_handle(),
1207            Duration::from_secs(10),
1208            None,
1209        )
1210        .await;
1211        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1212
1213        assert_eq!(
1214            ctx.runtime_execution_handle()
1215                .controller_route_count_for_test()
1216                .await,
1217            0
1218        );
1219
1220        ctx.stop().await.unwrap();
1221    }
1222
1223    #[tokio::test]
1224    async fn test_execute_swap_action_replaces_pipeline() {
1225        use crate::CamelContext;
1226        use camel_component_timer::TimerComponent;
1227
1228        let mut ctx = CamelContext::builder().build().await.unwrap();
1229        ctx.register_component(TimerComponent::new());
1230        ctx.start().await.unwrap();
1231
1232        // Add route through context so runtime aggregate/projection are seeded.
1233        let def =
1234            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
1235        ctx.add_route_definition(def).await.unwrap();
1236
1237        // Swap with same from_uri (exercises compile + swap_pipeline code path)
1238        let new_def =
1239            RouteDefinition::new("timer:tick?period=100", vec![]).with_route_id("exec-swap-test");
1240        let actions = vec![ReloadAction::Swap {
1241            route_id: "exec-swap-test".into(),
1242        }];
1243        let errors = execute_reload_actions(
1244            actions,
1245            vec![new_def],
1246            &ctx.runtime_execution_handle(),
1247            Duration::from_secs(10),
1248            None,
1249        )
1250        .await;
1251        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1252
1253        // Route should still exist after swap
1254        assert_eq!(
1255            ctx.runtime_execution_handle()
1256                .controller_route_count_for_test()
1257                .await,
1258            1
1259        );
1260
1261        ctx.stop().await.unwrap();
1262    }
1263
1264    #[tokio::test]
1265    async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
1266        use crate::CamelContext;
1267        use camel_api::{RuntimeQuery, RuntimeQueryResult};
1268        use camel_component_timer::TimerComponent;
1269
1270        let mut ctx = CamelContext::builder().build().await.unwrap();
1271        ctx.register_component(TimerComponent::new());
1272        ctx.start().await.unwrap();
1273
1274        // Add route through context so runtime aggregate/projection are seeded.
1275        let initial = RouteDefinition::new("timer:tick?period=100", vec![])
1276            .with_route_id("exec-restart-test");
1277        ctx.add_route_definition(initial).await.unwrap();
1278
1279        // Route is seeded as Registered by context registration.
1280        let before = ctx
1281            .runtime()
1282            .ask(RuntimeQuery::GetRouteStatus {
1283                route_id: "exec-restart-test".into(),
1284            })
1285            .await
1286            .unwrap();
1287        match before {
1288            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1289            other => panic!("unexpected query result: {other:?}"),
1290        }
1291
1292        let replacement = RouteDefinition::new("timer:tick?period=250", vec![])
1293            .with_route_id("exec-restart-test");
1294        let actions = vec![ReloadAction::Restart {
1295            route_id: "exec-restart-test".into(),
1296        }];
1297        let errors = execute_reload_actions(
1298            actions,
1299            vec![replacement],
1300            &ctx.runtime_execution_handle(),
1301            Duration::from_secs(10),
1302            None,
1303        )
1304        .await;
1305        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);
1306
1307        // Restart re-adds through RuntimeExecutionHandle::add_route_definition,
1308        // which now goes through InternalRuntimeCommandBus and preserves Registered state.
1309        let after = ctx
1310            .runtime()
1311            .ask(RuntimeQuery::GetRouteStatus {
1312                route_id: "exec-restart-test".into(),
1313            })
1314            .await
1315            .unwrap();
1316        match after {
1317            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1318            other => panic!("unexpected query result: {other:?}"),
1319        }
1320
1321        assert_eq!(
1322            ctx.runtime_route_status("exec-restart-test").await.unwrap(),
1323            Some("Registered".to_string())
1324        );
1325
1326        ctx.stop().await.unwrap();
1327    }
1328
1329    #[tokio::test]
1330    async fn test_execute_swap_action_missing_definition_returns_error() {
1331        use crate::CamelContext;
1332
1333        let ctx = CamelContext::builder().build().await.unwrap();
1334        let errors = execute_reload_actions(
1335            vec![ReloadAction::Swap {
1336                route_id: "missing-swap-def".into(),
1337            }],
1338            vec![],
1339            &ctx.runtime_execution_handle(),
1340            Duration::from_millis(1),
1341            None,
1342        )
1343        .await;
1344
1345        assert_eq!(errors.len(), 1);
1346        assert_eq!(errors[0].action, "Swap");
1347        assert_eq!(errors[0].route_id, "missing-swap-def");
1348    }
1349
1350    #[tokio::test]
1351    async fn test_execute_add_action_missing_definition_returns_error() {
1352        use crate::CamelContext;
1353
1354        let ctx = CamelContext::builder().build().await.unwrap();
1355        let errors = execute_reload_actions(
1356            vec![ReloadAction::Add {
1357                route_id: "missing-add-def".into(),
1358            }],
1359            vec![],
1360            &ctx.runtime_execution_handle(),
1361            Duration::from_millis(1),
1362            None,
1363        )
1364        .await;
1365
1366        assert_eq!(errors.len(), 1);
1367        assert_eq!(errors[0].action, "Add");
1368        assert_eq!(errors[0].route_id, "missing-add-def");
1369    }
1370
1371    #[tokio::test]
1372    async fn test_execute_remove_action_status_error_returns_error() {
1373        use crate::CamelContext;
1374
1375        let ctx = CamelContext::builder().build().await.unwrap();
1376        let errors = execute_reload_actions(
1377            vec![ReloadAction::Remove {
1378                route_id: "missing-remove-route".into(),
1379            }],
1380            vec![],
1381            &ctx.runtime_execution_handle(),
1382            Duration::from_millis(1),
1383            None,
1384        )
1385        .await;
1386
1387        assert_eq!(errors.len(), 1);
1388        assert!(errors[0].action.starts_with("Remove"));
1389        assert_eq!(errors[0].route_id, "missing-remove-route");
1390    }
1391
1392    #[tokio::test]
1393    async fn test_execute_restart_action_missing_definition_returns_error() {
1394        use crate::CamelContext;
1395
1396        let ctx = CamelContext::builder().build().await.unwrap();
1397        let errors = execute_reload_actions(
1398            vec![ReloadAction::Restart {
1399                route_id: "missing-restart-def".into(),
1400            }],
1401            vec![],
1402            &ctx.runtime_execution_handle(),
1403            Duration::from_millis(1),
1404            None,
1405        )
1406        .await;
1407
1408        assert_eq!(errors.len(), 1);
1409        assert_eq!(errors[0].action, "Restart");
1410        assert_eq!(errors[0].route_id, "missing-restart-def");
1411    }
1412
1413    #[tokio::test]
1414    async fn test_execute_skip_action_returns_no_errors() {
1415        use crate::CamelContext;
1416
1417        let ctx = CamelContext::builder().build().await.unwrap();
1418        let errors = execute_reload_actions(
1419            vec![ReloadAction::Skip {
1420                route_id: "skip-only-route".into(),
1421            }],
1422            vec![],
1423            &ctx.runtime_execution_handle(),
1424            Duration::from_millis(1),
1425            None,
1426        )
1427        .await;
1428
1429        assert!(errors.is_empty());
1430    }
1431
/// With no function refs currently registered for the route and one definition
/// staged, the computed diff reports that definition as added and nothing as
/// removed or unchanged.
#[test]
fn compute_function_diff_all_added() {
    use camel_api::function::PrepareToken;
    use camel_api::{FunctionDefinition, FunctionInvokerSync};
    use std::sync::Mutex;

    // Invoker stub: reports no live function refs and hands back a fixed list
    // of staged definitions regardless of the route id or generation asked for.
    struct StagedOnlyInvoker {
        staged_defs: Mutex<Vec<(FunctionDefinition, Option<String>)>>,
    }

    impl FunctionInvokerSync for StagedOnlyInvoker {
        fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}

        fn discard_staging(&self, _generation: u64) {}

        fn begin_reload(&self) -> u64 {
            0
        }

        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
            Vec::new()
        }

        fn staged_refs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionId, Option<String>)> {
            Vec::new()
        }

        fn staged_defs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionDefinition, Option<String>)> {
            self.staged_defs.lock().unwrap().clone()
        }
    }

    // The async half of the trait is irrelevant to diff computation; every
    // method is a successful no-op.
    #[async_trait::async_trait]
    impl FunctionInvoker for StagedOnlyInvoker {
        async fn register(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }

        async fn unregister(
            &self,
            _id: &FunctionId,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }

        async fn invoke(
            &self,
            _id: &FunctionId,
            _exchange: &Exchange,
        ) -> Result<ExchangePatch, FunctionInvocationError> {
            Ok(ExchangePatch::default())
        }

        async fn prepare_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<PrepareToken, FunctionInvocationError> {
            Ok(PrepareToken::default())
        }

        async fn finalize_reload(
            &self,
            _diff: &FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }

        async fn rollback_reload(
            &self,
            _token: PrepareToken,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }

        async fn commit_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }

        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
    }

    let definition = FunctionDefinition {
        id: FunctionId::compute("deno", "fn1", 5000),
        runtime: "deno".into(),
        source: "fn1".into(),
        timeout_ms: 5000,
        route_id: Some("route-a".into()),
        step_index: Some(0),
    };
    let stub = StagedOnlyInvoker {
        staged_defs: Mutex::new(vec![(definition, Some("route-a".into()))]),
    };
    let invoker: Arc<dyn FunctionInvoker> = Arc::new(stub);

    let diff = compute_function_diff_for_route(&invoker, "route-a", 0);

    assert_eq!(diff.added.len(), 1);
    assert_eq!(diff.removed.len(), 0);
    assert_eq!(diff.unchanged.len(), 0);
}
1541
1542    #[test]
1543    fn compute_function_diff_all_removed() {
1544        use camel_api::function::PrepareToken;
1545        use camel_api::{FunctionDefinition, FunctionInvokerSync};
1546
1547        struct TestInvoker;
1548        impl FunctionInvokerSync for TestInvoker {
1549            fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}
1550            fn discard_staging(&self, _generation: u64) {}
1551            fn begin_reload(&self) -> u64 {
1552                0
1553            }
1554            fn function_refs_for_route(
1555                &self,
1556                _route_id: &str,
1557            ) -> Vec<(FunctionId, Option<String>)> {
1558                vec![(
1559                    FunctionId::compute("deno", "old-fn", 5000),
1560                    Some("route-b".into()),
1561                )]
1562            }
1563            fn staged_refs_for_route(
1564                &self,
1565                _route_id: &str,
1566                _generation: u64,
1567            ) -> Vec<(FunctionId, Option<String>)> {
1568                vec![]
1569            }
1570            fn staged_defs_for_route(
1571                &self,
1572                _route_id: &str,
1573                _generation: u64,
1574            ) -> Vec<(FunctionDefinition, Option<String>)> {
1575                vec![]
1576            }
1577        }
1578        #[async_trait::async_trait]
1579        impl FunctionInvoker for TestInvoker {
1580            async fn register(
1581                &self,
1582                _def: FunctionDefinition,
1583                _route_id: Option<&str>,
1584            ) -> Result<(), FunctionInvocationError> {
1585                Ok(())
1586            }
1587            async fn unregister(
1588                &self,
1589                _id: &FunctionId,
1590                _route_id: Option<&str>,
1591            ) -> Result<(), FunctionInvocationError> {
1592                Ok(())
1593            }
1594            async fn invoke(
1595                &self,
1596                _id: &FunctionId,
1597                _exchange: &Exchange,
1598            ) -> Result<ExchangePatch, FunctionInvocationError> {
1599                Ok(ExchangePatch::default())
1600            }
1601            async fn prepare_reload(
1602                &self,
1603                _diff: FunctionDiff,
1604                _generation: u64,
1605            ) -> Result<PrepareToken, FunctionInvocationError> {
1606                Ok(PrepareToken::default())
1607            }
1608            async fn finalize_reload(
1609                &self,
1610                _diff: &FunctionDiff,
1611                _generation: u64,
1612            ) -> Result<(), FunctionInvocationError> {
1613                Ok(())
1614            }
1615            async fn rollback_reload(
1616                &self,
1617                _token: PrepareToken,
1618                _generation: u64,
1619            ) -> Result<(), FunctionInvocationError> {
1620                Ok(())
1621            }
1622            async fn commit_reload(
1623                &self,
1624                _diff: FunctionDiff,
1625                _generation: u64,
1626            ) -> Result<(), FunctionInvocationError> {
1627                Ok(())
1628            }
1629            async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
1630                Ok(())
1631            }
1632        }
1633
1634        let invoker: Arc<dyn FunctionInvoker> = Arc::new(TestInvoker);
1635        let diff = compute_function_diff_for_route(&invoker, "route-b", 0);
1636        assert_eq!(diff.added.len(), 0);
1637        assert_eq!(diff.removed.len(), 1);
1638        assert_eq!(diff.unchanged.len(), 0);
1639    }
1640
1641    #[test]
1642    fn compute_function_diff_unchanged() {
1643        use camel_api::function::PrepareToken;
1644        use camel_api::{FunctionDefinition, FunctionInvokerSync};
1645
1646        let fn_id = FunctionId::compute("deno", "same-fn", 5000);
1647        let pair = (fn_id.clone(), Some("route-c".into()));
1648
1649        struct TestInvoker {
1650            pair: (FunctionId, Option<String>),
1651        }
1652        impl FunctionInvokerSync for TestInvoker {
1653            fn stage_pending(&self, _def: FunctionDefinition, _route_id: Option<&str>, _gen: u64) {}
1654            fn discard_staging(&self, _generation: u64) {}
1655            fn begin_reload(&self) -> u64 {
1656                0
1657            }
1658            fn function_refs_for_route(
1659                &self,
1660                _route_id: &str,
1661            ) -> Vec<(FunctionId, Option<String>)> {
1662                vec![self.pair.clone()]
1663            }
1664            fn staged_refs_for_route(
1665                &self,
1666                _route_id: &str,
1667                _generation: u64,
1668            ) -> Vec<(FunctionId, Option<String>)> {
1669                vec![self.pair.clone()]
1670            }
1671            fn staged_defs_for_route(
1672                &self,
1673                _route_id: &str,
1674                _generation: u64,
1675            ) -> Vec<(FunctionDefinition, Option<String>)> {
1676                vec![(
1677                    FunctionDefinition {
1678                        id: self.pair.0.clone(),
1679                        runtime: "deno".into(),
1680                        source: "same".into(),
1681                        timeout_ms: 5000,
1682                        route_id: Some("route-c".into()),
1683                        step_index: Some(0),
1684                    },
1685                    self.pair.1.clone(),
1686                )]
1687            }
1688        }
1689        #[async_trait::async_trait]
1690        impl FunctionInvoker for TestInvoker {
1691            async fn register(
1692                &self,
1693                _def: FunctionDefinition,
1694                _route_id: Option<&str>,
1695            ) -> Result<(), FunctionInvocationError> {
1696                Ok(())
1697            }
1698            async fn unregister(
1699                &self,
1700                _id: &FunctionId,
1701                _route_id: Option<&str>,
1702            ) -> Result<(), FunctionInvocationError> {
1703                Ok(())
1704            }
1705            async fn invoke(
1706                &self,
1707                _id: &FunctionId,
1708                _exchange: &Exchange,
1709            ) -> Result<ExchangePatch, FunctionInvocationError> {
1710                Ok(ExchangePatch::default())
1711            }
1712            async fn prepare_reload(
1713                &self,
1714                _diff: FunctionDiff,
1715                _generation: u64,
1716            ) -> Result<PrepareToken, FunctionInvocationError> {
1717                Ok(PrepareToken::default())
1718            }
1719            async fn finalize_reload(
1720                &self,
1721                _diff: &FunctionDiff,
1722                _generation: u64,
1723            ) -> Result<(), FunctionInvocationError> {
1724                Ok(())
1725            }
1726            async fn rollback_reload(
1727                &self,
1728                _token: PrepareToken,
1729                _generation: u64,
1730            ) -> Result<(), FunctionInvocationError> {
1731                Ok(())
1732            }
1733            async fn commit_reload(
1734                &self,
1735                _diff: FunctionDiff,
1736                _generation: u64,
1737            ) -> Result<(), FunctionInvocationError> {
1738                Ok(())
1739            }
1740            async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
1741                Ok(())
1742            }
1743        }
1744
1745        let invoker: Arc<dyn FunctionInvoker> = Arc::new(TestInvoker { pair });
1746        let diff = compute_function_diff_for_route(&invoker, "route-c", 0);
1747        assert_eq!(diff.added.len(), 0);
1748        assert_eq!(diff.removed.len(), 0);
1749        assert_eq!(diff.unchanged.len(), 1);
1750    }
1751
1752    #[test]
1753    fn reload_error_debug_format() {
1754        let err = ReloadError {
1755            route_id: "r1".into(),
1756            action: "Swap".into(),
1757            error: CamelError::RouteError("test error".into()),
1758        };
1759        let debug = format!("{:?}", err);
1760        assert!(debug.contains("r1"));
1761        assert!(debug.contains("Swap"));
1762    }
1763}