1use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, ServiceExt};
13use tracing::{info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17#[allow(unused_imports)]
18use camel_api::{
19 BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
20 NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
21};
22use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
23use camel_processor::aggregator::AggregatorService;
24pub use camel_processor::aggregator::SharedLanguageRegistry;
25
26use crate::health_registry::HealthCheckRegistry;
27use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
28use crate::lifecycle::adapters::route_compiler::compose_pipeline;
29use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
30pub(crate) use crate::lifecycle::adapters::route_helpers::PreparedRoute;
31use crate::lifecycle::adapters::route_helpers::{
32 AggregateSplitInfo, CrashNotification, ManagedRoute, find_top_level_aggregate_requiring_split,
33 handle_is_running, inferred_lifecycle_label, is_pending,
34};
35#[cfg(test)]
36pub(super) use crate::lifecycle::adapters::route_helpers::{
37 emit_start_route_event, set_start_route_event_hook,
38};
39use crate::lifecycle::adapters::route_registry::RouteRegistry;
40use crate::lifecycle::adapters::route_runtime_state;
41use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
42use crate::shared::components::domain::Registry;
43use crate::shared::observability::domain::{DetailLevel, TracerConfig};
44use camel_bean::BeanRegistry;
45
46pub struct DefaultRouteController {
54 pub(super) routes: RouteRegistry,
56 pub(super) registry: Arc<std::sync::Mutex<Registry>>,
58 pub(super) languages: SharedLanguageRegistry,
60 pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
62 pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
64 pub(super) global_error_handler: Option<ErrorHandlerConfig>,
66 pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
68 pub(super) tracing_enabled: bool,
70 pub(super) tracer_detail_level: DetailLevel,
72 pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
74 pub(super) platform_service: Arc<dyn PlatformService>,
75 pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
76 pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
77}
78
79impl DefaultRouteController {
80 pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
81 self.health_registry.clone().unwrap_or_else(|| {
82 warn!("health_registry not configured — creating isolated fallback");
83 Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
84 })
85 }
86
87 pub fn new(
89 registry: Arc<std::sync::Mutex<Registry>>,
90 platform_service: Arc<dyn PlatformService>,
91 ) -> Self {
92 Self::with_beans_and_platform_service(
93 registry,
94 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
95 platform_service,
96 )
97 }
98
99 pub fn with_beans(
101 registry: Arc<std::sync::Mutex<Registry>>,
102 beans: Arc<std::sync::Mutex<BeanRegistry>>,
103 ) -> Self {
104 Self::with_beans_and_platform_service(
105 registry,
106 beans,
107 Arc::new(NoopPlatformService::default()),
108 )
109 }
110
111 fn with_beans_and_platform_service(
112 registry: Arc<std::sync::Mutex<Registry>>,
113 beans: Arc<std::sync::Mutex<BeanRegistry>>,
114 platform_service: Arc<dyn PlatformService>,
115 ) -> Self {
116 Self {
117 routes: RouteRegistry::new(),
118 registry,
119 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
120 beans,
121 runtime: None,
122 global_error_handler: None,
123 crash_notifier: None,
124 tracing_enabled: false,
125 tracer_detail_level: DetailLevel::Minimal,
126 tracer_metrics: None,
127 platform_service,
128 function_invoker: None,
129 health_registry: None,
130 }
131 }
132
133 pub fn with_languages(
135 registry: Arc<std::sync::Mutex<Registry>>,
136 languages: SharedLanguageRegistry,
137 platform_service: Arc<dyn PlatformService>,
138 ) -> Self {
139 Self {
140 routes: RouteRegistry::new(),
141 registry,
142 languages,
143 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
144 runtime: None,
145 global_error_handler: None,
146 crash_notifier: None,
147 tracing_enabled: false,
148 tracer_detail_level: DetailLevel::Minimal,
149 tracer_metrics: None,
150 platform_service,
151 function_invoker: None,
152 health_registry: None,
153 }
154 }
155
156 pub fn with_languages_and_beans(
157 registry: Arc<std::sync::Mutex<Registry>>,
158 languages: SharedLanguageRegistry,
159 platform_service: Arc<dyn PlatformService>,
160 beans: Arc<std::sync::Mutex<BeanRegistry>>,
161 ) -> Self {
162 Self {
163 routes: RouteRegistry::new(),
164 registry,
165 languages,
166 beans,
167 runtime: None,
168 global_error_handler: None,
169 crash_notifier: None,
170 tracing_enabled: false,
171 tracer_detail_level: DetailLevel::Minimal,
172 tracer_metrics: None,
173 platform_service,
174 function_invoker: None,
175 health_registry: None,
176 }
177 }
178
179 pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
180 self.function_invoker = Some(function_invoker);
181 self
182 }
183
184 pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
185 self.health_registry = Some(registry);
186 }
187
188 pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
189 self.function_invoker = Some(invoker);
190 }
191
192 pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
194 self.runtime = Some(Arc::downgrade(&runtime));
195 }
196
197 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
202 self.crash_notifier = Some(tx);
203 }
204
205 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
207 self.global_error_handler = Some(config);
208 }
209
210 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
212 self.tracing_enabled = config.enabled;
213 self.tracer_detail_level = config.detail_level.clone();
214 self.tracer_metrics = config.metrics_collector.clone();
215 }
216
217 fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
218 let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
219 if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
220 producer_ctx = producer_ctx.with_runtime(runtime);
221 }
222 Ok(producer_ctx)
223 }
224
225 fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
227 RouteCompilerExt {
228 registry: &self.registry,
229 languages: &self.languages,
230 beans: &self.beans,
231 function_invoker: &self.function_invoker,
232 tracing_enabled: self.tracing_enabled,
233 tracer_detail_level: &self.tracer_detail_level,
234 tracer_metrics: &self.tracer_metrics,
235 platform_service: &self.platform_service,
236 runtime: &self.runtime,
237 global_error_handler: &self.global_error_handler,
238 health_registry: &self.health_registry,
239 route_registry: &self.routes,
240 }
241 }
242
243 pub(crate) fn resolve_steps(
245 &self,
246 steps: Vec<BuilderStep>,
247 producer_ctx: &ProducerContext,
248 registry: &Arc<std::sync::Mutex<Registry>>,
249 route_id: Option<&str>,
250 staging_mode: &super::step_resolution::FunctionStagingMode,
251 ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
252 let component_ctx = Arc::new(ControllerComponentContext::new(
253 Arc::clone(registry),
254 Arc::clone(&self.languages),
255 self.tracer_metrics
256 .clone()
257 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
258 Arc::clone(&self.platform_service),
259 self.health_registry(),
260 route_id.map(|s| s.to_string()),
261 ));
262 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
263 Arc::clone(&component_ctx) as Arc<_>;
264
265 super::step_resolution::resolve_steps(
266 steps,
267 producer_ctx,
268 rt,
269 registry,
270 &self.languages,
271 &self.beans,
272 self.function_invoker.clone(),
273 component_ctx,
274 route_id,
275 staging_mode,
276 )
277 }
278
279 pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
289 let route_id = definition.route_id().to_string();
290
291 if self.routes.contains_key(&route_id) {
292 return Err(CamelError::RouteError(format!(
293 "Route '{}' already exists",
294 route_id
295 )));
296 }
297
298 info!(route_id = %route_id, "Adding route to controller");
299
300 let prepared = match self.build_managed_route(
301 definition,
302 &super::step_resolution::FunctionStagingMode::DirectAdd,
303 ) {
304 Ok(prepared) => prepared,
305 Err(err) => {
306 self.discard_function_staging();
307 return Err(err);
308 }
309 };
310
311 if let Some(invoker) = &self.function_invoker
312 && let Err(err) = invoker.commit_staged().await
313 {
314 invoker.discard_staging(0);
315 return Err(CamelError::Config(err.to_string()));
316 }
317
318 self.routes
319 .insert(prepared.route_id.clone(), prepared.managed);
320
321 Ok(())
322 }
323
324 fn build_managed_route(
325 &self,
326 definition: RouteDefinition,
327 staging_mode: &super::step_resolution::FunctionStagingMode,
328 ) -> Result<PreparedRoute, CamelError> {
329 let route_id = definition.route_id().to_string();
330
331 let definition_info = definition.to_info();
332 let RouteDefinition {
333 from_uri,
334 steps,
335 error_handler,
336 circuit_breaker,
337 security_policy,
338 security_authenticator,
339 unit_of_work,
340 concurrency,
341 ..
342 } = definition;
343
344 let producer_ctx = self.build_producer_context(&route_id)?;
345
346 let mut aggregate_split: Option<AggregateSplitInfo> = None;
347 let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
348 Some((idx, agg_config)) => {
349 let mut pre_steps = steps;
350 let mut rest = pre_steps.split_off(idx);
351 let _agg_step = rest.remove(0);
352 let post_steps = rest;
353
354 let pre_pairs = self.resolve_steps(
355 pre_steps,
356 &producer_ctx,
357 &self.registry,
358 Some(&route_id),
359 staging_mode,
360 )?;
361 let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
362 let pre_pipeline =
363 super::pipeline_runtime::new_shared_pipeline(compose_pipeline(pre_procs));
364
365 let post_pairs = self.resolve_steps(
366 post_steps,
367 &producer_ctx,
368 &self.registry,
369 Some(&route_id),
370 staging_mode,
371 )?;
372 let post_procs: Vec<BoxProcessor> =
373 post_pairs.into_iter().map(|(p, _)| p).collect();
374 let post_pipeline =
375 super::pipeline_runtime::new_shared_pipeline(compose_pipeline(post_procs));
376
377 aggregate_split = Some(AggregateSplitInfo {
378 pre_pipeline,
379 agg_config,
380 post_pipeline,
381 });
382
383 vec![]
384 }
385 None => self.resolve_steps(
386 steps,
387 &producer_ctx,
388 &self.registry,
389 Some(&route_id),
390 staging_mode,
391 )?,
392 };
393 let route_id_for_tracing = route_id.clone();
394 let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
395
396 let mut pipeline = build_eh_config_pipeline(
397 eh_config.as_ref(),
398 Arc::clone(&self.registry),
399 Arc::clone(&self.languages),
400 self.tracer_metrics.clone(),
401 Arc::clone(&self.platform_service),
402 self.health_registry(),
403 &route_id_for_tracing,
404 &producer_ctx,
405 processors_with_contracts,
406 self.tracing_enabled,
407 self.tracer_detail_level.clone(),
408 security_policy.clone(),
409 circuit_breaker,
410 )?;
411
412 let uow_counter = if let Some(uow_config) = &unit_of_work {
413 let component_ctx = Arc::new(ControllerComponentContext::new(
414 Arc::clone(&self.registry),
415 Arc::clone(&self.languages),
416 self.tracer_metrics
417 .clone()
418 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
419 Arc::clone(&self.platform_service),
420 self.health_registry(),
421 Some(route_id.clone()),
422 ));
423 let rt: Arc<dyn camel_component_api::RuntimeObservability> =
424 Arc::clone(&component_ctx) as Arc<_>;
425 let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
426 uow_config,
427 &producer_ctx,
428 rt,
429 component_ctx.as_ref(),
430 None,
431 )?;
432 pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
433 Some(counter)
434 } else {
435 None
436 };
437
438 Ok(PreparedRoute {
439 route_id,
440 managed: ManagedRoute {
441 definition: definition_info,
442 from_uri,
443 pipeline: super::pipeline_runtime::new_shared_pipeline(pipeline),
444 concurrency,
445 consumer_handle: None,
446 pipeline_handle: None,
447 consumer_cancel_token: CancellationToken::new(),
448 pipeline_cancel_token: CancellationToken::new(),
449 channel_sender: None,
450 in_flight: uow_counter,
451 aggregate_split,
452 agg_service: None,
453 compiled: route_runtime_state::CompiledRoute {
454 security_policy,
455 security_authenticator,
456 },
457 },
458 })
459 }
460
461 pub(crate) fn insert_prepared_route(
462 &mut self,
463 prepared: PreparedRoute,
464 ) -> Result<(), CamelError> {
465 if self.routes.contains_key(&prepared.route_id) {
466 return Err(CamelError::RouteError(format!(
467 "Route '{}' already exists",
468 prepared.route_id
469 )));
470 }
471 self.routes
472 .insert(prepared.route_id.clone(), prepared.managed);
473 Ok(())
474 }
475
476 pub async fn add_route_with_generation(
477 &mut self,
478 definition: RouteDefinition,
479 generation: u64,
480 ) -> Result<(), CamelError> {
481 let route_id = definition.route_id().to_string();
482
483 if self.routes.contains_key(&route_id) {
484 return Err(CamelError::RouteError(format!(
485 "Route '{}' already exists",
486 route_id
487 )));
488 }
489
490 info!(route_id = %route_id, generation, "Adding route to controller with generation");
491
492 let prepared = self.build_managed_route(
493 definition,
494 &super::step_resolution::FunctionStagingMode::HotReload { generation },
495 )?;
496
497 self.routes
498 .insert(prepared.route_id.clone(), prepared.managed);
499
500 Ok(())
501 }
502
503 pub(crate) fn prepare_route_definition_with_generation(
504 &self,
505 definition: RouteDefinition,
506 generation: u64,
507 ) -> Result<PreparedRoute, CamelError> {
508 self.build_managed_route(
509 definition,
510 &super::step_resolution::FunctionStagingMode::HotReload { generation },
511 )
512 }
513
514 pub async fn remove_route_preserving_functions(
515 &mut self,
516 route_id: &str,
517 ) -> Result<(), CamelError> {
518 let managed = self.routes.get(route_id).ok_or_else(|| {
519 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
520 })?;
521 if handle_is_running(&managed.consumer_handle)
522 || handle_is_running(&managed.pipeline_handle)
523 {
524 return Err(CamelError::RouteError(format!(
525 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
526 route_id,
527 inferred_lifecycle_label(managed)
528 )));
529 }
530 self.routes.remove(route_id);
531 if let Some(reg) = &self.health_registry {
532 reg.unregister_for_route(route_id);
533 }
534 info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
535 Ok(())
536 }
537
538 pub fn compile_route_definition(
541 &self,
542 def: RouteDefinition,
543 ) -> Result<BoxProcessor, CamelError> {
544 self.route_compiler_ext().compile_route_definition(def)
545 }
546
547 pub fn compile_route_definition_with_generation(
549 &self,
550 def: RouteDefinition,
551 generation: u64,
552 ) -> Result<BoxProcessor, CamelError> {
553 self.route_compiler_ext()
554 .compile_route_definition_with_generation(def, generation)
555 }
556
557 pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
563 let managed = self.routes.get(route_id).ok_or_else(|| {
564 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
565 })?;
566 if handle_is_running(&managed.consumer_handle)
567 || handle_is_running(&managed.pipeline_handle)
568 {
569 return Err(CamelError::RouteError(format!(
570 "Route '{}' must be stopped before removal (current execution lifecycle: {})",
571 route_id,
572 inferred_lifecycle_label(managed)
573 )));
574 }
575 if let Some(invoker) = &self.function_invoker {
576 for (id, rid) in self.collect_function_refs(route_id) {
577 if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
578 warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
579 }
580 }
581 }
582 self.routes.remove(route_id);
583 if let Some(reg) = &self.health_registry {
584 reg.unregister_for_route(route_id);
585 }
586 info!(route_id = %route_id, "Route removed from controller");
587 Ok(())
588 }
589
590 fn collect_function_refs(
591 &self,
592 route_id: &str,
593 ) -> Vec<(camel_api::FunctionId, Option<String>)> {
594 self.function_invoker
595 .as_ref()
596 .map(|invoker| invoker.function_refs_for_route(route_id))
597 .unwrap_or_default()
598 }
599
600 fn discard_function_staging(&self) {
601 if let Some(invoker) = &self.function_invoker {
602 invoker.discard_staging(0);
603 }
604 }
605
606 pub fn route_count(&self) -> usize {
608 self.routes.route_count()
609 }
610
611 pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
612 self.routes.in_flight_count(route_id)
613 }
614
615 pub fn route_exists(&self, route_id: &str) -> bool {
617 self.routes.route_exists(route_id)
618 }
619
620 pub fn route_ids(&self) -> Vec<String> {
622 self.routes.route_ids()
623 }
624
625 pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
626 self.routes.route_source_hash(route_id)
627 }
628
629 pub fn auto_startup_route_ids(&self) -> Vec<String> {
631 self.routes.auto_startup_route_ids()
632 }
633
634 pub fn shutdown_route_ids(&self) -> Vec<String> {
636 self.routes.shutdown_route_ids()
637 }
638
639 pub fn swap_pipeline(
644 &self,
645 route_id: &str,
646 new_pipeline: BoxProcessor,
647 ) -> Result<(), CamelError> {
648 let managed = self
649 .routes
650 .get(route_id)
651 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
652
653 if managed.aggregate_split.is_some() {
654 tracing::warn!(
655 route_id = %route_id,
656 "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
657 );
658 }
659
660 super::pipeline_runtime::swap_pipeline(&managed.pipeline, new_pipeline);
661 info!(route_id = %route_id, "Pipeline swapped atomically");
662 Ok(())
663 }
664
665 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
667 self.routes.route_from_uri(route_id)
668 }
669
670 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
675 self.routes.get_pipeline(route_id)
676 }
677
678 pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
680 self.routes.stop_route(route_id).await
681 }
682
683 pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
684 self.start_route(route_id).await
685 }
686
687 pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
688 self.stop_route(route_id).await
689 }
690}
691
692impl DefaultRouteController {
695 #[allow(clippy::too_many_arguments)]
701 pub(super) async fn start_aggregate_route(
702 &mut self,
703 route_id: &str,
704 split: AggregateSplitInfo,
705 consumer: Box<dyn Consumer>,
706 consumer_ctx: ConsumerContext,
707 mut rx: mpsc::Receiver<ExchangeEnvelope>,
708 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
709 runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
710 tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
711 pipeline_cancel: CancellationToken,
713 ) -> Result<(), CamelError> {
714 let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
715
716 let route_cancel_clone = pipeline_cancel.clone();
717 let svc = AggregatorService::new(
718 split.agg_config.clone(),
719 late_tx,
720 Arc::clone(&self.languages),
721 route_cancel_clone,
722 );
723 let agg = Arc::new(std::sync::Mutex::new(svc));
724
725 let pipeline_cancel_for_monitor = pipeline_cancel.clone();
726 let agg_for_monitor = Arc::clone(&agg);
727
728 {
729 let managed = self
730 .routes
731 .get_mut(route_id)
732 .expect("invariant: route must exist"); managed.agg_service = Some(Arc::clone(&agg));
734 }
735
736 let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
737 let pre_pipeline = split.pre_pipeline;
738 let post_pipeline = split.post_pipeline;
739
740 let pipeline_handle = tokio::spawn(async move {
742 loop {
743 tokio::select! {
744 biased;
745
746 late_ex = async {
747 let mut rx = late_rx.lock().await;
748 rx.recv().await
749 } => {
750 match late_ex {
751 Some(ex) => {
752 let pipe = post_pipeline.load();
753 if let Err(e) = pipe.clone_inner().oneshot(ex).await {
754 tracing::warn!(error = %e, "late exchange post-pipeline failed");
755 }
756 }
757 None => return,
758 }
759 }
760
761 envelope_opt = rx.recv() => {
762 match envelope_opt {
763 Some(envelope) => {
764 let ExchangeEnvelope { exchange, reply_tx } = envelope;
765 let pre_pipe = pre_pipeline.load();
766 let ex = match pre_pipe.clone_inner().oneshot(exchange).await {
767 Ok(ex) => ex,
768 Err(e) => {
769 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
770 continue;
771 }
772 };
773
774 let ex = {
775 let cloned_svc = agg
776 .lock()
777 .expect("mutex poisoned: another thread panicked while holding this lock") .clone();
779 cloned_svc.oneshot(ex).await
780 };
781
782 match ex {
783 Ok(ex) => {
784 if !is_pending(&ex) {
785 let post_pipe = post_pipeline.load();
786 let out = post_pipe.clone_inner().oneshot(ex).await;
787 if let Some(tx) = reply_tx { let _ = tx.send(out); }
788 } else if let Some(tx) = reply_tx {
789 let _ = tx.send(Ok(ex));
790 }
791 }
792 Err(e) => {
793 if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
794 }
795 }
796 }
797 None => return,
798 }
799 }
800
801 _ = pipeline_cancel.cancelled() => {
802 {
803 let guard = agg
804 .lock()
805 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
807 }
808 let mut rx_guard = late_rx.lock().await;
809 while let Ok(late_ex) = rx_guard.try_recv() {
810 let pipe = post_pipeline.load();
811 let _ = pipe.clone_inner().oneshot(late_ex).await;
812 }
813 break;
814 }
815 }
816 }
817 });
818 #[cfg(test)]
819 emit_start_route_event("pipeline_spawned");
820
821 let consumer_handle = super::consumer_management::spawn_consumer_task(
824 route_id.to_string(),
825 consumer,
826 consumer_ctx,
827 crash_notifier,
828 runtime_for_consumer,
829 false,
830 );
831
832 let force_on_stop = agg_for_monitor
836 .lock()
837 .expect("mutex poisoned: another thread panicked while holding this lock") .config()
839 .force_completion_on_stop;
840 let consumer_handle = tokio::spawn(async move {
841 let _ = consumer_handle.await;
842 if !pipeline_cancel_for_monitor.is_cancelled() {
843 let guard = agg_for_monitor
844 .lock()
845 .expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
847 drop(guard);
848 if force_on_stop {
849 pipeline_cancel_for_monitor.cancel();
850 }
851 }
852 });
853 #[cfg(test)]
854 emit_start_route_event("consumer_spawned");
855
856 {
857 let managed = self
858 .routes
859 .get_mut(route_id)
860 .expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
862 managed.pipeline_handle = Some(pipeline_handle);
863 managed.channel_sender = Some(tx_for_storage);
864 }
865
866 info!(route_id = %route_id, "Route started (aggregate with timeout)");
867 Ok(())
868 }
869}
870
871#[cfg(test)]
872#[path = "route_controller_tests.rs"]
873mod tests;