Skip to main content

camel_core/hot_reload/application/
reload.rs

1//! Route reload coordinator.
2//!
3//! Compares a new set of route definitions against the currently running routes
4//! and computes the minimal set of actions: SWAP, RESTART, ADD, REMOVE, or SKIP.
5
6use camel_api::CamelError;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::Duration;
9
10use crate::context::RuntimeExecutionHandle;
11use crate::hot_reload::application::drain::drain_route;
12use crate::hot_reload::domain::ReloadAction;
13#[cfg(test)]
14use crate::lifecycle::adapters::route_controller::DefaultRouteController;
15use crate::lifecycle::application::route_definition::RouteDefinition;
16
/// Function-registry context supplied to reload execution.
///
/// When present, reload actions use `invoker` to diff, prepare, finalize, or
/// roll back function changes for the generation identified by `generation`.
pub struct FunctionReloadContext {
    /// Shared function invoker queried for current and staged function refs.
    pub invoker: std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
    /// Generation number passed to the invoker's staging and reload calls.
    pub generation: u64,
}
21
22fn compute_function_diff_for_route(
23    invoker: &std::sync::Arc<dyn camel_api::function::FunctionInvoker>,
24    route_id: &str,
25    generation: u64,
26) -> camel_api::function::FunctionDiff {
27    let current_refs: std::collections::HashSet<_> = invoker
28        .function_refs_for_route(route_id)
29        .into_iter()
30        .collect();
31    let staged_defs = invoker.staged_defs_for_route(route_id, generation);
32    let staged_refs: std::collections::HashSet<_> = staged_defs
33        .iter()
34        .map(|(def, rid)| (def.id.clone(), rid.clone()))
35        .collect();
36
37    let removed: Vec<_> = current_refs.difference(&staged_refs).cloned().collect();
38    let added_refs: std::collections::HashSet<_> =
39        staged_refs.difference(&current_refs).cloned().collect();
40    let added: Vec<_> = staged_defs
41        .into_iter()
42        .filter(|(def, rid)| added_refs.contains(&(def.id.clone(), rid.clone())))
43        .collect();
44    let unchanged: Vec<_> = current_refs
45        .intersection(&staged_refs)
46        .map(|(id, _)| id.clone())
47        .collect();
48
49    camel_api::function::FunctionDiff {
50        added,
51        removed,
52        unchanged,
53    }
54}
55
/// Counter backing the numeric suffix of reload command ids; starts at zero.
static RELOAD_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Produce a command id of the form `reload:{op}:{route_id}:{seq}`.
///
/// `seq` is the value of [`RELOAD_COMMAND_SEQ`] before it is incremented, so
/// each call observes a distinct number.
fn next_reload_command_id(op: &str, route_id: &str) -> String {
    let sequence = RELOAD_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    let suffix = sequence.to_string();
    ["reload", op, route_id, suffix.as_str()].join(":")
}
62
63fn is_invalid_stop_transition(err: &CamelError) -> bool {
64    err.to_string().contains("invalid transition")
65}
66
/// Decide whether a route must be stopped before it is removed or replaced.
///
/// Returns `false` only for the exact statuses `"Registered"` and
/// `"Stopped"`; any other status, or no status at all, yields `true`.
fn should_stop_before_mutation(runtime_status: Option<&str>) -> bool {
    match runtime_status {
        Some("Registered") | Some("Stopped") => false,
        Some(_) | None => true,
    }
}
70
/// Decide whether a restarted route should be started again.
///
/// A route whose pre-restart status was exactly `"Registered"` or
/// `"Stopped"` is left as is (`false`); every other status, including an
/// absent one, yields `true`.
fn should_start_after_restart(runtime_status: Option<&str>) -> bool {
    let was_idle =
        runtime_status.is_some_and(|status| status == "Registered" || status == "Stopped");
    !was_idle
}
74
/// A non-fatal error during reload action execution.
///
/// The watcher logs these and continues watching for future changes.
#[derive(Debug)]
pub struct ReloadError {
    /// ID of the route whose reload action failed.
    pub route_id: String,
    /// Label of the failing action or step, e.g. `"Swap"`, `"Add (start)"`,
    /// or `"Restart (stop)"`.
    pub action: String,
    /// The underlying error reported for that step.
    pub error: CamelError,
}
84
/// Diff a new set of route definitions against the routes held by `controller`.
///
/// For each definition, in input order:
/// * unknown route id → `Add`;
/// * known id whose `from_uri` differs → `Restart`;
/// * known id with equal, present source hashes on both sides → `Skip`;
/// * any other known id → `Swap`.
///
/// Routes known to the controller but absent from `new_definitions` are
/// appended as `Remove` actions after all per-definition actions.
#[cfg(test)]
fn compute_reload_actions(
    new_definitions: &[RouteDefinition],
    controller: &DefaultRouteController,
) -> Vec<ReloadAction> {
    use std::collections::HashSet;

    let running: HashSet<String> = controller.route_ids().into_iter().collect();
    let mut incoming: HashSet<String> = HashSet::new();
    let mut plan = Vec::new();

    for definition in new_definitions {
        let route_id = definition.route_id().to_string();
        incoming.insert(route_id.clone());

        if !running.contains(&route_id) {
            plan.push(ReloadAction::Add { route_id });
            continue;
        }

        // A listed route without a retrievable `from_uri` yields no action.
        let Some(current_from_uri) = controller.route_from_uri(&route_id) else {
            continue;
        };

        if current_from_uri != definition.from_uri() {
            plan.push(ReloadAction::Restart { route_id });
            continue;
        }

        let deployed_hash = controller.route_source_hash(&route_id);
        let candidate_hash = definition.source_hash();
        // A missing hash on either side is treated as "changed".
        let source_unchanged = matches!(
            (deployed_hash, candidate_hash),
            (Some(old), Some(new)) if old == new
        );

        plan.push(if source_unchanged {
            ReloadAction::Skip { route_id }
        } else {
            ReloadAction::Swap { route_id }
        });
    }

    // Anything still running that the new definitions no longer mention.
    plan.extend(
        running
            .difference(&incoming)
            .map(|stale| ReloadAction::Remove {
                route_id: stale.clone(),
            }),
    );

    plan
}
134
135/// Compute reload actions using runtime projection route IDs as primary source.
136///
137/// This variant is used by the file watcher hard-cut path where runtime projection
138/// is authoritative for route existence.
139pub(crate) fn compute_reload_actions_from_runtime_snapshot(
140    new_definitions: &[RouteDefinition],
141    runtime_route_ids: &[String],
142    runtime_source_hash: &dyn Fn(&str) -> Option<u64>,
143) -> Vec<ReloadAction> {
144    let active_ids: std::collections::HashSet<String> = runtime_route_ids.iter().cloned().collect();
145    let mut new_ids = std::collections::HashSet::new();
146    let mut actions = Vec::new();
147
148    for def in new_definitions {
149        let route_id = def.route_id().to_string();
150        new_ids.insert(route_id.clone());
151
152        if active_ids.contains(&route_id) {
153            let existing_hash = runtime_source_hash(&route_id);
154            let new_hash = def.source_hash();
155            match (existing_hash, new_hash) {
156                (Some(h_existing), Some(h_new)) if h_existing == h_new => {
157                    actions.push(ReloadAction::Skip { route_id });
158                }
159                _ => {
160                    actions.push(ReloadAction::Restart { route_id });
161                }
162            }
163        } else {
164            actions.push(ReloadAction::Add { route_id });
165        }
166    }
167
168    for id in &active_ids {
169        if !new_ids.contains(id) {
170            actions.push(ReloadAction::Remove {
171                route_id: id.clone(),
172            });
173        }
174    }
175
176    actions
177}
178
179/// Execute a list of reload actions against a live controller.
180///
181/// Non-fatal: errors for individual routes are collected and returned.
182/// The caller should log them as warnings and continue watching.
183///
184/// `new_definitions` is consumed — each definition is moved to the controller for Add/Swap/Restart.
185pub async fn execute_reload_actions(
186    actions: Vec<ReloadAction>,
187    mut new_definitions: Vec<RouteDefinition>,
188    controller: &RuntimeExecutionHandle,
189    drain_timeout: Duration,
190    function_ctx: Option<&FunctionReloadContext>,
191) -> Vec<ReloadError> {
192    let mut errors = Vec::new();
193
194    for action in actions {
195        match action {
196            ReloadAction::Swap { route_id } => {
197                let def_pos = new_definitions
198                    .iter()
199                    .position(|d| d.route_id() == route_id);
200                let def = match def_pos {
201                    Some(pos) => new_definitions.remove(pos),
202                    None => {
203                        errors.push(ReloadError {
204                            route_id: route_id.clone(),
205                            action: "Swap".into(),
206                            error: CamelError::RouteError(format!(
207                                "No definition found for route '{}'",
208                                route_id
209                            )),
210                        });
211                        continue;
212                    }
213                };
214
215                let in_flight = controller.in_flight_count(&route_id).await.unwrap_or(0);
216
217                let pipeline = if let Some(ctx) = function_ctx {
218                    controller
219                        .compile_route_definition_with_generation(def, ctx.generation)
220                        .await
221                } else {
222                    controller.compile_route_definition(def).await
223                };
224
225                match pipeline {
226                    Ok(p) => {
227                        let prepare_token = if let Some(ctx) = function_ctx {
228                            let diff = compute_function_diff_for_route(
229                                &ctx.invoker,
230                                &route_id,
231                                ctx.generation,
232                            );
233                            match ctx.invoker.prepare_reload(diff, ctx.generation).await {
234                                Ok(token) => Some(token),
235                                Err(e) => {
236                                    errors.push(ReloadError {
237                                        route_id: route_id.clone(),
238                                        action: "Swap (prepare)".into(),
239                                        error: CamelError::ProcessorError(format!("{e}")),
240                                    });
241                                    continue;
242                                }
243                            }
244                        } else {
245                            None
246                        };
247
248                        let result = controller.swap_route_pipeline(&route_id, p).await;
249                        if let Err(e) = result {
250                            if let Some(ctx) = function_ctx
251                                && let Some(ref token) = prepare_token
252                            {
253                                let _ = ctx
254                                    .invoker
255                                    .rollback_reload(token.clone(), ctx.generation)
256                                    .await;
257                            }
258                            errors.push(ReloadError {
259                                route_id,
260                                action: "Swap".into(),
261                                error: e,
262                            });
263                        } else {
264                            if let Some(ctx) = function_ctx {
265                                let diff = compute_function_diff_for_route(
266                                    &ctx.invoker,
267                                    &route_id,
268                                    ctx.generation,
269                                );
270                                if let Err(e) =
271                                    ctx.invoker.finalize_reload(&diff, ctx.generation).await
272                                {
273                                    errors.push(ReloadError {
274                                        route_id: route_id.clone(),
275                                        action: "Finalize".into(),
276                                        error: CamelError::ProcessorError(format!("{e}")),
277                                    });
278                                }
279                            }
280                            if in_flight > 0 {
281                                tracing::info!(
282                                    route_id = %route_id,
283                                    action = "swap",
284                                    in_flight = in_flight,
285                                    "hot-reload: swapped route pipeline ({} exchanges continuing with previous pipeline)",
286                                    in_flight
287                                );
288                            } else {
289                                tracing::info!(route_id = %route_id, "hot-reload: swapped route pipeline");
290                            }
291                        }
292                    }
293                    Err(e) => {
294                        errors.push(ReloadError {
295                            route_id,
296                            action: "Swap (compile)".into(),
297                            error: e,
298                        });
299                    }
300                }
301            }
302
303            ReloadAction::Add { route_id } => {
304                let def_pos = new_definitions
305                    .iter()
306                    .position(|d| d.route_id() == route_id);
307                let def = match def_pos {
308                    Some(pos) => new_definitions.remove(pos),
309                    None => {
310                        errors.push(ReloadError {
311                            route_id: route_id.clone(),
312                            action: "Add".into(),
313                            error: CamelError::RouteError(format!(
314                                "No definition found for route '{}'",
315                                route_id
316                            )),
317                        });
318                        continue;
319                    }
320                };
321
322                if let Some(ctx) = function_ctx {
323                    let prepared = match controller
324                        .prepare_route_definition_with_generation(def, ctx.generation)
325                        .await
326                    {
327                        Ok(p) => p,
328                        Err(e) => {
329                            ctx.invoker.discard_staging(ctx.generation);
330                            errors.push(ReloadError {
331                                route_id,
332                                action: "Add (prepare-route)".into(),
333                                error: e,
334                            });
335                            continue;
336                        }
337                    };
338
339                    let diff =
340                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
341
342                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
343                    {
344                        Ok(token) => token,
345                        Err(e) => {
346                            ctx.invoker.discard_staging(ctx.generation);
347                            errors.push(ReloadError {
348                                route_id,
349                                action: "Add (prepare)".into(),
350                                error: CamelError::ProcessorError(format!("{e}")),
351                            });
352                            continue;
353                        }
354                    };
355
356                    if let Err(e) = controller.insert_prepared_route(prepared).await {
357                        let _ = ctx
358                            .invoker
359                            .rollback_reload(prepare_token, ctx.generation)
360                            .await;
361                        errors.push(ReloadError {
362                            route_id,
363                            action: "Add (insert)".into(),
364                            error: e,
365                        });
366                        continue;
367                    }
368
369                    if let Err(e) = controller.register_route_aggregate(route_id.clone()).await {
370                        let _ = controller
371                            .remove_route_preserving_functions(route_id.clone())
372                            .await;
373                        let _ = ctx
374                            .invoker
375                            .rollback_reload(prepare_token, ctx.generation)
376                            .await;
377                        errors.push(ReloadError {
378                            route_id,
379                            action: "Add (aggregate)".into(),
380                            error: e,
381                        });
382                        continue;
383                    }
384
385                    let diff =
386                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
387                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
388                        errors.push(ReloadError {
389                            route_id: route_id.clone(),
390                            action: "Finalize".into(),
391                            error: CamelError::ProcessorError(format!("{e}")),
392                        });
393                    }
394                } else {
395                    if let Err(e) = controller.add_route_definition(def).await {
396                        errors.push(ReloadError {
397                            route_id,
398                            action: "Add".into(),
399                            error: e,
400                        });
401                        continue;
402                    }
403                }
404
405                let start_result = controller
406                    .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
407                        route_id: route_id.clone(),
408                        command_id: next_reload_command_id("add-start", &route_id),
409                        causation_id: None,
410                    })
411                    .await;
412                if let Err(e) = start_result {
413                    errors.push(ReloadError {
414                        route_id,
415                        action: "Add (start)".into(),
416                        error: e,
417                    });
418                } else {
419                    tracing::info!(route_id = %route_id, "hot-reload: added and started route");
420                }
421            }
422
423            ReloadAction::Remove { route_id } => {
424                if let Some(ctx) = function_ctx {
425                    let diff =
426                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
427
428                    let runtime_status = match controller.runtime_route_status(&route_id).await {
429                        Ok(status) => status,
430                        Err(e) => {
431                            errors.push(ReloadError {
432                                route_id: route_id.clone(),
433                                action: "Remove (status)".into(),
434                                error: e,
435                            });
436                            continue;
437                        }
438                    };
439
440                    if should_stop_before_mutation(runtime_status.as_deref()) {
441                        let stop_result = controller
442                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
443                                route_id: route_id.clone(),
444                                command_id: next_reload_command_id("remove-stop", &route_id),
445                                causation_id: None,
446                            })
447                            .await;
448                        if let Err(e) = stop_result
449                            && !is_invalid_stop_transition(&e)
450                        {
451                            errors.push(ReloadError {
452                                route_id: route_id.clone(),
453                                action: "Remove (stop)".into(),
454                                error: e,
455                            });
456                            continue;
457                        }
458
459                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
460                    }
461
462                    if let Err(e) = controller
463                        .remove_route_preserving_functions(route_id.clone())
464                        .await
465                    {
466                        errors.push(ReloadError {
467                            route_id,
468                            action: "Remove".into(),
469                            error: e,
470                        });
471                        continue;
472                    }
473
474                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
475                        errors.push(ReloadError {
476                            route_id: route_id.clone(),
477                            action: "Finalize".into(),
478                            error: CamelError::ProcessorError(format!("{e}")),
479                        });
480                    }
481
482                    tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
483                } else {
484                    let runtime_status = match controller.runtime_route_status(&route_id).await {
485                        Ok(status) => status,
486                        Err(e) => {
487                            errors.push(ReloadError {
488                                route_id,
489                                action: "Remove (status)".into(),
490                                error: e,
491                            });
492                            continue;
493                        }
494                    };
495
496                    if should_stop_before_mutation(runtime_status.as_deref()) {
497                        let stop_result = controller
498                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
499                                route_id: route_id.clone(),
500                                command_id: next_reload_command_id("remove-stop", &route_id),
501                                causation_id: None,
502                            })
503                            .await;
504                        if let Err(e) = stop_result
505                            && !is_invalid_stop_transition(&e)
506                        {
507                            errors.push(ReloadError {
508                                route_id: route_id.clone(),
509                                action: "Remove (stop)".into(),
510                                error: e,
511                            });
512                            continue;
513                        }
514
515                        let _ = drain_route(&route_id, "remove", controller, drain_timeout).await;
516                    }
517
518                    let remove_result = controller
519                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
520                            route_id: route_id.clone(),
521                            command_id: next_reload_command_id("remove", &route_id),
522                            causation_id: None,
523                        })
524                        .await;
525                    match remove_result {
526                        Ok(_) => {
527                            tracing::info!(route_id = %route_id, "hot-reload: stopped and removed route");
528                        }
529                        Err(e) => {
530                            errors.push(ReloadError {
531                                route_id,
532                                action: "Remove".into(),
533                                error: e,
534                            });
535                        }
536                    }
537                }
538            }
539
540            ReloadAction::Restart { route_id } => {
541                tracing::info!(route_id = %route_id, "hot-reload: restarting route (from_uri changed)");
542
543                let def_pos = new_definitions
544                    .iter()
545                    .position(|d| d.route_id() == route_id);
546                let def = match def_pos {
547                    Some(pos) => new_definitions.remove(pos),
548                    None => {
549                        errors.push(ReloadError {
550                            route_id: route_id.clone(),
551                            action: "Restart".into(),
552                            error: CamelError::RouteError(format!(
553                                "No definition found for route '{}'",
554                                route_id
555                            )),
556                        });
557                        continue;
558                    }
559                };
560
561                if let Some(ctx) = function_ctx {
562                    let prepared = match controller
563                        .prepare_route_definition_with_generation(def, ctx.generation)
564                        .await
565                    {
566                        Ok(p) => p,
567                        Err(e) => {
568                            errors.push(ReloadError {
569                                route_id,
570                                action: "Restart (prepare-route)".into(),
571                                error: e,
572                            });
573                            continue;
574                        }
575                    };
576
577                    let diff =
578                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
579                    let prepare_token = match ctx.invoker.prepare_reload(diff, ctx.generation).await
580                    {
581                        Ok(token) => token,
582                        Err(e) => {
583                            errors.push(ReloadError {
584                                route_id,
585                                action: "Restart (prepare)".into(),
586                                error: CamelError::ProcessorError(format!("{e}")),
587                            });
588                            continue;
589                        }
590                    };
591
592                    let runtime_status = match controller.runtime_route_status(&route_id).await {
593                        Ok(status) => status,
594                        Err(e) => {
595                            let _ = ctx
596                                .invoker
597                                .rollback_reload(prepare_token, ctx.generation)
598                                .await;
599                            errors.push(ReloadError {
600                                route_id,
601                                action: "Restart (status)".into(),
602                                error: e,
603                            });
604                            continue;
605                        }
606                    };
607
608                    if should_stop_before_mutation(runtime_status.as_deref()) {
609                        let stop_result = controller
610                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
611                                route_id: route_id.clone(),
612                                command_id: next_reload_command_id("restart-stop", &route_id),
613                                causation_id: None,
614                            })
615                            .await;
616                        if let Err(e) = stop_result
617                            && !is_invalid_stop_transition(&e)
618                        {
619                            let _ = ctx
620                                .invoker
621                                .rollback_reload(prepare_token, ctx.generation)
622                                .await;
623                            errors.push(ReloadError {
624                                route_id,
625                                action: "Restart (stop)".into(),
626                                error: e,
627                            });
628                            continue;
629                        }
630
631                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
632                    }
633
634                    if let Err(e) = controller
635                        .remove_route_preserving_functions(route_id.clone())
636                        .await
637                    {
638                        let _ = ctx
639                            .invoker
640                            .rollback_reload(prepare_token, ctx.generation)
641                            .await;
642                        errors.push(ReloadError {
643                            route_id,
644                            action: "Restart (remove)".into(),
645                            error: e,
646                        });
647                        continue;
648                    }
649
650                    if let Err(e) = controller.insert_prepared_route(prepared).await {
651                        let _ = ctx
652                            .invoker
653                            .rollback_reload(prepare_token, ctx.generation)
654                            .await;
655                        errors.push(ReloadError {
656                            route_id,
657                            action: "Restart (insert)".into(),
658                            error: e,
659                        });
660                        continue;
661                    }
662
663                    let diff =
664                        compute_function_diff_for_route(&ctx.invoker, &route_id, ctx.generation);
665                    if let Err(e) = ctx.invoker.finalize_reload(&diff, ctx.generation).await {
666                        errors.push(ReloadError {
667                            route_id: route_id.clone(),
668                            action: "Finalize".into(),
669                            error: CamelError::ProcessorError(format!("{e}")),
670                        });
671                    }
672
673                    if should_start_after_restart(runtime_status.as_deref()) {
674                        let start_result = controller
675                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
676                                route_id: route_id.clone(),
677                                command_id: next_reload_command_id("restart-start", &route_id),
678                                causation_id: None,
679                            })
680                            .await;
681                        if let Err(e) = start_result {
682                            errors.push(ReloadError {
683                                route_id,
684                                action: "Restart (start)".into(),
685                                error: e,
686                            });
687                        } else {
688                            tracing::info!(
689                                route_id = %route_id,
690                                "hot-reload: route restarted successfully"
691                            );
692                        }
693                    } else {
694                        tracing::info!(
695                            route_id = %route_id,
696                            "hot-reload: restart applied while preserving stopped lifecycle state"
697                        );
698                    }
699                } else {
700                    let runtime_status = match controller.runtime_route_status(&route_id).await {
701                        Ok(status) => status,
702                        Err(e) => {
703                            errors.push(ReloadError {
704                                route_id,
705                                action: "Restart (status)".into(),
706                                error: e,
707                            });
708                            continue;
709                        }
710                    };
711
712                    if should_stop_before_mutation(runtime_status.as_deref()) {
713                        let stop_result = controller
714                            .execute_runtime_command(camel_api::RuntimeCommand::StopRoute {
715                                route_id: route_id.clone(),
716                                command_id: next_reload_command_id("restart-stop", &route_id),
717                                causation_id: None,
718                            })
719                            .await;
720                        if let Err(e) = stop_result
721                            && !is_invalid_stop_transition(&e)
722                        {
723                            errors.push(ReloadError {
724                                route_id,
725                                action: "Restart (stop)".into(),
726                                error: e,
727                            });
728                            continue;
729                        }
730
731                        let _ = drain_route(&route_id, "restart", controller, drain_timeout).await;
732                    }
733
734                    if let Err(e) = controller
735                        .execute_runtime_command(camel_api::RuntimeCommand::RemoveRoute {
736                            route_id: route_id.clone(),
737                            command_id: next_reload_command_id("restart-remove", &route_id),
738                            causation_id: None,
739                        })
740                        .await
741                    {
742                        errors.push(ReloadError {
743                            route_id,
744                            action: "Restart (remove)".into(),
745                            error: e,
746                        });
747                        continue;
748                    }
749
750                    if let Err(e) = controller.add_route_definition(def).await {
751                        errors.push(ReloadError {
752                            route_id,
753                            action: "Restart (add)".into(),
754                            error: e,
755                        });
756                        continue;
757                    }
758
759                    if should_start_after_restart(runtime_status.as_deref()) {
760                        let start_result = controller
761                            .execute_runtime_command(camel_api::RuntimeCommand::StartRoute {
762                                route_id: route_id.clone(),
763                                command_id: next_reload_command_id("restart-start", &route_id),
764                                causation_id: None,
765                            })
766                            .await;
767                        if let Err(e) = start_result {
768                            errors.push(ReloadError {
769                                route_id,
770                                action: "Restart (start)".into(),
771                                error: e,
772                            });
773                        } else {
774                            tracing::info!(
775                                route_id = %route_id,
776                                "hot-reload: route restarted successfully"
777                            );
778                        }
779                    } else {
780                        tracing::info!(
781                            route_id = %route_id,
782                            "hot-reload: restart applied while preserving stopped lifecycle state"
783                        );
784                    }
785                }
786            }
787
788            ReloadAction::Skip { route_id } => {
789                tracing::debug!(route_id = %route_id, "hot-reload: skipped unchanged route");
790            }
791        }
792    }
793
794    errors
795}
796
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CamelContext;
    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
    use crate::shared::components::domain::Registry;
    use camel_api::{RuntimeQuery, RuntimeQueryResult};
    use camel_component_timer::TimerComponent;
    use std::sync::Arc;
    use std::time::Duration;

    /// Drain timeout handed to `execute_reload_actions` by the tests that run
    /// against a started context.
    const DRAIN_TIMEOUT: Duration = Duration::from_secs(10);

    /// Drain timeout handed to `execute_reload_actions` by the error-path tests
    /// that run against a context that was never started.
    const SHORT_DRAIN_TIMEOUT: Duration = Duration::from_millis(1);

    /// Creates a controller over an empty registry and a no-op platform service.
    fn make_controller() -> DefaultRouteController {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        DefaultRouteController::new(
            registry,
            Arc::new(camel_api::NoopPlatformService::default()),
        )
    }

    /// Builds a step-less route definition for `from_uri` carrying `route_id`.
    fn route_def(from_uri: &str, route_id: &str) -> RouteDefinition {
        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
    }

    /// Panics unless `result` is a route-status answer whose status is `"Registered"`.
    #[track_caller]
    fn assert_registered(result: RuntimeQueryResult) {
        match result {
            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
            other => panic!("unexpected query result: {other:?}"),
        }
    }

    // ---- compute_reload_actions tests ----

    /// A definition whose id is unknown to the controller yields `Add`.
    #[test]
    fn test_new_route_detected_as_add() {
        let controller = make_controller();
        let incoming = [route_def("timer:tick", "new-route")];

        let expected = vec![ReloadAction::Add {
            route_id: "new-route".into(),
        }];
        assert_eq!(compute_reload_actions(&incoming, &controller), expected);
    }

    /// A route known to the controller but absent from the new definitions yields `Remove`.
    #[tokio::test]
    async fn test_removed_route_detected() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "old-route"))
            .await
            .unwrap();

        let expected = vec![ReloadAction::Remove {
            route_id: "old-route".into(),
        }];
        assert_eq!(compute_reload_actions(&[], &controller), expected);
    }

    /// Same id and same from-URI with a different source hash yields `Swap`.
    #[tokio::test]
    async fn test_same_from_uri_detected_as_swap() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "my-route").with_source_hash(100))
            .await
            .unwrap();

        let reloaded = [route_def("timer:tick", "my-route").with_source_hash(200)];

        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&reloaded, &controller), expected);
    }

    /// Same id with a different from-URI yields `Restart`.
    #[tokio::test]
    async fn test_changed_from_uri_detected_as_restart() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "my-route"))
            .await
            .unwrap();

        let reloaded = [route_def("timer:tock?period=500", "my-route")];

        let expected = vec![ReloadAction::Restart {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&reloaded, &controller), expected);
    }

    /// The controller holds two routes, but only `runtime-route` is listed in the
    /// runtime snapshot passed in; only that id must come back as `Remove`.
    #[tokio::test]
    async fn test_runtime_snapshot_drives_remove_set() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "runtime-route"))
            .await
            .unwrap();
        controller
            .add_route(route_def("timer:ghost", "ghost-route"))
            .await
            .unwrap();

        let snapshot_ids = vec!["runtime-route".to_string()];
        let computed =
            compute_reload_actions_from_runtime_snapshot(&[], &snapshot_ids, &|_id: &str| None);

        let expected = vec![ReloadAction::Remove {
            route_id: "runtime-route".into(),
        }];
        assert_eq!(computed, expected);
    }

    /// Snapshot routes whose stored hash differs from the new definition's hash
    /// are reported as `Restart`, in definition order.
    #[test]
    fn test_runtime_snapshot_existing_routes_map_to_restart() {
        let incoming = [
            route_def("timer:tick", "runtime-r1").with_source_hash(10),
            route_def("timer:tock", "runtime-r2").with_source_hash(20),
        ];
        let snapshot_ids = vec!["runtime-r1".to_string(), "runtime-r2".to_string()];
        let stored_hashes = std::collections::HashMap::from([
            ("runtime-r1".to_string(), 11u64),
            ("runtime-r2".to_string(), 22u64),
        ]);
        let hash_of = |id: &str| stored_hashes.get(id).copied();

        let computed =
            compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &hash_of);

        let expected = vec![
            ReloadAction::Restart {
                route_id: "runtime-r1".into(),
            },
            ReloadAction::Restart {
                route_id: "runtime-r2".into(),
            },
        ];
        assert_eq!(computed, expected);
    }

    /// Identical id, from-URI and source hash yields `Skip`.
    #[tokio::test]
    async fn test_same_hash_detected_as_skip() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "my-route").with_source_hash(42))
            .await
            .unwrap();

        let reloaded = [route_def("timer:tick", "my-route").with_source_hash(42)];

        let expected = vec![ReloadAction::Skip {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&reloaded, &controller), expected);
    }

    /// A route registered without `with_source_hash`, reloaded with a hashed
    /// definition on the same from-URI, yields `Swap`.
    #[tokio::test]
    async fn test_none_hash_detected_as_swap() {
        let mut controller = make_controller();
        controller
            .add_route(route_def("timer:tick", "my-route"))
            .await
            .unwrap();

        let reloaded = [route_def("timer:tick", "my-route").with_source_hash(99)];

        let expected = vec![ReloadAction::Swap {
            route_id: "my-route".into(),
        }];
        assert_eq!(compute_reload_actions(&reloaded, &controller), expected);
    }

    /// A snapshot route whose stored hash equals the new definition's hash yields `Skip`.
    #[test]
    fn test_runtime_snapshot_same_hash_detected_as_skip() {
        let incoming = [route_def("timer:tick", "r1").with_source_hash(42)];
        let snapshot_ids = vec!["r1".to_string()];
        let stored_hashes = std::collections::HashMap::from([("r1".to_string(), 42u64)]);
        let hash_of = |id: &str| stored_hashes.get(id).copied();

        let computed =
            compute_reload_actions_from_runtime_snapshot(&incoming, &snapshot_ids, &hash_of);

        let expected = vec![ReloadAction::Skip {
            route_id: "r1".into(),
        }];
        assert_eq!(computed, expected);
    }

    // ---- execute_reload_actions tests ----
    // These use a full CamelContext with real components so that start/stop work.

    /// Executing `Add` with a matching definition leaves exactly one route in the controller.
    #[tokio::test]
    async fn test_execute_add_action_inserts_route() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        let route_id = "exec-add-test";
        let definitions = vec![route_def("timer:tick?period=50&repeatCount=1", route_id)];
        let actions = vec![ReloadAction::Add {
            route_id: route_id.into(),
        }];

        let errors = execute_reload_actions(
            actions,
            definitions,
            &ctx.runtime_execution_handle(),
            DRAIN_TIMEOUT,
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let routes_after = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(routes_after, 1);

        ctx.stop().await.unwrap();
    }

    /// Executing `Remove` for a registered route leaves the controller empty.
    #[tokio::test]
    async fn test_execute_remove_action_deletes_route() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Add route through context so runtime aggregate/projection are seeded.
        let route_id = "exec-remove-test";
        ctx.add_route_definition(route_def("timer:tick?period=100", route_id))
            .await
            .unwrap();
        let routes_before = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(routes_before, 1);

        let actions = vec![ReloadAction::Remove {
            route_id: route_id.into(),
        }];
        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            DRAIN_TIMEOUT,
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        let routes_after = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(routes_after, 0);

        ctx.stop().await.unwrap();
    }

    /// Executing `Swap` with a same-URI definition succeeds and keeps the route registered.
    #[tokio::test]
    async fn test_execute_swap_action_replaces_pipeline() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Add route through context so runtime aggregate/projection are seeded.
        let route_id = "exec-swap-test";
        ctx.add_route_definition(route_def("timer:tick?period=100", route_id))
            .await
            .unwrap();

        // Swap with same from_uri (exercises compile + swap_pipeline code path)
        let definitions = vec![route_def("timer:tick?period=100", route_id)];
        let actions = vec![ReloadAction::Swap {
            route_id: route_id.into(),
        }];
        let errors = execute_reload_actions(
            actions,
            definitions,
            &ctx.runtime_execution_handle(),
            DRAIN_TIMEOUT,
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // Route should still exist after swap
        let routes_after = ctx
            .runtime_execution_handle()
            .controller_route_count_for_test()
            .await;
        assert_eq!(routes_after, 1);

        ctx.stop().await.unwrap();
    }

    /// Executing `Restart` on a route whose status is `Registered` leaves it `Registered`.
    #[tokio::test]
    async fn test_execute_restart_action_preserves_registered_lifecycle_state() {
        let mut ctx = CamelContext::builder().build().await.unwrap();
        ctx.register_component(TimerComponent::new());
        ctx.start().await.unwrap();

        // Add route through context so runtime aggregate/projection are seeded.
        ctx.add_route_definition(route_def("timer:tick?period=100", "exec-restart-test"))
            .await
            .unwrap();

        // Route is seeded as Registered by context registration.
        let status_before = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        assert_registered(status_before);

        let definitions = vec![route_def("timer:tick?period=250", "exec-restart-test")];
        let actions = vec![ReloadAction::Restart {
            route_id: "exec-restart-test".into(),
        }];
        let errors = execute_reload_actions(
            actions,
            definitions,
            &ctx.runtime_execution_handle(),
            DRAIN_TIMEOUT,
            None,
        )
        .await;
        assert!(errors.is_empty(), "Expected no errors, got: {:?}", errors);

        // Restart re-adds through RuntimeExecutionHandle::add_route_definition,
        // which now goes through InternalRuntimeCommandBus and preserves Registered state.
        let status_after = ctx
            .runtime()
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "exec-restart-test".into(),
            })
            .await
            .unwrap();
        assert_registered(status_after);

        assert_eq!(
            ctx.runtime_route_status("exec-restart-test").await.unwrap(),
            Some("Registered".to_string())
        );

        ctx.stop().await.unwrap();
    }

    /// `Swap` without a matching definition reports a single `"Swap"` error for that route.
    #[tokio::test]
    async fn test_execute_swap_action_missing_definition_returns_error() {
        let ctx = CamelContext::builder().build().await.unwrap();
        let actions = vec![ReloadAction::Swap {
            route_id: "missing-swap-def".into(),
        }];

        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            SHORT_DRAIN_TIMEOUT,
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Swap");
        assert_eq!(errors[0].route_id, "missing-swap-def");
    }

    /// `Add` without a matching definition reports a single `"Add"` error for that route.
    #[tokio::test]
    async fn test_execute_add_action_missing_definition_returns_error() {
        let ctx = CamelContext::builder().build().await.unwrap();
        let actions = vec![ReloadAction::Add {
            route_id: "missing-add-def".into(),
        }];

        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            SHORT_DRAIN_TIMEOUT,
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Add");
        assert_eq!(errors[0].route_id, "missing-add-def");
    }

    /// `Remove` for a route the context never registered reports a single error
    /// whose action label starts with `"Remove"`.
    #[tokio::test]
    async fn test_execute_remove_action_status_error_returns_error() {
        let ctx = CamelContext::builder().build().await.unwrap();
        let actions = vec![ReloadAction::Remove {
            route_id: "missing-remove-route".into(),
        }];

        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            SHORT_DRAIN_TIMEOUT,
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert!(errors[0].action.starts_with("Remove"));
        assert_eq!(errors[0].route_id, "missing-remove-route");
    }

    /// `Restart` without a matching definition reports a single `"Restart"` error for that route.
    #[tokio::test]
    async fn test_execute_restart_action_missing_definition_returns_error() {
        let ctx = CamelContext::builder().build().await.unwrap();
        let actions = vec![ReloadAction::Restart {
            route_id: "missing-restart-def".into(),
        }];

        let errors = execute_reload_actions(
            actions,
            vec![],
            &ctx.runtime_execution_handle(),
            SHORT_DRAIN_TIMEOUT,
            None,
        )
        .await;

        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].action, "Restart");
        assert_eq!(errors[0].route_id, "missing-restart-def");
    }
}