1use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21};
22use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
23use camel_processor::aggregator::AggregatorService;
24pub use camel_processor::aggregator::SharedLanguageRegistry;
25
26use crate::health_registry::HealthCheckRegistry;
27use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
28use crate::lifecycle::adapters::route_compiler::compose_pipeline;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30pub(crate) use crate::lifecycle::adapters::route_helpers::PreparedRoute;
31use crate::lifecycle::adapters::route_helpers::{
32 AggregateSplitInfo, CrashNotification, ManagedRoute, find_top_level_aggregate_requiring_split,
33 handle_is_running, inferred_lifecycle_label, is_pending,
34};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37 emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::adapters::step_compilers::CompiledStep;
42use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
43use crate::shared::components::domain::Registry;
44use crate::shared::observability::domain::{DetailLevel, TracerConfig};
45use camel_bean::BeanRegistry;
46
47pub struct DefaultRouteController {
55 pub(super) routes: RouteRegistry,
57 pub(super) registry: Arc<std::sync::Mutex<Registry>>,
59 pub(super) languages: SharedLanguageRegistry,
61 pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
63 pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
65 pub(super) global_error_handler: Option<ErrorHandlerConfig>,
67 pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
69 pub(super) tracing_enabled: bool,
71 pub(super) tracer_detail_level: DetailLevel,
73 pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
75 pub(super) platform_service: Arc<dyn PlatformService>,
76 pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
77 pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
78}
79
80impl DefaultRouteController {
81 pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
82 self.health_registry.clone().unwrap_or_else(|| {
83 warn!("health_registry not configured — creating isolated fallback");
84 Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
85 })
86 }
87
88 pub fn new(
90 registry: Arc<std::sync::Mutex<Registry>>,
91 platform_service: Arc<dyn PlatformService>,
92 ) -> Self {
93 Self::with_beans_and_platform_service(
94 registry,
95 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
96 platform_service,
97 )
98 }
99
100 pub fn with_beans(
102 registry: Arc<std::sync::Mutex<Registry>>,
103 beans: Arc<std::sync::Mutex<BeanRegistry>>,
104 ) -> Self {
105 Self::with_beans_and_platform_service(
106 registry,
107 beans,
108 Arc::new(NoopPlatformService::default()),
109 )
110 }
111
112 fn with_beans_and_platform_service(
113 registry: Arc<std::sync::Mutex<Registry>>,
114 beans: Arc<std::sync::Mutex<BeanRegistry>>,
115 platform_service: Arc<dyn PlatformService>,
116 ) -> Self {
117 Self {
118 routes: RouteRegistry::new(),
119 registry,
120 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
121 beans,
122 runtime: None,
123 global_error_handler: None,
124 crash_notifier: None,
125 tracing_enabled: false,
126 tracer_detail_level: DetailLevel::Minimal,
127 tracer_metrics: None,
128 platform_service,
129 function_invoker: None,
130 health_registry: None,
131 }
132 }
133
134 pub fn with_languages(
136 registry: Arc<std::sync::Mutex<Registry>>,
137 languages: SharedLanguageRegistry,
138 platform_service: Arc<dyn PlatformService>,
139 ) -> Self {
140 Self {
141 routes: RouteRegistry::new(),
142 registry,
143 languages,
144 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
145 runtime: None,
146 global_error_handler: None,
147 crash_notifier: None,
148 tracing_enabled: false,
149 tracer_detail_level: DetailLevel::Minimal,
150 tracer_metrics: None,
151 platform_service,
152 function_invoker: None,
153 health_registry: None,
154 }
155 }
156
157 pub fn with_languages_and_beans(
158 registry: Arc<std::sync::Mutex<Registry>>,
159 languages: SharedLanguageRegistry,
160 platform_service: Arc<dyn PlatformService>,
161 beans: Arc<std::sync::Mutex<BeanRegistry>>,
162 ) -> Self {
163 Self {
164 routes: RouteRegistry::new(),
165 registry,
166 languages,
167 beans,
168 runtime: None,
169 global_error_handler: None,
170 crash_notifier: None,
171 tracing_enabled: false,
172 tracer_detail_level: DetailLevel::Minimal,
173 tracer_metrics: None,
174 platform_service,
175 function_invoker: None,
176 health_registry: None,
177 }
178 }
179
180 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
181 self.function_invoker = Some(function_invoker);
182 self
183 }
184
185 pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
186 self.health_registry = Some(registry);
187 }
188
189 pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
190 self.function_invoker = Some(invoker);
191 }
192
193 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
195 self.runtime = Some(Arc::downgrade(&runtime));
196 }
197
198 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
203 self.crash_notifier = Some(tx);
204 }
205
206 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
208 self.global_error_handler = Some(config);
209 }
210
211 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
213 self.tracing_enabled = config.enabled;
214 self.tracer_detail_level = config.detail_level.clone();
215 self.tracer_metrics = config.metrics_collector.clone();
216 }
217
218 fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
219 let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
220 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
221 producer_ctx = producer_ctx.with_runtime(runtime);
222 }
223 Ok(producer_ctx)
224 }
225
226 fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
228 RouteCompilerExt {
229 registry: &self.registry,
230 languages: &self.languages,
231 beans: &self.beans,
232 function_invoker: &self.function_invoker,
233 tracing_enabled: self.tracing_enabled,
234 tracer_detail_level: &self.tracer_detail_level,
235 tracer_metrics: &self.tracer_metrics,
236 platform_service: &self.platform_service,
237 runtime: &self.runtime,
238 global_error_handler: &self.global_error_handler,
239 health_registry: &self.health_registry,
240 route_registry: &self.routes,
241 }
242 }
243
244 pub(crate) fn resolve_steps(
246 &self,
247 steps: Vec<BuilderStep>,
248 producer_ctx: &ProducerContext,
249 registry: &Arc<std::sync::Mutex<Registry>>,
250 route_id: Option<&str>,
251 staging_mode: &super::step_resolution::FunctionStagingMode,
252 ) -> Result<Vec<CompiledStep>, CamelError> {
253 let component_ctx = Arc::new(ControllerComponentContext::new(
254 Arc::clone(registry),
255 Arc::clone(&self.languages),
256 self.tracer_metrics
257 .clone()
258 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
259 Arc::clone(&self.platform_service),
260 self.health_registry(),
261 route_id.map(|s| s.to_string()),
262 ));
263 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
264 Arc::clone(&component_ctx) as Arc<_>;
265
266 super::step_resolution::resolve_steps(
267 steps,
268 producer_ctx,
269 rt,
270 registry,
271 &self.languages,
272 &self.beans,
273 self.function_invoker.clone(),
274 component_ctx,
275 route_id,
276 staging_mode,
277 )
278 }
279
280 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
290 let route_id = definition.route_id().to_string();
291
292 if self.routes.contains_key(&route_id) {
293 return Err(CamelError::RouteError(format!(
294 "Route '{}' already exists",
295 route_id
296 )));
297 }
298
299 info!(route_id = %route_id, "Adding route to controller");
300
301 let prepared = match self.build_managed_route(
302 definition,
303 &super::step_resolution::FunctionStagingMode::DirectAdd,
304 ) {
305 Ok(prepared) => prepared,
306 Err(err) => {
307 self.discard_function_staging();
308 return Err(err);
309 }
310 };
311
312 if let Some(invoker) = &self.function_invoker
313 && let Err(err) = invoker.commit_staged().await
314 {
315 invoker.discard_staging(0);
316 return Err(CamelError::Config(err.to_string()));
317 }
318
319 self.routes
320 .insert(prepared.route_id.clone(), prepared.managed);
321
322 Ok(())
323 }
324
325 fn build_managed_route(
326 &self,
327 definition: RouteDefinition,
328 staging_mode: &super::step_resolution::FunctionStagingMode,
329 ) -> Result<PreparedRoute, CamelError> {
330 let route_id = definition.route_id().to_string();
331
332 let definition_info = definition.to_info();
333 let RouteDefinition {
334 from_uri,
335 steps,
336 error_handler,
337 circuit_breaker,
338 security_policy,
339 security_authenticator,
340 unit_of_work,
341 concurrency,
342 ..
343 } = definition;
344
345 let producer_ctx = self.build_producer_context(&route_id)?;
346
347 let mut aggregate_split: Option<AggregateSplitInfo> = None;
348 let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
349 Some((idx, agg_config)) => {
350 let mut pre_steps = steps;
351 let mut rest = pre_steps.split_off(idx);
352 let _agg_step = rest.remove(0);
353 let post_steps = rest;
354
355 let pre_pairs = self.resolve_steps(
356 pre_steps,
357 &producer_ctx,
358 &self.registry,
359 Some(&route_id),
360 staging_mode,
361 )?;
362 let pre_pipeline =
363 super::pipeline_runtime::new_shared_pipeline(compose_pipeline(pre_pairs));
364
365 let post_pairs = self.resolve_steps(
366 post_steps,
367 &producer_ctx,
368 &self.registry,
369 Some(&route_id),
370 staging_mode,
371 )?;
372 let post_pipeline =
373 super::pipeline_runtime::new_shared_pipeline(compose_pipeline(post_pairs));
374
375 aggregate_split = Some(AggregateSplitInfo {
376 pre_pipeline,
377 agg_config,
378 post_pipeline,
379 });
380
381 vec![]
382 }
383 None => self.resolve_steps(
384 steps,
385 &producer_ctx,
386 &self.registry,
387 Some(&route_id),
388 staging_mode,
389 )?,
390 };
391 let route_id_for_tracing = route_id.clone();
392 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
393
394 let mut pipeline = build_eh_config_pipeline(
395 eh_config.as_ref(),
396 Arc::clone(&self.registry),
397 Arc::clone(&self.languages),
398 self.tracer_metrics.clone(),
399 Arc::clone(&self.platform_service),
400 self.health_registry(),
401 &route_id_for_tracing,
402 &producer_ctx,
403 processors_with_contracts,
404 self.tracing_enabled,
405 self.tracer_detail_level.clone(),
406 security_policy.clone(),
407 circuit_breaker,
408 )?;
409
410 let uow_counter = if let Some(uow_config) = &unit_of_work {
411 let component_ctx = Arc::new(ControllerComponentContext::new(
412 Arc::clone(&self.registry),
413 Arc::clone(&self.languages),
414 self.tracer_metrics
415 .clone()
416 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
417 Arc::clone(&self.platform_service),
418 self.health_registry(),
419 Some(route_id.clone()),
420 ));
421 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
422 Arc::clone(&component_ctx) as Arc<_>;
423 let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
424 uow_config,
425 &producer_ctx,
426 rt,
427 component_ctx.as_ref(),
428 None,
429 )?;
430 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
431 Some(counter)
432 } else {
433 None
434 };
435
436 Ok(PreparedRoute {
437 route_id,
438 managed: ManagedRoute {
439 definition: definition_info,
440 from_uri,
441 pipeline: super::pipeline_runtime::new_shared_pipeline(pipeline),
442 concurrency,
443 consumer_handle: None,
444 pipeline_handle: None,
445 consumer_cancel_token: CancellationToken::new(),
446 pipeline_cancel_token: CancellationToken::new(),
447 channel_sender: None,
448 in_flight: uow_counter,
449 aggregate_split,
450 agg_service: None,
451 compiled: route_runtime_state::CompiledRoute {
452 security_policy,
453 security_authenticator,
454 },
455 },
456 })
457 }
458
459 pub(crate) fn insert_prepared_route(
460 &mut self,
461 prepared: PreparedRoute,
462 ) -> Result<(), CamelError> {
463 if self.routes.contains_key(&prepared.route_id) {
464 return Err(CamelError::RouteError(format!(
465 "Route '{}' already exists",
466 prepared.route_id
467 )));
468 }
469 self.routes
470 .insert(prepared.route_id.clone(), prepared.managed);
471 Ok(())
472 }
473
474 pub async fn add_route_with_generation(
475 &mut self,
476 definition: RouteDefinition,
477 generation: u64,
478 ) -> Result<(), CamelError> {
479 let route_id = definition.route_id().to_string();
480
481 if self.routes.contains_key(&route_id) {
482 return Err(CamelError::RouteError(format!(
483 "Route '{}' already exists",
484 route_id
485 )));
486 }
487
488 info!(route_id = %route_id, generation, "Adding route to controller with generation");
489
490 let prepared = self.build_managed_route(
491 definition,
492 &super::step_resolution::FunctionStagingMode::HotReload { generation },
493 )?;
494
495 self.routes
496 .insert(prepared.route_id.clone(), prepared.managed);
497
498 Ok(())
499 }
500
501 pub(crate) fn prepare_route_definition_with_generation(
502 &self,
503 definition: RouteDefinition,
504 generation: u64,
505 ) -> Result<PreparedRoute, CamelError> {
506 self.build_managed_route(
507 definition,
508 &super::step_resolution::FunctionStagingMode::HotReload { generation },
509 )
510 }
511
512 pub async fn remove_route_preserving_functions(
513 &mut self,
514 route_id: &str,
515 ) -> Result<(), CamelError> {
516 let managed = self.routes.get(route_id).ok_or_else(|| {
517 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
518 })?;
519 if handle_is_running(&managed.consumer_handle)
520 || handle_is_running(&managed.pipeline_handle)
521 {
522 return Err(CamelError::RouteError(format!(
523 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
524 route_id,
525 inferred_lifecycle_label(managed)
526 )));
527 }
528 self.routes.remove(route_id);
529 if let Some(reg) = &self.health_registry {
530 reg.unregister_for_route(route_id);
531 }
532 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
533 Ok(())
534 }
535
536 pub fn compile_route_definition(
539 &self,
540 def: RouteDefinition,
541 ) -> Result<BoxProcessor, CamelError> {
542 self.route_compiler_ext().compile_route_definition(def)
543 }
544
545 pub fn compile_route_definition_with_generation(
547 &self,
548 def: RouteDefinition,
549 generation: u64,
550 ) -> Result<BoxProcessor, CamelError> {
551 self.route_compiler_ext()
552 .compile_route_definition_with_generation(def, generation)
553 }
554
555 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
561 let managed = self.routes.get(route_id).ok_or_else(|| {
562 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
563 })?;
564 if handle_is_running(&managed.consumer_handle)
565 || handle_is_running(&managed.pipeline_handle)
566 {
567 return Err(CamelError::RouteError(format!(
568 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
569 route_id,
570 inferred_lifecycle_label(managed)
571 )));
572 }
573 if let Some(invoker) = &self.function_invoker {
574 for (id, rid) in self.collect_function_refs(route_id) {
575 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
576 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
577 }
578 }
579 }
580 self.routes.remove(route_id);
581 if let Some(reg) = &self.health_registry {
582 reg.unregister_for_route(route_id);
583 }
584 info!(route_id = %route_id, "Route removed from controller");
585 Ok(())
586 }
587
588 fn collect_function_refs(
589 &self,
590 route_id: &str,
591 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
592 self.function_invoker
593 .as_ref()
594 .map(|invoker| invoker.function_refs_for_route(route_id))
595 .unwrap_or_default()
596 }
597
598 fn discard_function_staging(&self) {
599 if let Some(invoker) = &self.function_invoker {
600 invoker.discard_staging(0);
601 }
602 }
603
604 pub fn route_count(&self) -> usize {
606 self.routes.route_count()
607 }
608
609 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
610 self.routes.in_flight_count(route_id)
611 }
612
613 pub fn route_exists(&self, route_id: &str) -> bool {
615 self.routes.route_exists(route_id)
616 }
617
618 pub fn route_ids(&self) -> Vec<String> {
620 self.routes.route_ids()
621 }
622
623 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
624 self.routes.route_source_hash(route_id)
625 }
626
627 pub fn auto_startup_route_ids(&self) -> Vec<String> {
629 self.routes.auto_startup_route_ids()
630 }
631
632 pub fn shutdown_route_ids(&self) -> Vec<String> {
634 self.routes.shutdown_route_ids()
635 }
636
637 pub fn swap_pipeline(
642 &self,
643 route_id: &str,
644 new_pipeline: BoxProcessor,
645 ) -> Result<(), CamelError> {
646 let managed = self
647 .routes
648 .get(route_id)
649 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
650
651 if managed.aggregate_split.is_some() {
652 tracing::warn!(
653 route_id = %route_id,
654 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
655 );
656 }
657
658 super::pipeline_runtime::swap_pipeline(&managed.pipeline, new_pipeline);
659 info!(route_id = %route_id, "Pipeline swapped atomically");
660 Ok(())
661 }
662
663 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
665 self.routes.route_from_uri(route_id)
666 }
667
668 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
673 self.routes.get_pipeline(route_id)
674 }
675
676 pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
678 self.routes.stop_route(route_id).await
679 }
680
681 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
682 self.start_route(route_id).await
683 }
684
685 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
686 self.stop_route(route_id).await
687 }
688}
689
690impl DefaultRouteController {
693 #[allow(clippy::too_many_arguments)]
699 pub(super) async fn start_aggregate_route(
700 &mut self,
701 route_id: &str,
702 split: AggregateSplitInfo,
703 consumer: Box<dyn Consumer>,
704 consumer_ctx: ConsumerContext,
705 mut rx: mpsc::Receiver<ExchangeEnvelope>,
706 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
707 runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
708 tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
709 pipeline_cancel: CancellationToken,
711 ) -> Result<(), CamelError> {
712 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
713
714 let route_cancel_clone = pipeline_cancel.clone();
715 let svc = AggregatorService::new(
716 split.agg_config.clone(),
717 late_tx,
718 Arc::clone(&self.languages),
719 route_cancel_clone,
720 );
721 let agg = Arc::new(std::sync::Mutex::new(svc));
722
723 let pipeline_cancel_for_monitor = pipeline_cancel.clone();
724 let agg_for_monitor = Arc::clone(&agg);
725
726 {
727 let managed = self
728 .routes
729 .get_mut(route_id)
730 .expect("invariant: route must exist"); managed.agg_service = Some(Arc::clone(&agg));
732 }
733
734 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
735 let pre_pipeline = split.pre_pipeline;
736 let post_pipeline = split.post_pipeline;
737
738 let pipeline_handle = tokio::spawn(async move {
740 loop {
741 tokio::select! {
742 biased;
743
744 late_ex = async {
745 let mut rx = late_rx.lock().await;
746 rx.recv().await
747 } => {
748 match late_ex {
749 Some(ex) => {
750 let pipe = post_pipeline.load();
751 if let Err(e) = pipe.clone_inner().oneshot(ex).await {
752 tracing::warn!(error = %e, "late exchange post-pipeline failed");
753 }
754 }
755 None => return,
756 }
757 }
758
759 envelope_opt = rx.recv() => {
760 match envelope_opt {
761 Some(envelope) => {
762 let ExchangeEnvelope { exchange, reply_tx } = envelope;
763 let pre_pipe = pre_pipeline.load();
764 let ex = match pre_pipe.clone_inner().oneshot(exchange).await {
765 Ok(ex) => ex,
766 Err(e) => {
767 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
768 continue;
769 }
770 };
771
772 let ex = {
773 let cloned_svc = agg
774 .lock()
775 .expect("mutex poisoned: another thread panicked while holding this lock") .clone();
777 cloned_svc.oneshot(ex).await
778 };
779
780 match ex {
781 Ok(ex) => {
782 if !is_pending(&ex) {
783 let post_pipe = post_pipeline.load();
784 let out = post_pipe.clone_inner().oneshot(ex).await;
785 if let Some(tx) = reply_tx { let _ = tx.send(out); }
786 } else if let Some(tx) = reply_tx {
787 let _ = tx.send(Ok(ex));
788 }
789 }
790 Err(e) => {
791 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
792 }
793 }
794 }
795 None => return,
796 }
797 }
798
799 _ = pipeline_cancel.cancelled() => {
800 {
801 let guard = agg
802 .lock()
803 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
805 }
806 let mut rx_guard = late_rx.lock().await;
807 while let Ok(late_ex) = rx_guard.try_recv() {
808 let pipe = post_pipeline.load();
809 let _ = pipe.clone_inner().oneshot(late_ex).await;
810 }
811 break;
812 }
813 }
814 }
815 });
816 #[cfg(test)]
817 emit_start_route_event("pipeline_spawned");
818
819 let consumer_handle = super::consumer_management::spawn_consumer_task(
822 route_id.to_string(),
823 consumer,
824 consumer_ctx,
825 crash_notifier,
826 runtime_for_consumer,
827 false,
828 );
829
830 let force_on_stop = agg_for_monitor
834 .lock()
835 .expect("mutex poisoned: another thread panicked while holding this lock") .config()
837 .force_completion_on_stop;
838 let consumer_handle = tokio::spawn(async move {
839 let _ = consumer_handle.await;
840 if !pipeline_cancel_for_monitor.is_cancelled() {
841 let guard = agg_for_monitor
842 .lock()
843 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
845 drop(guard);
846 if force_on_stop {
847 pipeline_cancel_for_monitor.cancel();
848 }
849 }
850 });
851 #[cfg(test)]
852 emit_start_route_event("consumer_spawned");
853
854 {
855 let managed = self
856 .routes
857 .get_mut(route_id)
858 .expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
860 managed.pipeline_handle = Some(pipeline_handle);
861 managed.channel_sender = Some(tx_for_storage);
862 }
863
864 info!(route_id = %route_id, "Route started (aggregate with timeout)");
865 Ok(())
866 }
867}
868
869#[cfg(test)]
870#[path = "route_controller_tests.rs"]
871mod tests;