Skip to main content

camel_core/lifecycle/adapters/
route_controller_trait.rs

1//! `RouteController` trait implementation for `DefaultRouteController`.
2//!
3//! Extracted from `route_controller.rs` to reduce file size. All lifecycle methods
4//! (start, stop, suspend, resume, etc.) live here.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12use tracing::{error, info, warn};
13
14use camel_api::{CamelError, NoOpMetrics};
15use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
16
17use crate::lifecycle::adapters::consumer_management;
18use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
19use crate::lifecycle::adapters::route_controller::DefaultRouteController;
20#[cfg(test)]
21use crate::lifecycle::adapters::route_helpers::emit_start_route_event;
22use crate::lifecycle::adapters::route_helpers::{
23    handle_is_running, inferred_lifecycle_label, ready_with_backoff,
24};
25use crate::lifecycle::adapters::route_registry::DEFAULT_SHUTDOWN_TIMEOUT;
26
27#[async_trait::async_trait]
28impl camel_api::RouteController for DefaultRouteController {
29    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
30        // Check if route exists and can be started.
31        {
32            let managed = self
33                .routes
34                .get_mut(route_id)
35                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
36
37            let consumer_running = handle_is_running(&managed.consumer_handle);
38            let pipeline_running = handle_is_running(&managed.pipeline_handle);
39            if consumer_running && pipeline_running {
40                return Ok(());
41            }
42            if !consumer_running && pipeline_running {
43                return Err(CamelError::RouteError(format!(
44                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
45                    route_id
46                )));
47            }
48            if consumer_running && !pipeline_running {
49                return Err(CamelError::RouteError(format!(
50                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
51                    route_id
52                )));
53            }
54        }
55
56        info!(route_id = %route_id, "Starting route");
57
58        // Get the resolved route info
59        let (from_uri, pipeline, concurrency) = {
60            let managed = self
61                .routes
62                .get(route_id)
63                .expect("invariant: route must exist after prior existence check"); // allow-unwrap
64            (
65                managed.from_uri.clone(),
66                Arc::clone(&managed.pipeline),
67                managed.concurrency.clone(),
68            )
69        };
70
71        // Clone crash notifier for consumer task
72        let crash_notifier = self.crash_notifier.clone();
73        let runtime_for_consumer = self.runtime.clone();
74
75        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
76            Arc::clone(&self.registry),
77            Arc::clone(&self.languages),
78            self.tracer_metrics
79                .clone()
80                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
81            Arc::clone(&self.platform_service),
82            self.health_registry(),
83            Some(route_id.to_string()),
84        ));
85        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
86            Arc::clone(&consumer_component_ctx) as Arc<_>;
87        let (mut consumer, consumer_concurrency) = consumer_management::create_route_consumer(
88            consumer_rt,
89            &self.registry,
90            &from_uri,
91            consumer_component_ctx.as_ref(),
92        )?;
93
94        // Resolve effective concurrency: route override > consumer default
95        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
96
97        // Get the managed route for mutation
98        let managed = self
99            .routes
100            .get_mut(route_id)
101            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
102
103        // Wire security context before spawning consumer
104        if let (Some(sp_config), Some(authenticator)) = (
105            managed.compiled.security_policy.as_ref(),
106            managed.compiled.security_authenticator.as_ref(),
107        ) {
108            use camel_component_api::SecurityContext;
109            let sec_ctx =
110                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
111            consumer.set_security_context(sec_ctx);
112        }
113
114        // Create channel for consumer to send exchanges
115        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
116        // Create child tokens for independent lifecycle control
117        let consumer_cancel = managed.consumer_cancel_token.child_token();
118        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
119        // Clone sender for storage (to reuse on resume)
120        let tx_for_storage = tx.clone();
121        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone(), route_id.to_string());
122
123        // --- Aggregator v2: check for aggregate route with timeout ---
124        let split_clone = managed.aggregate_split.clone();
125        if let Some(split) = split_clone {
126            return self
127                .start_aggregate_route(
128                    route_id,
129                    split,
130                    consumer,
131                    consumer_ctx,
132                    rx,
133                    crash_notifier,
134                    runtime_for_consumer,
135                    tx_for_storage,
136                    pipeline_cancel,
137                )
138                .await;
139        }
140        // --- End aggregator v2 branch ---
141
142        // Spawn pipeline task with its own cancellation token
143        let pipeline_handle = match effective_concurrency {
144            ConcurrencyModel::Sequential => {
145                tokio::spawn(async move {
146                    loop {
147                        // Use select! to exit promptly on cancellation even when idle
148                        let envelope = tokio::select! {
149                            envelope = rx.recv() => match envelope {
150                                Some(e) => e,
151                                None => return, // Channel closed
152                            },
153                            _ = pipeline_cancel.cancelled() => {
154                                // Cancellation requested - exit gracefully
155                                return;
156                            }
157                        };
158                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
159
160                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
161                        let mut pipeline = pipeline.load().clone_inner();
162
163                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
164                            if let Some(tx) = reply_tx {
165                                let _ = tx.send(Err(e));
166                            }
167                            return;
168                        }
169
170                        let result = pipeline.call(exchange).await;
171                        if let Some(tx) = reply_tx {
172                            let _ = tx.send(result);
173                        } else if let Err(ref e) = result {
174                            // log-policy: system-broken
175                            error!("Pipeline error: {e}");
176                        }
177                    }
178                })
179            }
180            ConcurrencyModel::Concurrent { max } => {
181                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
182                tokio::spawn(async move {
183                    loop {
184                        // Use select! to exit promptly on cancellation even when idle
185                        let envelope = tokio::select! {
186                            envelope = rx.recv() => match envelope {
187                                Some(e) => e,
188                                None => return, // Channel closed
189                            },
190                            _ = pipeline_cancel.cancelled() => {
191                                // Cancellation requested - exit gracefully
192                                return;
193                            }
194                        };
195                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
196                        let pipe_ref = Arc::clone(&pipeline);
197                        let sem = sem.clone();
198                        let cancel = pipeline_cancel.clone();
199                        tokio::spawn(async move {
200                            // Acquire semaphore permit if bounded
201                            let _permit = match &sem {
202                                Some(s) => Some(s.acquire().await.expect("semaphore closed")), // allow-unwrap
203                                None => None,
204                            };
205
206                            // Load current pipeline from ArcSwap
207                            let mut pipe = pipe_ref.load().clone_inner();
208
209                            // Wait for service ready with circuit breaker backoff
210                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
211                                if let Some(tx) = reply_tx {
212                                    let _ = tx.send(Err(e));
213                                }
214                                return;
215                            }
216
217                            let result = pipe.call(exchange).await;
218                            if let Some(tx) = reply_tx {
219                                let _ = tx.send(result);
220                            } else if let Err(ref e) = result {
221                                // log-policy: system-broken
222                                error!("Pipeline error: {e}");
223                            }
224                        });
225                    }
226                })
227            }
228        };
229        #[cfg(test)]
230        emit_start_route_event("pipeline_spawned");
231
232        // Start consumer after pipeline task is spawned to minimize the chance of
233        // fire-and-forget events being produced before the pipeline loop is active.
234        let consumer_handle = consumer_management::spawn_consumer_task(
235            route_id.to_string(),
236            consumer,
237            consumer_ctx,
238            crash_notifier,
239            runtime_for_consumer,
240            false,
241        );
242        #[cfg(test)]
243        emit_start_route_event("consumer_spawned");
244
245        // Store handles and update status
246        let managed = self
247            .routes
248            .get_mut(route_id)
249            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
250        managed.consumer_handle = Some(consumer_handle);
251        managed.pipeline_handle = Some(pipeline_handle);
252        managed.channel_sender = Some(tx_for_storage);
253
254        info!(route_id = %route_id, "Route started");
255        self.health_registry().mark_route_started(route_id);
256        Ok(())
257    }
258
259    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
260        self.stop_route_internal(route_id).await?;
261        self.health_registry().mark_route_stopped(route_id);
262        Ok(())
263    }
264
265    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
266        self.stop_route(route_id).await?;
267        tokio::time::sleep(Duration::from_millis(100)).await;
268        self.start_route(route_id).await
269    }
270
271    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
272        // Check route exists and state.
273        let managed = self
274            .routes
275            .get_mut(route_id)
276            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
277
278        let consumer_running = handle_is_running(&managed.consumer_handle);
279        let pipeline_running = handle_is_running(&managed.pipeline_handle);
280
281        // Can only suspend from active started state.
282        if !consumer_running || !pipeline_running {
283            return Err(CamelError::RouteError(format!(
284                "Cannot suspend route '{}' with execution lifecycle {}",
285                route_id,
286                inferred_lifecycle_label(managed)
287            )));
288        }
289
290        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
291
292        // Cancel consumer token only (keep pipeline running)
293        let managed = self
294            .routes
295            .get_mut(route_id)
296            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
297        managed.consumer_cancel_token.cancel();
298
299        // Take and join consumer handle
300        let managed = self
301            .routes
302            .get_mut(route_id)
303            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
304        let consumer_handle = managed.consumer_handle.take();
305
306        // Wait for consumer task to complete with timeout
307        let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
308            if let Some(handle) = consumer_handle {
309                let _ = handle.await;
310            }
311        })
312        .await;
313
314        if timeout_result.is_err() {
315            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
316        }
317
318        // Get the managed route again (can't hold across await)
319        let managed = self
320            .routes
321            .get_mut(route_id)
322            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
323
324        // Create fresh cancellation token for consumer (for resume)
325        managed.consumer_cancel_token = CancellationToken::new();
326
327        info!(route_id = %route_id, "Route suspended (pipeline still running)");
328        self.health_registry().mark_route_stopped(route_id);
329        Ok(())
330    }
331
332    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
333        // Check route exists and is Suspended-equivalent execution state.
334        let managed = self
335            .routes
336            .get(route_id)
337            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
338
339        let consumer_running = handle_is_running(&managed.consumer_handle);
340        let pipeline_running = handle_is_running(&managed.pipeline_handle);
341        if consumer_running || !pipeline_running {
342            return Err(CamelError::RouteError(format!(
343                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
344                route_id,
345                inferred_lifecycle_label(managed)
346            )));
347        }
348
349        // Get the stored channel sender (must exist for a suspended route)
350        let sender = managed.channel_sender.clone().ok_or_else(|| {
351            CamelError::RouteError("Suspended route has no channel sender".into())
352        })?;
353
354        // Get from_uri and concurrency for creating new consumer
355        let from_uri = managed.from_uri.clone();
356
357        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
358
359        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
360            Arc::clone(&self.registry),
361            Arc::clone(&self.languages),
362            self.tracer_metrics
363                .clone()
364                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
365            Arc::clone(&self.platform_service),
366            self.health_registry(),
367            Some(route_id.to_string()),
368        ));
369        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
370            Arc::clone(&consumer_component_ctx) as Arc<_>;
371        let (mut consumer, _) = consumer_management::create_route_consumer(
372            consumer_rt,
373            &self.registry,
374            &from_uri,
375            consumer_component_ctx.as_ref(),
376        )?;
377
378        // Wire security context before spawning consumer
379        let managed = self
380            .routes
381            .get(route_id)
382            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
383        if let (Some(sp_config), Some(authenticator)) = (
384            managed.compiled.security_policy.as_ref(),
385            managed.compiled.security_authenticator.as_ref(),
386        ) {
387            use camel_component_api::SecurityContext;
388            let sec_ctx =
389                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
390            consumer.set_security_context(sec_ctx);
391        }
392
393        // Get the managed route for mutation
394        let managed = self
395            .routes
396            .get_mut(route_id)
397            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
398
399        // Create child token for consumer lifecycle
400        let consumer_cancel = managed.consumer_cancel_token.child_token();
401
402        let crash_notifier = self.crash_notifier.clone();
403        let runtime_for_consumer = self.runtime.clone();
404
405        // Create ConsumerContext with the stored sender
406        let consumer_ctx =
407            ConsumerContext::new(sender, consumer_cancel.clone(), route_id.to_string());
408
409        // Spawn consumer task
410        let consumer_handle = consumer_management::spawn_consumer_task(
411            route_id.to_string(),
412            consumer,
413            consumer_ctx,
414            crash_notifier,
415            runtime_for_consumer,
416            true,
417        );
418
419        // Store consumer handle and update status
420        let managed = self
421            .routes
422            .get_mut(route_id)
423            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
424        managed.consumer_handle = Some(consumer_handle);
425
426        info!(route_id = %route_id, "Route resumed");
427        self.health_registry().mark_route_started(route_id);
428        Ok(())
429    }
430
431    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
432        // Only start routes where auto_startup() == true
433        // Sort by startup_order() ascending before starting
434        let route_ids: Vec<String> = {
435            let pairs = self.routes.auto_startup_sorted();
436            pairs.into_iter().map(|(id, _)| id).collect()
437        };
438
439        info!("Starting {} auto-startup routes", route_ids.len());
440
441        // Collect errors but continue starting remaining routes
442        let mut errors: Vec<String> = Vec::new();
443        for route_id in route_ids {
444            if let Err(e) = self.start_route(&route_id).await {
445                errors.push(format!("Route '{}': {}", route_id, e));
446            }
447        }
448
449        if !errors.is_empty() {
450            return Err(CamelError::RouteError(format!(
451                "Failed to start routes: {}",
452                errors.join(", ")
453            )));
454        }
455
456        info!("All auto-startup routes started");
457        Ok(())
458    }
459
460    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
461        // Sort by startup_order descending (reverse order)
462        let route_ids: Vec<String> = {
463            let pairs = self.routes.shutdown_sorted();
464            pairs.into_iter().map(|(id, _)| id).collect()
465        };
466
467        info!("Stopping {} routes", route_ids.len());
468
469        for route_id in route_ids {
470            let _ = self.stop_route(&route_id).await;
471        }
472
473        info!("All routes stopped");
474        Ok(())
475    }
476}