1use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21 StepLifecycle,
22};
23use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
24use camel_processor::aggregator::AggregatorService;
25pub use camel_processor::aggregator::SharedLanguageRegistry;
26
27use crate::health_registry::HealthCheckRegistry;
28use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30use crate::lifecycle::adapters::route_helpers::{
31 AggregateSplitInfo, CrashNotification, ManagedRoute, assert_no_mixed_top_level_splits,
32 handle_is_running, inferred_lifecycle_label, is_pending,
33};
34pub(crate) use crate::lifecycle::adapters::route_helpers::{CompiledPipeline, PreparedRoute};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37 emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::adapters::step_compilers::CompiledStep;
42use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
43use crate::shared::components::domain::Registry;
44use crate::shared::observability::domain::{DetailLevel, TracerConfig};
45use camel_bean::BeanRegistry;
46
47pub struct DefaultRouteController {
55 pub(super) routes: RouteRegistry,
57 pub(super) registry: Arc<std::sync::Mutex<Registry>>,
59 pub(super) languages: SharedLanguageRegistry,
61 pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
63 pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
65 pub(super) global_error_handler: Option<ErrorHandlerConfig>,
67 pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
69 pub(super) tracing_enabled: bool,
71 pub(super) tracer_detail_level: DetailLevel,
73 pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
75 pub(super) platform_service: Arc<dyn PlatformService>,
76 pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
77 pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
78 pub(super) idempotent_repositories: crate::SharedIdempotentRegistry,
82 pub(super) claim_check_repositories: crate::SharedClaimCheckRegistry,
83}
84
85impl DefaultRouteController {
86 pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
87 self.health_registry.clone().unwrap_or_else(|| {
88 warn!("health_registry not configured — creating isolated fallback");
89 Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
90 })
91 }
92
93 pub fn new(
95 registry: Arc<std::sync::Mutex<Registry>>,
96 platform_service: Arc<dyn PlatformService>,
97 ) -> Self {
98 Self::with_beans_and_platform_service(
99 registry,
100 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
101 platform_service,
102 )
103 }
104
105 pub fn with_beans(
107 registry: Arc<std::sync::Mutex<Registry>>,
108 beans: Arc<std::sync::Mutex<BeanRegistry>>,
109 ) -> Self {
110 Self::with_beans_and_platform_service(
111 registry,
112 beans,
113 Arc::new(NoopPlatformService::default()),
114 )
115 }
116
117 fn with_beans_and_platform_service(
118 registry: Arc<std::sync::Mutex<Registry>>,
119 beans: Arc<std::sync::Mutex<BeanRegistry>>,
120 platform_service: Arc<dyn PlatformService>,
121 ) -> Self {
122 Self {
123 routes: RouteRegistry::new(),
124 registry,
125 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
126 beans,
127 runtime: None,
128 global_error_handler: None,
129 crash_notifier: None,
130 tracing_enabled: false,
131 tracer_detail_level: DetailLevel::Minimal,
132 tracer_metrics: None,
133 platform_service,
134 function_invoker: None,
135 health_registry: None,
136 idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
137 claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
138 }
139 }
140
141 pub fn with_languages(
143 registry: Arc<std::sync::Mutex<Registry>>,
144 languages: SharedLanguageRegistry,
145 platform_service: Arc<dyn PlatformService>,
146 ) -> Self {
147 Self {
148 routes: RouteRegistry::new(),
149 registry,
150 languages,
151 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
152 runtime: None,
153 global_error_handler: None,
154 crash_notifier: None,
155 tracing_enabled: false,
156 tracer_detail_level: DetailLevel::Minimal,
157 tracer_metrics: None,
158 platform_service,
159 function_invoker: None,
160 health_registry: None,
161 idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
162 claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
163 }
164 }
165
166 pub fn with_languages_and_beans(
167 registry: Arc<std::sync::Mutex<Registry>>,
168 languages: SharedLanguageRegistry,
169 platform_service: Arc<dyn PlatformService>,
170 beans: Arc<std::sync::Mutex<BeanRegistry>>,
171 ) -> Self {
172 Self {
173 routes: RouteRegistry::new(),
174 registry,
175 languages,
176 beans,
177 runtime: None,
178 global_error_handler: None,
179 crash_notifier: None,
180 tracing_enabled: false,
181 tracer_detail_level: DetailLevel::Minimal,
182 tracer_metrics: None,
183 platform_service,
184 function_invoker: None,
185 health_registry: None,
186 idempotent_repositories: Arc::new(crate::IdempotentRegistry::new()),
187 claim_check_repositories: Arc::new(crate::ClaimCheckRegistry::new()),
188 }
189 }
190
191 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
192 self.function_invoker = Some(function_invoker);
193 self
194 }
195
196 pub(crate) fn set_idempotent_repositories(
197 &mut self,
198 repositories: crate::SharedIdempotentRegistry,
199 ) {
200 self.idempotent_repositories = repositories;
201 }
202
203 pub(crate) fn set_claim_check_repositories(
204 &mut self,
205 repositories: crate::SharedClaimCheckRegistry,
206 ) {
207 self.claim_check_repositories = repositories;
208 }
209
210 pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
211 self.health_registry = Some(registry);
212 }
213
214 pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
215 self.function_invoker = Some(invoker);
216 }
217
218 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
220 self.runtime = Some(Arc::downgrade(&runtime));
221 }
222
223 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
228 self.crash_notifier = Some(tx);
229 }
230
231 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
233 self.global_error_handler = Some(config);
234 }
235
236 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
238 self.tracing_enabled = config.enabled;
239 self.tracer_detail_level = config.detail_level.clone();
240 self.tracer_metrics = config.metrics_collector.clone();
241 }
242
243 fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
244 let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
245 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
246 producer_ctx = producer_ctx.with_runtime(runtime);
247 }
248 Ok(producer_ctx)
249 }
250
251 fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
253 RouteCompilerExt {
254 registry: &self.registry,
255 languages: &self.languages,
256 beans: &self.beans,
257 function_invoker: &self.function_invoker,
258 tracing_enabled: self.tracing_enabled,
259 tracer_detail_level: &self.tracer_detail_level,
260 tracer_metrics: &self.tracer_metrics,
261 platform_service: &self.platform_service,
262 runtime: &self.runtime,
263 global_error_handler: &self.global_error_handler,
264 health_registry: &self.health_registry,
265 route_registry: &self.routes,
266 idempotent_repositories: Arc::clone(&self.idempotent_repositories),
267 claim_check_repositories: Arc::clone(&self.claim_check_repositories),
268 }
269 }
270
271 #[allow(dead_code)] pub(crate) fn resolve_steps(
274 &self,
275 steps: Vec<BuilderStep>,
276 producer_ctx: &ProducerContext,
277 registry: &Arc<std::sync::Mutex<Registry>>,
278 route_id: Option<&str>,
279 staging_mode: &super::step_resolution::FunctionStagingMode,
280 ) -> Result<Vec<CompiledStep>, CamelError> {
281 let component_ctx = Arc::new(ControllerComponentContext::new(
282 Arc::clone(registry),
283 Arc::clone(&self.languages),
284 self.tracer_metrics
285 .clone()
286 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
287 Arc::clone(&self.platform_service),
288 self.health_registry(),
289 route_id.map(|s| s.to_string()),
290 ));
291 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
292 Arc::clone(&component_ctx) as Arc<_>;
293
294 super::step_resolution::resolve_steps(
295 steps,
296 producer_ctx,
297 rt,
298 registry,
299 &self.languages,
300 &self.beans,
301 self.function_invoker.clone(),
302 component_ctx,
303 route_id,
304 staging_mode,
305 &self.idempotent_repositories,
306 &self.claim_check_repositories,
307 )
308 }
309
310 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
320 let route_id = definition.route_id().to_string();
321
322 if self.routes.contains_key(&route_id) {
323 return Err(CamelError::RouteError(format!(
324 "Route '{}' already exists",
325 route_id
326 )));
327 }
328
329 info!(route_id = %route_id, "Adding route to controller");
330
331 let prepared = match self.build_managed_route(
332 definition,
333 &super::step_resolution::FunctionStagingMode::DirectAdd,
334 ) {
335 Ok(prepared) => prepared,
336 Err(err) => {
337 self.discard_function_staging();
338 return Err(err);
339 }
340 };
341
342 if let Some(invoker) = &self.function_invoker
343 && let Err(err) = invoker.commit_staged().await
344 {
345 invoker.discard_staging(0);
346 return Err(CamelError::Config(err.to_string()));
347 }
348
349 self.routes
350 .insert(prepared.route_id.clone(), prepared.managed);
351
352 Ok(())
353 }
354
355 fn build_managed_route(
356 &self,
357 definition: RouteDefinition,
358 staging_mode: &super::step_resolution::FunctionStagingMode,
359 ) -> Result<PreparedRoute, CamelError> {
360 let route_id = definition.route_id().to_string();
361
362 let definition_info = definition.to_info();
363 let RouteDefinition {
364 from_uri,
365 steps,
366 error_handler,
367 circuit_breaker,
368 security_policy,
369 security_authenticator,
370 unit_of_work,
371 concurrency,
372 ..
373 } = definition;
374
375 let producer_ctx = self.build_producer_context(&route_id)?;
376
377 assert_no_mixed_top_level_splits(&steps)?;
379
380 let (aggregate_split, processors_with_contracts) = self
381 .route_compiler_ext()
382 .detect_and_validate_route_split(steps, &producer_ctx, &route_id, staging_mode)?;
383 let lifecycle = super::route_helpers::collect_lifecycle(&processors_with_contracts);
384 let route_id_for_tracing = route_id.clone();
385 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
386
387 let mut pipeline = build_eh_config_pipeline(
388 eh_config.as_ref(),
389 Arc::clone(&self.registry),
390 Arc::clone(&self.languages),
391 self.tracer_metrics.clone(),
392 Arc::clone(&self.platform_service),
393 self.health_registry(),
394 &route_id_for_tracing,
395 &producer_ctx,
396 processors_with_contracts,
397 self.tracing_enabled,
398 self.tracer_detail_level.clone(),
399 security_policy.clone(),
400 circuit_breaker,
401 )?;
402
403 let uow_counter = if let Some(uow_config) = &unit_of_work {
404 let component_ctx = Arc::new(ControllerComponentContext::new(
405 Arc::clone(&self.registry),
406 Arc::clone(&self.languages),
407 self.tracer_metrics
408 .clone()
409 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
410 Arc::clone(&self.platform_service),
411 self.health_registry(),
412 Some(route_id.clone()),
413 ));
414 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
415 Arc::clone(&component_ctx) as Arc<_>;
416 let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
417 uow_config,
418 &producer_ctx,
419 rt,
420 component_ctx.as_ref(),
421 None,
422 )?;
423 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
424 Some(counter)
425 } else {
426 None
427 };
428
429 Ok(PreparedRoute {
430 route_id,
431 managed: ManagedRoute {
432 definition: definition_info,
433 from_uri,
434 pipeline: super::pipeline_runtime::new_shared_pipeline_with_lifecycle(
435 pipeline, lifecycle,
436 ),
437 concurrency,
438 consumer_handle: None,
439 pipeline_handle: None,
440 consumer_cancel_token: CancellationToken::new(),
441 pipeline_cancel_token: CancellationToken::new(),
442 channel_sender: None,
443 in_flight: uow_counter,
444 aggregate_split,
445 agg_service: None,
446 compiled: route_runtime_state::CompiledRoute {
447 security_policy,
448 security_authenticator,
449 },
450 },
451 })
452 }
453
454 pub(crate) fn insert_prepared_route(
455 &mut self,
456 prepared: PreparedRoute,
457 ) -> Result<(), CamelError> {
458 if self.routes.contains_key(&prepared.route_id) {
459 return Err(CamelError::RouteError(format!(
460 "Route '{}' already exists",
461 prepared.route_id
462 )));
463 }
464 self.routes
465 .insert(prepared.route_id.clone(), prepared.managed);
466 Ok(())
467 }
468
469 pub async fn add_route_with_generation(
470 &mut self,
471 definition: RouteDefinition,
472 generation: u64,
473 ) -> Result<(), CamelError> {
474 let route_id = definition.route_id().to_string();
475
476 if self.routes.contains_key(&route_id) {
477 return Err(CamelError::RouteError(format!(
478 "Route '{}' already exists",
479 route_id
480 )));
481 }
482
483 info!(route_id = %route_id, generation, "Adding route to controller with generation");
484
485 let prepared = self.build_managed_route(
486 definition,
487 &super::step_resolution::FunctionStagingMode::HotReload { generation },
488 )?;
489
490 self.routes
491 .insert(prepared.route_id.clone(), prepared.managed);
492
493 Ok(())
494 }
495
496 pub(crate) fn prepare_route_definition_with_generation(
497 &self,
498 definition: RouteDefinition,
499 generation: u64,
500 ) -> Result<PreparedRoute, CamelError> {
501 self.build_managed_route(
502 definition,
503 &super::step_resolution::FunctionStagingMode::HotReload { generation },
504 )
505 }
506
507 pub async fn remove_route_preserving_functions(
508 &mut self,
509 route_id: &str,
510 ) -> Result<(), CamelError> {
511 let managed = self.routes.get(route_id).ok_or_else(|| {
512 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
513 })?;
514 if handle_is_running(&managed.consumer_handle)
515 || handle_is_running(&managed.pipeline_handle)
516 {
517 return Err(CamelError::RouteError(format!(
518 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
519 route_id,
520 inferred_lifecycle_label(managed)
521 )));
522 }
523 self.routes.remove(route_id);
524 if let Some(reg) = &self.health_registry {
525 reg.unregister_for_route(route_id);
526 }
527 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
528 Ok(())
529 }
530
531 pub fn compile_route_definition(
534 &self,
535 def: RouteDefinition,
536 ) -> Result<BoxProcessor, CamelError> {
537 self.route_compiler_ext().compile_route_definition(def)
538 }
539
540 pub fn compile_route_definition_with_generation(
542 &self,
543 def: RouteDefinition,
544 generation: u64,
545 ) -> Result<BoxProcessor, CamelError> {
546 self.route_compiler_ext()
547 .compile_route_definition_with_generation(def, generation)
548 }
549
550 pub(crate) fn compile_route_definition_pipeline(
555 &self,
556 def: RouteDefinition,
557 generation: u64,
558 ) -> Result<CompiledPipeline, CamelError> {
559 self.route_compiler_ext()
560 .compile_route_definition_pipeline(def, generation)
561 }
562
563 pub(crate) fn compile_route_definition_dry_pipeline(
568 &self,
569 def: RouteDefinition,
570 ) -> Result<CompiledPipeline, CamelError> {
571 self.route_compiler_ext()
572 .compile_route_definition_dry_pipeline(def)
573 }
574
575 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
581 let managed = self.routes.get(route_id).ok_or_else(|| {
582 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
583 })?;
584 if handle_is_running(&managed.consumer_handle)
585 || handle_is_running(&managed.pipeline_handle)
586 {
587 return Err(CamelError::RouteError(format!(
588 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
589 route_id,
590 inferred_lifecycle_label(managed)
591 )));
592 }
593 if let Some(invoker) = &self.function_invoker {
594 for (id, rid) in self.collect_function_refs(route_id) {
595 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
596 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
597 }
598 }
599 }
600 self.routes.remove(route_id);
601 if let Some(reg) = &self.health_registry {
602 reg.unregister_for_route(route_id);
603 }
604 info!(route_id = %route_id, "Route removed from controller");
605 Ok(())
606 }
607
608 fn collect_function_refs(
609 &self,
610 route_id: &str,
611 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
612 self.function_invoker
613 .as_ref()
614 .map(|invoker| invoker.function_refs_for_route(route_id))
615 .unwrap_or_default()
616 }
617
618 fn discard_function_staging(&self) {
619 if let Some(invoker) = &self.function_invoker {
620 invoker.discard_staging(0);
621 }
622 }
623
624 pub fn route_count(&self) -> usize {
626 self.routes.route_count()
627 }
628
629 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
630 self.routes.in_flight_count(route_id)
631 }
632
633 pub fn route_exists(&self, route_id: &str) -> bool {
635 self.routes.route_exists(route_id)
636 }
637
638 pub fn route_ids(&self) -> Vec<String> {
640 self.routes.route_ids()
641 }
642
643 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
644 self.routes.route_source_hash(route_id)
645 }
646
647 pub fn auto_startup_route_ids(&self) -> Vec<String> {
649 self.routes.auto_startup_route_ids()
650 }
651
652 pub fn shutdown_route_ids(&self) -> Vec<String> {
654 self.routes.shutdown_route_ids()
655 }
656
657 pub fn swap_pipeline(
676 &self,
677 route_id: &str,
678 new_pipeline: BoxProcessor,
679 ) -> Result<(), CamelError> {
680 let managed = self
681 .routes
682 .get(route_id)
683 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
684
685 let assembly = managed.pipeline.load();
686 let has_lifecycle = !assembly.lifecycle.is_empty();
687
688 if has_lifecycle || managed.agg_service.is_some() {
689 warn!(
690 route_id = %route_id,
691 "Hot-swap rejected — route has lifecycle/agg steps; use Restart path"
692 );
693 return Err(CamelError::RouteError(format!(
694 "Route '{}' contains stateful steps (lifecycle-bearing). Hot-swap not supported — use restart.",
695 route_id
696 )));
697 }
698
699 drop(assembly);
700
701 if managed.aggregate_split.is_some() {
702 warn!(
703 route_id = %route_id,
704 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
705 );
706 }
707
708 super::pipeline_runtime::swap_pipeline_raw(&managed.pipeline, new_pipeline, vec![]);
709 info!(route_id = %route_id, "Pipeline swapped atomically");
710 Ok(())
711 }
712
713 pub(crate) fn swap_pipeline_raw(
723 &self,
724 route_id: &str,
725 new_pipeline: BoxProcessor,
726 lifecycle: Vec<Arc<dyn StepLifecycle>>,
727 ) -> Result<(), CamelError> {
728 let managed = self
729 .routes
730 .get(route_id)
731 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
732 super::pipeline_runtime::swap_pipeline_raw(&managed.pipeline, new_pipeline, lifecycle);
733 info!(route_id = %route_id, "Pipeline swapped (raw — lifecycle bypass)");
734 Ok(())
735 }
736
737 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
739 self.routes.route_from_uri(route_id)
740 }
741
742 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
747 self.routes.get_pipeline(route_id)
748 }
749
750 pub(crate) fn route_has_lifecycle(&self, route_id: &str) -> bool {
754 self.routes
755 .get(route_id)
756 .map(|managed| !managed.pipeline.load().lifecycle.is_empty())
757 .unwrap_or(false)
758 }
759
760 pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
762 self.routes.stop_route(route_id).await
763 }
764
765 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
766 self.start_route(route_id).await
767 }
768
769 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
770 self.stop_route(route_id).await
771 }
772}
773
774impl DefaultRouteController {
777 #[allow(clippy::too_many_arguments)]
783 pub(super) async fn start_aggregate_route(
784 &mut self,
785 route_id: &str,
786 split: AggregateSplitInfo,
787 consumer: Box<dyn Consumer>,
788 consumer_ctx: ConsumerContext,
789 mut rx: mpsc::Receiver<ExchangeEnvelope>,
790 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
791 runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
792 tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
793 pipeline_cancel: CancellationToken,
795 ) -> Result<(), CamelError> {
796 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
797
798 let route_cancel_clone = pipeline_cancel.clone();
799 let svc = AggregatorService::new(
800 split.agg_config.clone(),
801 late_tx,
802 Arc::clone(&self.languages),
803 route_cancel_clone,
804 );
805 let agg = Arc::new(svc);
806
807 let pipeline_cancel_for_monitor = pipeline_cancel.clone();
808 let agg_for_monitor = Arc::clone(&agg);
809
810 {
811 let managed = self
812 .routes
813 .get_mut(route_id)
814 .expect("invariant: route must exist"); managed.agg_service = Some(Arc::clone(&agg));
816 }
817
818 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
819 let pre_pipeline = split.pre_pipeline;
820 let post_pipeline = split.post_pipeline;
821
822 let pipeline_handle = tokio::spawn(async move {
824 loop {
825 tokio::select! {
826 biased;
827
828 late_ex = async {
829 let mut rx = late_rx.lock().await;
830 rx.recv().await
831 } => {
832 match late_ex {
833 Some(ex) => {
834 let pipe = post_pipeline.load();
835 if let Err(e) = pipe.processor.clone_inner().oneshot(ex).await {
836 tracing::warn!(error = %e, "late exchange post-pipeline failed");
837 }
838 }
839 None => return,
840 }
841 }
842
843 envelope_opt = rx.recv() => {
844 match envelope_opt {
845 Some(envelope) => {
846 let ExchangeEnvelope { exchange, reply_tx } = envelope;
847 let pre_pipe = pre_pipeline.load();
848 let ex = match pre_pipe.processor.clone_inner().oneshot(exchange).await {
849 Ok(ex) => ex,
850 Err(e) => {
851 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
852 continue;
853 }
854 };
855
856 let ex = {
857 let cloned_svc = agg.as_ref().clone();
858 cloned_svc.oneshot(ex).await
859 };
860
861 match ex {
862 Ok(ex) => {
863 if !is_pending(&ex) {
864 let post_pipe = post_pipeline.load();
865 let out = post_pipe.processor.clone_inner().oneshot(ex).await;
866 if let Some(tx) = reply_tx { let _ = tx.send(out); }
867 } else if let Some(tx) = reply_tx {
868 let _ = tx.send(Ok(ex));
869 }
870 }
871 Err(e) => {
872 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
873 }
874 }
875 }
876 None => return,
877 }
878 }
879
880 _ = pipeline_cancel.cancelled() => {
881 agg.force_complete_all();
882 let mut rx_guard = late_rx.lock().await;
883 while let Ok(late_ex) = rx_guard.try_recv() {
884 let pipe = post_pipeline.load();
885 let _ = pipe.processor.clone_inner().oneshot(late_ex).await;
886 }
887 break;
888 }
889 }
890 }
891 });
892 #[cfg(test)]
893 emit_start_route_event("pipeline_spawned");
894
895 let consumer_handle = super::consumer_management::spawn_consumer_task(
898 route_id.to_string(),
899 consumer,
900 consumer_ctx,
901 crash_notifier,
902 runtime_for_consumer,
903 false,
904 );
905
906 let force_on_stop = agg_for_monitor.config().force_completion_on_stop;
910 let consumer_handle = tokio::spawn(async move {
911 let _ = consumer_handle.await;
912 if !pipeline_cancel_for_monitor.is_cancelled() {
913 agg_for_monitor.force_complete_all();
914 if force_on_stop {
915 pipeline_cancel_for_monitor.cancel();
916 }
917 }
918 });
919 #[cfg(test)]
920 emit_start_route_event("consumer_spawned");
921
922 {
923 let managed = self
924 .routes
925 .get_mut(route_id)
926 .expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
928 managed.pipeline_handle = Some(pipeline_handle);
929 managed.channel_sender = Some(tx_for_storage);
930 }
931
932 info!(route_id = %route_id, "Route started (aggregate with timeout)");
933 Ok(())
934 }
935
936 #[cfg(test)]
941 pub(crate) fn set_route_lifecycle_for_test(
942 &mut self,
943 route_id: &str,
944 lifecycle: Vec<Arc<dyn StepLifecycle>>,
945 ) -> Result<(), CamelError> {
946 use super::pipeline_runtime::PipelineAssembly;
947 use camel_api::SyncBoxProcessor;
948 use std::sync::Arc;
949
950 let managed = self
951 .routes
952 .get_mut(route_id)
953 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
954 let old_processor = managed.pipeline.load().processor.clone_inner();
955 managed.pipeline.store(Arc::new(PipelineAssembly::new(
956 SyncBoxProcessor::new(old_processor),
957 lifecycle,
958 )));
959 Ok(())
960 }
961}
962
963#[cfg(test)]
964#[path = "route_controller_tests.rs"]
965mod tests;