Skip to main content

camel_core/lifecycle/adapters/
route_controller_trait.rs

1//! `RouteController` trait implementation for `DefaultRouteController`.
2//!
3//! Extracted from `route_controller.rs` to reduce file size. All lifecycle methods
4//! (start, stop, suspend, resume, etc.) live here.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12use tracing::{error, info, warn};
13
14use camel_api::{CamelError, NoOpMetrics};
15use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
16
17use crate::lifecycle::adapters::consumer_management;
18use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
19use crate::lifecycle::adapters::route_controller::DefaultRouteController;
20#[cfg(test)]
21use crate::lifecycle::adapters::route_helpers::emit_start_route_event;
22use crate::lifecycle::adapters::route_helpers::{
23    handle_is_running, inferred_lifecycle_label, ready_with_backoff,
24};
25use crate::lifecycle::adapters::route_registry::DEFAULT_SHUTDOWN_TIMEOUT;
26
27#[async_trait::async_trait]
28impl camel_api::RouteController for DefaultRouteController {
29    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
30        // Check if route exists and can be started.
31        {
32            let managed = self
33                .routes
34                .get_mut(route_id)
35                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
36
37            let consumer_running = handle_is_running(&managed.consumer_handle);
38            let pipeline_running = handle_is_running(&managed.pipeline_handle);
39            if consumer_running && pipeline_running {
40                return Ok(());
41            }
42            if !consumer_running && pipeline_running {
43                return Err(CamelError::RouteError(format!(
44                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
45                    route_id
46                )));
47            }
48            if consumer_running && !pipeline_running {
49                return Err(CamelError::RouteError(format!(
50                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
51                    route_id
52                )));
53            }
54        }
55
56        info!(route_id = %route_id, "Starting route");
57
58        // Get the resolved route info
59        let (from_uri, pipeline, concurrency) = {
60            let managed = self
61                .routes
62                .get(route_id)
63                .expect("invariant: route must exist after prior existence check"); // allow-unwrap
64            (
65                managed.from_uri.clone(),
66                Arc::clone(&managed.pipeline),
67                managed.concurrency.clone(),
68            )
69        };
70
71        // Clone crash notifier for consumer task
72        let crash_notifier = self.crash_notifier.clone();
73        let runtime_for_consumer = self.runtime.clone();
74
75        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
76            Arc::clone(&self.registry),
77            Arc::clone(&self.languages),
78            self.tracer_metrics
79                .clone()
80                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
81            Arc::clone(&self.platform_service),
82            self.health_registry(),
83            Some(route_id.to_string()),
84        ));
85        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
86            Arc::clone(&consumer_component_ctx) as Arc<_>;
87        let (mut consumer, consumer_concurrency) = consumer_management::create_route_consumer(
88            consumer_rt,
89            &self.registry,
90            &from_uri,
91            consumer_component_ctx.as_ref(),
92        )?;
93
94        // Resolve effective concurrency: route override > consumer default
95        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
96
97        // Get the managed route for mutation
98        let managed = self
99            .routes
100            .get_mut(route_id)
101            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
102
103        // Wire security context before spawning consumer
104        if let (Some(sp_config), Some(authenticator)) = (
105            managed.compiled.security_policy.as_ref(),
106            managed.compiled.security_authenticator.as_ref(),
107        ) {
108            use camel_component_api::SecurityContext;
109            let sec_ctx =
110                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
111            consumer.set_security_context(sec_ctx);
112        }
113
114        // Create channel for consumer to send exchanges
115        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
116        // Create child tokens for independent lifecycle control
117        let consumer_cancel = managed.consumer_cancel_token.child_token();
118        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
119        // Clone sender for storage (to reuse on resume)
120        let tx_for_storage = tx.clone();
121        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone(), route_id.to_string());
122
123        // --- Aggregator v2: check for aggregate route with timeout ---
124        let split_clone = managed.aggregate_split.clone();
125        if let Some(split) = split_clone {
126            return self
127                .start_aggregate_route(
128                    route_id,
129                    split,
130                    consumer,
131                    consumer_ctx,
132                    rx,
133                    crash_notifier,
134                    runtime_for_consumer,
135                    tx_for_storage,
136                    pipeline_cancel,
137                )
138                .await;
139        }
140        // --- End aggregator v2 branch ---
141
142        // Spawn pipeline task with its own cancellation token
143        let pipeline_handle = match effective_concurrency {
144            ConcurrencyModel::Sequential => {
145                tokio::spawn(async move {
146                    loop {
147                        // Use select! to exit promptly on cancellation even when idle
148                        let envelope = tokio::select! {
149                            envelope = rx.recv() => match envelope {
150                                Some(e) => e,
151                                None => return, // Channel closed
152                            },
153                            _ = pipeline_cancel.cancelled() => {
154                                // Cancellation requested - exit gracefully
155                                return;
156                            }
157                        };
158                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
159
160                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
161                        let mut pipeline = pipeline.load().clone_inner();
162
163                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
164                            if let Some(tx) = reply_tx {
165                                let _ = tx.send(Err(e));
166                            }
167                            return;
168                        }
169
170                        let result = pipeline.call(exchange).await;
171                        if let Some(tx) = reply_tx {
172                            let _ = tx.send(result);
173                        } else if let Err(ref e) = result
174                            && !matches!(e, CamelError::Stopped)
175                        {
176                            // log-policy: system-broken
177                            error!("Pipeline error: {e}");
178                        }
179                    }
180                })
181            }
182            ConcurrencyModel::Concurrent { max } => {
183                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
184                tokio::spawn(async move {
185                    loop {
186                        // Use select! to exit promptly on cancellation even when idle
187                        let envelope = tokio::select! {
188                            envelope = rx.recv() => match envelope {
189                                Some(e) => e,
190                                None => return, // Channel closed
191                            },
192                            _ = pipeline_cancel.cancelled() => {
193                                // Cancellation requested - exit gracefully
194                                return;
195                            }
196                        };
197                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
198                        let pipe_ref = Arc::clone(&pipeline);
199                        let sem = sem.clone();
200                        let cancel = pipeline_cancel.clone();
201                        tokio::spawn(async move {
202                            // Acquire semaphore permit if bounded
203                            let _permit = match &sem {
204                                Some(s) => Some(s.acquire().await.expect("semaphore closed")), // allow-unwrap
205                                None => None,
206                            };
207
208                            // Load current pipeline from ArcSwap
209                            let mut pipe = pipe_ref.load().clone_inner();
210
211                            // Wait for service ready with circuit breaker backoff
212                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
213                                if let Some(tx) = reply_tx {
214                                    let _ = tx.send(Err(e));
215                                }
216                                return;
217                            }
218
219                            let result = pipe.call(exchange).await;
220                            if let Some(tx) = reply_tx {
221                                let _ = tx.send(result);
222                            } else if let Err(ref e) = result
223                                && !matches!(e, CamelError::Stopped)
224                            {
225                                // log-policy: system-broken
226                                error!("Pipeline error: {e}");
227                            }
228                        });
229                    }
230                })
231            }
232        };
233        #[cfg(test)]
234        emit_start_route_event("pipeline_spawned");
235
236        // Start consumer after pipeline task is spawned to minimize the chance of
237        // fire-and-forget events being produced before the pipeline loop is active.
238        let consumer_handle = consumer_management::spawn_consumer_task(
239            route_id.to_string(),
240            consumer,
241            consumer_ctx,
242            crash_notifier,
243            runtime_for_consumer,
244            false,
245        );
246        #[cfg(test)]
247        emit_start_route_event("consumer_spawned");
248
249        // Store handles and update status
250        let managed = self
251            .routes
252            .get_mut(route_id)
253            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
254        managed.consumer_handle = Some(consumer_handle);
255        managed.pipeline_handle = Some(pipeline_handle);
256        managed.channel_sender = Some(tx_for_storage);
257
258        info!(route_id = %route_id, "Route started");
259        self.health_registry().mark_route_started(route_id);
260        Ok(())
261    }
262
263    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
264        self.stop_route_internal(route_id).await?;
265        self.health_registry().mark_route_stopped(route_id);
266        Ok(())
267    }
268
269    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
270        self.stop_route(route_id).await?;
271        tokio::time::sleep(Duration::from_millis(100)).await;
272        self.start_route(route_id).await
273    }
274
275    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
276        // Check route exists and state.
277        let managed = self
278            .routes
279            .get_mut(route_id)
280            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
281
282        let consumer_running = handle_is_running(&managed.consumer_handle);
283        let pipeline_running = handle_is_running(&managed.pipeline_handle);
284
285        // Can only suspend from active started state.
286        if !consumer_running || !pipeline_running {
287            return Err(CamelError::RouteError(format!(
288                "Cannot suspend route '{}' with execution lifecycle {}",
289                route_id,
290                inferred_lifecycle_label(managed)
291            )));
292        }
293
294        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
295
296        // Cancel consumer token only (keep pipeline running)
297        let managed = self
298            .routes
299            .get_mut(route_id)
300            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
301        managed.consumer_cancel_token.cancel();
302
303        // Take and join consumer handle
304        let managed = self
305            .routes
306            .get_mut(route_id)
307            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
308        let consumer_handle = managed.consumer_handle.take();
309
310        // Wait for consumer task to complete with timeout
311        let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
312            if let Some(handle) = consumer_handle {
313                let _ = handle.await;
314            }
315        })
316        .await;
317
318        if timeout_result.is_err() {
319            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
320        }
321
322        // Get the managed route again (can't hold across await)
323        let managed = self
324            .routes
325            .get_mut(route_id)
326            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
327
328        // Create fresh cancellation token for consumer (for resume)
329        managed.consumer_cancel_token = CancellationToken::new();
330
331        info!(route_id = %route_id, "Route suspended (pipeline still running)");
332        self.health_registry().mark_route_stopped(route_id);
333        Ok(())
334    }
335
336    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
337        // Check route exists and is Suspended-equivalent execution state.
338        let managed = self
339            .routes
340            .get(route_id)
341            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
342
343        let consumer_running = handle_is_running(&managed.consumer_handle);
344        let pipeline_running = handle_is_running(&managed.pipeline_handle);
345        if consumer_running || !pipeline_running {
346            return Err(CamelError::RouteError(format!(
347                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
348                route_id,
349                inferred_lifecycle_label(managed)
350            )));
351        }
352
353        // Get the stored channel sender (must exist for a suspended route)
354        let sender = managed.channel_sender.clone().ok_or_else(|| {
355            CamelError::RouteError("Suspended route has no channel sender".into())
356        })?;
357
358        // Get from_uri and concurrency for creating new consumer
359        let from_uri = managed.from_uri.clone();
360
361        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
362
363        let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
364            Arc::clone(&self.registry),
365            Arc::clone(&self.languages),
366            self.tracer_metrics
367                .clone()
368                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
369            Arc::clone(&self.platform_service),
370            self.health_registry(),
371            Some(route_id.to_string()),
372        ));
373        let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
374            Arc::clone(&consumer_component_ctx) as Arc<_>;
375        let (mut consumer, _) = consumer_management::create_route_consumer(
376            consumer_rt,
377            &self.registry,
378            &from_uri,
379            consumer_component_ctx.as_ref(),
380        )?;
381
382        // Wire security context before spawning consumer
383        let managed = self
384            .routes
385            .get(route_id)
386            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
387        if let (Some(sp_config), Some(authenticator)) = (
388            managed.compiled.security_policy.as_ref(),
389            managed.compiled.security_authenticator.as_ref(),
390        ) {
391            use camel_component_api::SecurityContext;
392            let sec_ctx =
393                SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
394            consumer.set_security_context(sec_ctx);
395        }
396
397        // Get the managed route for mutation
398        let managed = self
399            .routes
400            .get_mut(route_id)
401            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
402
403        // Create child token for consumer lifecycle
404        let consumer_cancel = managed.consumer_cancel_token.child_token();
405
406        let crash_notifier = self.crash_notifier.clone();
407        let runtime_for_consumer = self.runtime.clone();
408
409        // Create ConsumerContext with the stored sender
410        let consumer_ctx =
411            ConsumerContext::new(sender, consumer_cancel.clone(), route_id.to_string());
412
413        // Spawn consumer task
414        let consumer_handle = consumer_management::spawn_consumer_task(
415            route_id.to_string(),
416            consumer,
417            consumer_ctx,
418            crash_notifier,
419            runtime_for_consumer,
420            true,
421        );
422
423        // Store consumer handle and update status
424        let managed = self
425            .routes
426            .get_mut(route_id)
427            .expect("invariant: route must exist after prior existence check"); // allow-unwrap
428        managed.consumer_handle = Some(consumer_handle);
429
430        info!(route_id = %route_id, "Route resumed");
431        self.health_registry().mark_route_started(route_id);
432        Ok(())
433    }
434
435    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
436        // Only start routes where auto_startup() == true
437        // Sort by startup_order() ascending before starting
438        let route_ids: Vec<String> = {
439            let pairs = self.routes.auto_startup_sorted();
440            pairs.into_iter().map(|(id, _)| id).collect()
441        };
442
443        info!("Starting {} auto-startup routes", route_ids.len());
444
445        // Collect errors but continue starting remaining routes
446        let mut errors: Vec<String> = Vec::new();
447        for route_id in route_ids {
448            if let Err(e) = self.start_route(&route_id).await {
449                errors.push(format!("Route '{}': {}", route_id, e));
450            }
451        }
452
453        if !errors.is_empty() {
454            return Err(CamelError::RouteError(format!(
455                "Failed to start routes: {}",
456                errors.join(", ")
457            )));
458        }
459
460        info!("All auto-startup routes started");
461        Ok(())
462    }
463
464    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
465        // Sort by startup_order descending (reverse order)
466        let route_ids: Vec<String> = {
467            let pairs = self.routes.shutdown_sorted();
468            pairs.into_iter().map(|(id, _)| id).collect()
469        };
470
471        info!("Stopping {} routes", route_ids.len());
472
473        for route_id in route_ids {
474            let _ = self.stop_route(&route_id).await;
475        }
476
477        info!("All routes stopped");
478        Ok(())
479    }
480}