// camel_core/route_controller.rs

//! Default implementation of RouteController.
//!
//! This module provides [`DefaultRouteController`], which manages route lifecycle
//! including starting, stopping, suspending, and resuming routes.

6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{BoxProcessor, CamelError, ProducerContext, RouteController, RouteStatus};
17use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
18use camel_endpoint::parse_uri;
19use camel_processor::circuit_breaker::CircuitBreakerLayer;
20use camel_processor::error_handler::ErrorHandlerLayer;
21use camel_processor::{ChoiceService, WhenClause};
22
23use crate::registry::Registry;
24use crate::route::{BuilderStep, RouteDefinition, RouteDefinitionInfo, compose_pipeline};
25use arc_swap::ArcSwap;
26
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: shared `&SyncBoxProcessor` is only ever used to clone the inner
// processor (see `# Safety` above); clone() does not mutate shared state, so
// concurrent shared access is sound.
unsafe impl Sync for SyncBoxProcessor {}

/// Hot-swappable pipeline handle: `ArcSwap` allows lock-free atomic replacement
/// while in-flight clones keep the previous pipeline alive until dropped.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
45
/// Internal state for a managed route.
///
/// One instance exists per registered route; it owns everything needed to
/// start, stop, and introspect that route.
struct ManagedRoute {
    /// The route definition metadata (for introspection).
    definition: RouteDefinitionInfo,
    /// Source endpoint URI (the `from(...)` of the route).
    from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap / hot reload).
    pipeline: SharedPipeline,
    /// Concurrency model override (if any); falls back to the consumer's
    /// default when `None`.
    concurrency: Option<ConcurrencyModel>,
    /// Current lifecycle status.
    status: RouteStatus,
    /// Handle for the consumer task (if running).
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping this route; replaced with a fresh
    /// token after each stop so the route can be restarted.
    cancel_token: CancellationToken,
}
65
66/// Wait for a pipeline service to be ready with circuit breaker backoff.
67///
68/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
69/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
70/// cancellation checks. It returns `Ok(())` when the service is ready, or
71/// `Err(e)` if cancellation occurred or a fatal error was encountered.
72async fn ready_with_backoff(
73    pipeline: &mut BoxProcessor,
74    cancel: &CancellationToken,
75) -> Result<(), CamelError> {
76    loop {
77        match pipeline.ready().await {
78            Ok(_) => return Ok(()),
79            Err(CamelError::CircuitOpen(ref msg)) => {
80                warn!("Circuit open, backing off: {msg}");
81                tokio::select! {
82                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
83                        continue;
84                    }
85                    _ = cancel.cancelled() => {
86                        // Shutting down — don't retry.
87                        return Err(CamelError::CircuitOpen(msg.clone()));
88                    }
89                }
90            }
91            Err(e) => {
92                error!("Pipeline not ready: {e}");
93                return Err(e);
94            }
95        }
96    }
97}
98
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    /// Uses a sync `std::sync::Mutex` (never held across `.await`).
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Self-reference for creating ProducerContext.
    /// Set after construction via `set_self_ref()`.
    /// Uses a tokio `Mutex` so producers can lock it in async contexts.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
}
117
118impl DefaultRouteController {
119    /// Create a new `DefaultRouteController` with the given registry.
120    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
121        Self {
122            routes: HashMap::new(),
123            registry,
124            self_ref: None,
125            global_error_handler: None,
126        }
127    }
128
129    /// Set the self-reference for creating ProducerContext.
130    ///
131    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
132    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
133        self.self_ref = Some(self_ref);
134    }
135
136    /// Set a global error handler applied to all routes without a per-route handler.
137    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
138        self.global_error_handler = Some(config);
139    }
140
141    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
142    fn resolve_error_handler(
143        &self,
144        config: ErrorHandlerConfig,
145        producer_ctx: &ProducerContext,
146        registry: &Registry,
147    ) -> Result<ErrorHandlerLayer, CamelError> {
148        // Resolve DLC URI → producer.
149        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
150            let parsed = parse_uri(uri)?;
151            let component = registry.get_or_err(&parsed.scheme)?;
152            let endpoint = component.create_endpoint(uri)?;
153            Some(endpoint.create_producer(producer_ctx)?)
154        } else {
155            None
156        };
157
158        // Resolve per-policy `handled_by` URIs.
159        let mut resolved_policies = Vec::new();
160        for policy in config.policies {
161            let handler_producer = if let Some(ref uri) = policy.handled_by {
162                let parsed = parse_uri(uri)?;
163                let component = registry.get_or_err(&parsed.scheme)?;
164                let endpoint = component.create_endpoint(uri)?;
165                Some(endpoint.create_producer(producer_ctx)?)
166            } else {
167                None
168            };
169            resolved_policies.push((policy, handler_producer));
170        }
171
172        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
173    }
174
175    /// Resolve BuilderSteps into BoxProcessors.
176    fn resolve_steps(
177        &self,
178        steps: Vec<BuilderStep>,
179        producer_ctx: &ProducerContext,
180        registry: &Registry,
181    ) -> Result<Vec<BoxProcessor>, CamelError> {
182        let mut processors: Vec<BoxProcessor> = Vec::new();
183        for step in steps {
184            match step {
185                BuilderStep::Processor(svc) => {
186                    processors.push(svc);
187                }
188                BuilderStep::To(uri) => {
189                    let parsed = parse_uri(&uri)?;
190                    let component = registry.get_or_err(&parsed.scheme)?;
191                    let endpoint = component.create_endpoint(&uri)?;
192                    let producer = endpoint.create_producer(producer_ctx)?;
193                    processors.push(producer);
194                }
195                BuilderStep::Split { config, steps } => {
196                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
197                    let sub_pipeline = compose_pipeline(sub_processors);
198                    let splitter =
199                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
200                    processors.push(BoxProcessor::new(splitter));
201                }
202                BuilderStep::Aggregate { config } => {
203                    let svc = camel_processor::AggregatorService::new(config);
204                    processors.push(BoxProcessor::new(svc));
205                }
206                BuilderStep::Filter { predicate, steps } => {
207                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
208                    let sub_pipeline = compose_pipeline(sub_processors);
209                    let svc =
210                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
211                    processors.push(BoxProcessor::new(svc));
212                }
213                BuilderStep::Choice { whens, otherwise } => {
214                    // Resolve each when clause's sub-steps into a pipeline.
215                    let mut when_clauses = Vec::new();
216                    for when_step in whens {
217                        let sub_processors =
218                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
219                        let pipeline = compose_pipeline(sub_processors);
220                        when_clauses.push(WhenClause {
221                            predicate: when_step.predicate,
222                            pipeline,
223                        });
224                    }
225                    // Resolve otherwise branch (if present).
226                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
227                        let sub_processors =
228                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
229                        Some(compose_pipeline(sub_processors))
230                    } else {
231                        None
232                    };
233                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
234                    processors.push(BoxProcessor::new(svc));
235                }
236                BuilderStep::WireTap { uri } => {
237                    let parsed = parse_uri(&uri)?;
238                    let component = registry.get_or_err(&parsed.scheme)?;
239                    let endpoint = component.create_endpoint(&uri)?;
240                    let producer = endpoint.create_producer(producer_ctx)?;
241                    let svc = camel_processor::WireTapService::new(producer);
242                    processors.push(BoxProcessor::new(svc));
243                }
244                BuilderStep::Multicast { config, steps } => {
245                    // Each top-level step in the multicast scope becomes an independent endpoint.
246                    let mut endpoints = Vec::new();
247                    for step in steps {
248                        let sub_processors =
249                            self.resolve_steps(vec![step], producer_ctx, registry)?;
250                        let endpoint = compose_pipeline(sub_processors);
251                        endpoints.push(endpoint);
252                    }
253                    let svc = camel_processor::MulticastService::new(endpoints, config);
254                    processors.push(BoxProcessor::new(svc));
255                }
256            }
257        }
258        Ok(processors)
259    }
260
261    /// Add a route definition to the controller.
262    ///
263    /// Steps are resolved immediately using the registry.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if:
268    /// - A route with the same ID already exists
269    /// - Step resolution fails
270    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
271        let route_id = definition.route_id().to_string();
272
273        if self.routes.contains_key(&route_id) {
274            return Err(CamelError::RouteError(format!(
275                "Route '{}' already exists",
276                route_id
277            )));
278        }
279
280        info!(route_id = %route_id, "Adding route to controller");
281
282        // Extract definition info for storage before steps are consumed
283        let definition_info = definition.to_info();
284        let from_uri = definition.from_uri.to_string();
285        let concurrency = definition.concurrency;
286
287        // Create ProducerContext from self_ref for step resolution
288        let producer_ctx = self
289            .self_ref
290            .clone()
291            .map(ProducerContext::new)
292            .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
293
294        // Lock registry for step resolution
295        let registry = self
296            .registry
297            .lock()
298            .expect("mutex poisoned: another thread panicked while holding this lock");
299
300        // Resolve steps into processors (takes ownership of steps)
301        let processors = self.resolve_steps(definition.steps, &producer_ctx, &registry)?;
302        let mut pipeline = compose_pipeline(processors);
303
304        // Apply circuit breaker if configured
305        if let Some(cb_config) = definition.circuit_breaker {
306            let cb_layer = CircuitBreakerLayer::new(cb_config);
307            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
308        }
309
310        // Determine which error handler config to use (per-route takes precedence)
311        let eh_config = definition
312            .error_handler
313            .or_else(|| self.global_error_handler.clone());
314
315        if let Some(config) = eh_config {
316            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
317            pipeline = BoxProcessor::new(layer.layer(pipeline));
318        }
319
320        // Drop the lock before modifying self.routes
321        drop(registry);
322
323        self.routes.insert(
324            route_id.clone(),
325            ManagedRoute {
326                definition: definition_info,
327                from_uri,
328                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
329                concurrency,
330                status: RouteStatus::Stopped,
331                consumer_handle: None,
332                pipeline_handle: None,
333                cancel_token: CancellationToken::new(),
334            },
335        );
336
337        Ok(())
338    }
339
340    /// Returns the number of routes in the controller.
341    pub fn route_count(&self) -> usize {
342        self.routes.len()
343    }
344
345    /// Returns all route IDs.
346    pub fn route_ids(&self) -> Vec<String> {
347        self.routes.keys().cloned().collect()
348    }
349
350    /// Atomically swap the pipeline of a route.
351    ///
352    /// In-flight requests finish with the old pipeline (kept alive by Arc).
353    /// New requests immediately use the new pipeline.
354    pub fn swap_pipeline(
355        &self,
356        route_id: &str,
357        new_pipeline: BoxProcessor,
358    ) -> Result<(), CamelError> {
359        let managed = self
360            .routes
361            .get(route_id)
362            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
363
364        managed
365            .pipeline
366            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
367        info!(route_id = %route_id, "Pipeline swapped atomically");
368        Ok(())
369    }
370
371    /// Returns the from_uri of a route, if it exists.
372    pub fn route_from_uri(&self, route_id: &str) -> Option<&str> {
373        self.routes.get(route_id).map(|r| r.from_uri.as_str())
374    }
375
376    /// Get a clone of the current pipeline for a route.
377    ///
378    /// This is useful for testing and introspection.
379    /// Returns `None` if the route doesn't exist.
380    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
381        self.routes
382            .get(route_id)
383            .map(|r| r.pipeline.load().0.clone())
384    }
385
386    /// Internal stop implementation that can set custom status.
387    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
388        let managed = self
389            .routes
390            .get_mut(route_id)
391            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
392
393        let current_status = managed.status.clone();
394        if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
395            return Ok(()); // Already stopped or stopping
396        }
397
398        info!(route_id = %route_id, "Stopping route");
399        managed.status = RouteStatus::Stopping;
400
401        // Cancel the token to signal shutdown
402        managed.cancel_token.cancel();
403
404        // Take handles directly (no Arc<Mutex> wrapper needed)
405        let consumer_handle = managed.consumer_handle.take();
406        let pipeline_handle = managed.pipeline_handle.take();
407
408        // Wait for tasks to complete with timeout
409        // The CancellationToken already signaled tasks to stop gracefully.
410        // If timeout fires, log a warning — tasks will stop on their own when
411        // they check the cancel token. This is standard Tokio shutdown practice.
412        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
413            match (consumer_handle, pipeline_handle) {
414                (Some(c), Some(p)) => {
415                    let _ = tokio::join!(c, p);
416                }
417                (Some(c), None) => {
418                    let _ = c.await;
419                }
420                (None, Some(p)) => {
421                    let _ = p.await;
422                }
423                (None, None) => {}
424            }
425        })
426        .await;
427
428        if timeout_result.is_err() {
429            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
430        }
431
432        // Get the managed route again (can't hold across await)
433        let managed = self
434            .routes
435            .get_mut(route_id)
436            .expect("invariant: route must exist after prior existence check");
437
438        // Create a fresh cancellation token for next start
439        managed.cancel_token = CancellationToken::new();
440        managed.status = RouteStatus::Stopped;
441
442        info!(route_id = %route_id, "Route stopped");
443        Ok(())
444    }
445}
446
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    // Start the route: validate the status transition, create the consumer
    // from the `from` URI, and spawn a consumer task plus a pipeline task
    // wired together by an mpsc channel of ExchangeEnvelopes.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check if route exists and can be started, and update status atomically
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            match managed.status {
                RouteStatus::Started => return Ok(()), // Already running
                RouteStatus::Starting => {
                    return Err(CamelError::RouteError(format!(
                        "Route '{}' is already starting",
                        route_id
                    )));
                }
                RouteStatus::Stopped | RouteStatus::Failed(_) => {} // OK to start
                RouteStatus::Stopping => {
                    return Err(CamelError::RouteError(format!(
                        "Route '{}' is stopping",
                        route_id
                    )));
                }
                RouteStatus::Suspended => {} // OK to resume
            }
            managed.status = RouteStatus::Starting;
        }

        info!(route_id = %route_id, "Starting route");

        // Get the resolved route info (cheap clones; the pipeline is an Arc)
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Parse from URI and create consumer (lock registry for lookup)
        let parsed = parse_uri(&from_uri)?;
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(&from_uri)?;
        let mut consumer = endpoint.create_consumer()?;
        let consumer_concurrency = consumer.concurrency_model();
        // Drop the lock before spawning tasks
        drop(registry);

        // Resolve effective concurrency: route override > consumer default
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create channel for consumer to send exchanges
        // (bounded at 256 — provides backpressure on the consumer)
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        let child_token = managed.cancel_token.child_token();
        let consumer_ctx = ConsumerContext::new(tx, child_token.clone());

        // Start consumer in background task
        // TODO: Update route status to Failed when consumer crashes (requires Arc<Mutex<RouteStatus>> or channel)
        let route_id_for_consumer = route_id.to_string();
        let consumer_handle = tokio::spawn(async move {
            if let Err(e) = consumer.start(consumer_ctx).await {
                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
            }
        });

        // Spawn pipeline task. Both variants reload the pipeline from the
        // ArcSwap per exchange, so hot-swapped pipelines take effect on the
        // next message. Both loops exit when the channel closes (consumer
        // dropped its sender, e.g. after cancellation).
        let pipeline_cancel = child_token;
        let pipeline_handle = match effective_concurrency {
            // Sequential: process exchanges one at a time, in arrival order.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    while let Some(envelope) = rx.recv().await {
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
                        let mut pipeline = pipeline.load().0.clone();

                        // NOTE(review): a readiness failure ends this whole
                        // pipeline task (the route stops draining the channel)
                        // — appears intended for cancellation/fatal errors.
                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget exchange: log errors (except
                            // normal shutdown) since nobody awaits the reply.
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: spawn one task per exchange, optionally bounded by
            // a semaphore of `max` permits.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    while let Some(envelope) = rx.recv().await {
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Acquire semaphore permit if bounded
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Load current pipeline from ArcSwap
                            let mut pipe = pipe_ref.load().0.clone();

                            // Wait for service ready with circuit breaker backoff
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Store handles and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.status = RouteStatus::Started;

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        // Brief pause so consumer-held resources (ports, watchers) can be
        // released before re-acquiring them.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    // Suspend = stop the tasks, then overwrite the `Stopped` status set by
    // stop_route_internal with `Suspended` so resume_route can distinguish it.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await?;
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.status = RouteStatus::Suspended;
        info!(route_id = %route_id, "Route suspended");
        Ok(())
    }

    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Resume only if Suspended (unknown routes also fail this check)
        let is_suspended = self
            .routes
            .get(route_id)
            .map(|r| r.status == RouteStatus::Suspended)
            .unwrap_or(false);

        if !is_suspended {
            return Err(CamelError::RouteError(format!(
                "Route '{}' is not suspended",
                route_id
            )));
        }

        self.start_route(route_id).await
    }

    fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
        self.routes.get(route_id).map(|r| r.status.clone())
    }

    // Start all auto-startup routes in ascending startup_order; collect
    // failures and report them together instead of aborting on the first.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        // Only start routes where auto_startup() == true
        // Sort by startup_order() ascending before starting
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        // Collect errors but continue starting remaining routes
        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    // Stop all routes in reverse startup order; stop errors are ignored so
    // every route gets a shutdown attempt.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        // Sort by startup_order descending (reverse order)
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
710
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Build a controller whose `self_ref` points at a throwaway controller,
    /// so `add_route` can construct a `ProducerContext`.
    fn controller_with_self_ref() -> DefaultRouteController {
        let mut ctrl =
            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
        let dummy: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
        ));
        ctrl.set_self_ref(dummy);
        ctrl
    }

    #[tokio::test]
    async fn test_swap_pipeline_updates_stored_pipeline() {
        use camel_api::IdentityProcessor;

        let mut ctrl = controller_with_self_ref();
        let def =
            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("swap-test");
        ctrl.add_route(def).unwrap();

        // Swapping the pipeline of an existing route succeeds.
        let replacement = BoxProcessor::new(IdentityProcessor);
        assert!(ctrl.swap_pipeline("swap-test", replacement).is_ok());

        // Swapping on an unknown route fails.
        let replacement = BoxProcessor::new(IdentityProcessor);
        assert!(ctrl.swap_pipeline("nonexistent", replacement).is_err());
    }

    #[tokio::test]
    async fn test_add_route_duplicate_id_fails() {
        let mut ctrl = controller_with_self_ref();

        let first = crate::route::RouteDefinition::new("timer:tick", vec![])
            .with_route_id("duplicate-route");
        assert!(ctrl.add_route(first).is_ok());

        // A second route reusing the same ID must be rejected.
        let second = crate::route::RouteDefinition::new("timer:tock", vec![])
            .with_route_id("duplicate-route");
        let err = ctrl
            .add_route(second)
            .expect_err("duplicate route id must fail")
            .to_string();
        assert!(
            err.contains("already exists"),
            "error should mention 'already exists', got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_add_route_with_id_succeeds() {
        let mut ctrl = controller_with_self_ref();

        let def =
            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
        assert!(ctrl.add_route(def).is_ok());
        assert_eq!(ctrl.route_count(), 1);
    }
}