// camel_core/route_controller.rs
1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{BoxProcessor, CamelError, ProducerContext, RouteController, RouteStatus};
17use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
18use camel_endpoint::parse_uri;
19use camel_processor::circuit_breaker::CircuitBreakerLayer;
20use camel_processor::error_handler::ErrorHandlerLayer;
21
22use crate::registry::Registry;
23use crate::route::{BuilderStep, RouteDefinition, RouteDefinitionInfo, compose_pipeline};
24
/// A Sync-safe wrapper around BoxProcessor.
///
/// BoxProcessor (BoxCloneService) is Send but not Sync. By wrapping in Arc<Mutex>,
/// we get both Send and Sync for storage in DefaultRouteController.
/// Worker tasks lock this only briefly, to clone their own pipeline instance.
type SyncPipeline = Arc<std::sync::Mutex<BoxProcessor>>;
30
/// Internal state for a managed route.
///
/// One instance exists per route ID inside `DefaultRouteController::routes`,
/// bundling the resolved pipeline with the lifecycle bookkeeping (status,
/// task handles, cancellation token) needed to start and stop the route.
struct ManagedRoute {
    /// The route definition metadata (for introspection).
    definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    from_uri: String,
    /// Resolved processor pipeline (wrapped for Sync).
    pipeline: SyncPipeline,
    /// Concurrency model override (if any); when `None`, the consumer's
    /// default model is used.
    concurrency: Option<ConcurrencyModel>,
    /// Current lifecycle status.
    status: RouteStatus,
    /// Handle for the consumer task (if running).
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping this route; replaced with a fresh
    /// token after each stop so the route can be started again.
    cancel_token: CancellationToken,
}
50
51/// Wait for a pipeline service to be ready with circuit breaker backoff.
52///
53/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
54/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
55/// cancellation checks. It returns `Ok(())` when the service is ready, or
56/// `Err(e)` if cancellation occurred or a fatal error was encountered.
57async fn ready_with_backoff(
58    pipeline: &mut BoxProcessor,
59    cancel: &CancellationToken,
60) -> Result<(), CamelError> {
61    loop {
62        match pipeline.ready().await {
63            Ok(_) => return Ok(()),
64            Err(CamelError::CircuitOpen(ref msg)) => {
65                warn!("Circuit open, backing off: {msg}");
66                tokio::select! {
67                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
68                        continue;
69                    }
70                    _ = cancel.cancelled() => {
71                        // Shutting down — don't retry.
72                        return Err(CamelError::CircuitOpen(msg.clone()));
73                    }
74                }
75            }
76            Err(e) => {
77                error!("Pipeline not ready: {e}");
78                return Err(e);
79            }
80        }
81    }
82}
83
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Self-reference for creating ProducerContext.
    /// Set after construction via `set_self_ref()`; `add_route` fails if it
    /// has not been set.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
}
102
impl DefaultRouteController {
    /// Create a new `DefaultRouteController` with the given registry.
    ///
    /// The controller starts with no routes, no self-reference, and no
    /// global error handler. Call [`Self::set_self_ref`] before `add_route`,
    /// which needs the self-reference to build a `ProducerContext`.
    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
        Self {
            routes: HashMap::new(),
            registry,
            self_ref: None,
            global_error_handler: None,
        }
    }

    /// Set the self-reference for creating ProducerContext.
    ///
    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.self_ref = Some(self_ref);
    }

    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// Because pipelines are composed eagerly in `add_route`, this only
    /// affects routes added *after* this call.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
    ///
    /// Resolves the dead-letter-channel URI (if any) and each policy's
    /// `handled_by` URI into producers via the registry.
    ///
    /// # Errors
    ///
    /// Returns an error if any URI fails to parse, its scheme has no
    /// registered component, or endpoint/producer creation fails.
    fn resolve_error_handler(
        &self,
        config: ErrorHandlerConfig,
        producer_ctx: &ProducerContext,
        registry: &Registry,
    ) -> Result<ErrorHandlerLayer, CamelError> {
        // Resolve DLC URI → producer.
        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
            let parsed = parse_uri(uri)?;
            let component = registry.get_or_err(&parsed.scheme)?;
            let endpoint = component.create_endpoint(uri)?;
            Some(endpoint.create_producer(producer_ctx)?)
        } else {
            None
        };

        // Resolve per-policy `handled_by` URIs.
        let mut resolved_policies = Vec::new();
        for policy in config.policies {
            let handler_producer = if let Some(ref uri) = policy.handled_by {
                let parsed = parse_uri(uri)?;
                let component = registry.get_or_err(&parsed.scheme)?;
                let endpoint = component.create_endpoint(uri)?;
                Some(endpoint.create_producer(producer_ctx)?)
            } else {
                None
            };
            resolved_policies.push((policy, handler_producer));
        }

        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
    }

    /// Resolve BuilderSteps into BoxProcessors.
    ///
    /// Recurses into nested scopes (`Split`, `Filter`, `Multicast`),
    /// composing each scope's sub-steps into one pipeline before wrapping it
    /// in the corresponding EIP service.
    fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Registry,
    ) -> Result<Vec<BoxProcessor>, CamelError> {
        let mut processors: Vec<BoxProcessor> = Vec::new();
        for step in steps {
            match step {
                // Already a ready-made service — pass through unchanged.
                BuilderStep::Processor(svc) => {
                    processors.push(svc);
                }
                // `to(uri)`: resolve the URI into a producer via the registry.
                BuilderStep::To(uri) => {
                    let parsed = parse_uri(&uri)?;
                    let component = registry.get_or_err(&parsed.scheme)?;
                    let endpoint = component.create_endpoint(&uri)?;
                    let producer = endpoint.create_producer(producer_ctx)?;
                    processors.push(producer);
                }
                BuilderStep::Split { config, steps } => {
                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
                    let sub_pipeline = compose_pipeline(sub_processors);
                    let splitter =
                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
                    processors.push(BoxProcessor::new(splitter));
                }
                BuilderStep::Aggregate { config } => {
                    let svc = camel_processor::AggregatorService::new(config);
                    processors.push(BoxProcessor::new(svc));
                }
                BuilderStep::Filter { predicate, steps } => {
                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
                    let sub_pipeline = compose_pipeline(sub_processors);
                    let svc =
                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
                    processors.push(BoxProcessor::new(svc));
                }
                BuilderStep::WireTap { uri } => {
                    let parsed = parse_uri(&uri)?;
                    let component = registry.get_or_err(&parsed.scheme)?;
                    let endpoint = component.create_endpoint(&uri)?;
                    let producer = endpoint.create_producer(producer_ctx)?;
                    let svc = camel_processor::WireTapService::new(producer);
                    processors.push(BoxProcessor::new(svc));
                }
                BuilderStep::Multicast { config, steps } => {
                    // Each top-level step in the multicast scope becomes an independent endpoint.
                    let mut endpoints = Vec::new();
                    for step in steps {
                        let sub_processors =
                            self.resolve_steps(vec![step], producer_ctx, registry)?;
                        let endpoint = compose_pipeline(sub_processors);
                        endpoints.push(endpoint);
                    }
                    let svc = camel_processor::MulticastService::new(endpoints, config);
                    processors.push(BoxProcessor::new(svc));
                }
            }
        }
        Ok(processors)
    }

    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The route doesn't have an ID
    /// - A route with the same ID already exists
    /// - `self_ref` has not been set via [`Self::set_self_ref`]
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition
            .route_id()
            .ok_or_else(|| CamelError::RouteError("Route must have an ID".into()))?
            .to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let from_uri = definition.from_uri.to_string();
        let concurrency = definition.concurrency;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self
            .self_ref
            .clone()
            .map(ProducerContext::new)
            .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;

        // Lock registry for step resolution (held across all resolution below;
        // no awaits occur while the guard is alive)
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");

        // Resolve steps into processors (takes ownership of steps)
        let processors = self.resolve_steps(definition.steps, &producer_ctx, &registry)?;
        let mut pipeline = compose_pipeline(processors);

        // Apply circuit breaker if configured. This wraps the pipeline first,
        // so the error handler layered below ends up outermost.
        if let Some(cb_config) = definition.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = definition
            .error_handler
            .or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Drop the lock before modifying self.routes
        drop(registry);

        // New routes always start in `Stopped` with a fresh cancel token.
        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(std::sync::Mutex::new(pipeline)),
                concurrency,
                status: RouteStatus::Stopped,
                consumer_handle: None,
                pipeline_handle: None,
                cancel_token: CancellationToken::new(),
            },
        );

        Ok(())
    }

    /// Returns the number of routes in the controller.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Returns all route IDs.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Internal stop implementation that can set custom status.
    ///
    /// Cancels the route's token, waits up to 30s for the consumer and
    /// pipeline tasks, then installs a fresh token and marks the route
    /// `Stopped`. Callers such as `suspend_route` overwrite the final status
    /// afterwards. A no-op if the route is neither `Started` nor `Suspended`.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let current_status = managed.status.clone();
        if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
            return Ok(()); // Already stopped or stopping
        }

        info!(route_id = %route_id, "Stopping route");
        managed.status = RouteStatus::Stopping;

        // Cancel the token to signal shutdown
        managed.cancel_token.cancel();

        // Take handles directly (no Arc<Mutex> wrapper needed)
        let consumer_handle = managed.consumer_handle.take();
        let pipeline_handle = managed.pipeline_handle.take();

        // Wait for tasks to complete with timeout
        // The CancellationToken already signaled tasks to stop gracefully.
        // If timeout fires, log a warning — tasks will stop on their own when
        // they check the cancel token. This is standard Tokio shutdown practice.
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            match (consumer_handle, pipeline_handle) {
                (Some(c), Some(p)) => {
                    let _ = tokio::join!(c, p);
                }
                (Some(c), None) => {
                    let _ = c.await;
                }
                (None, Some(p)) => {
                    let _ = p.await;
                }
                (None, None) => {}
            }
        })
        .await;

        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
        }

        // Get the managed route again (can't hold across await)
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create a fresh cancellation token for next start
        managed.cancel_token = CancellationToken::new();
        managed.status = RouteStatus::Stopped;

        info!(route_id = %route_id, "Route stopped");
        Ok(())
    }
}
376
377#[async_trait::async_trait]
378impl RouteController for DefaultRouteController {
379    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
380        // Check if route exists and can be started, and update status atomically
381        {
382            let managed = self
383                .routes
384                .get_mut(route_id)
385                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
386
387            match managed.status {
388                RouteStatus::Started => return Ok(()), // Already running
389                RouteStatus::Starting => {
390                    return Err(CamelError::RouteError(format!(
391                        "Route '{}' is already starting",
392                        route_id
393                    )));
394                }
395                RouteStatus::Stopped | RouteStatus::Failed(_) => {} // OK to start
396                RouteStatus::Stopping => {
397                    return Err(CamelError::RouteError(format!(
398                        "Route '{}' is stopping",
399                        route_id
400                    )));
401                }
402                RouteStatus::Suspended => {} // OK to resume
403            }
404            managed.status = RouteStatus::Starting;
405        }
406
407        info!(route_id = %route_id, "Starting route");
408
409        // Get the resolved route info
410        let (from_uri, pipeline, concurrency) = {
411            let managed = self
412                .routes
413                .get(route_id)
414                .expect("invariant: route must exist after prior existence check");
415            (
416                managed.from_uri.clone(),
417                Arc::clone(&managed.pipeline),
418                managed.concurrency.clone(),
419            )
420        };
421
422        // Parse from URI and create consumer (lock registry for lookup)
423        let parsed = parse_uri(&from_uri)?;
424        let registry = self
425            .registry
426            .lock()
427            .expect("mutex poisoned: another thread panicked while holding this lock");
428        let component = registry.get_or_err(&parsed.scheme)?;
429        let endpoint = component.create_endpoint(&from_uri)?;
430        let mut consumer = endpoint.create_consumer()?;
431        let consumer_concurrency = consumer.concurrency_model();
432        // Drop the lock before spawning tasks
433        drop(registry);
434
435        // Resolve effective concurrency: route override > consumer default
436        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
437
438        // Get the managed route for mutation
439        let managed = self
440            .routes
441            .get_mut(route_id)
442            .expect("invariant: route must exist after prior existence check");
443
444        // Create channel for consumer to send exchanges
445        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
446        let child_token = managed.cancel_token.child_token();
447        let consumer_ctx = ConsumerContext::new(tx, child_token.clone());
448
449        // Start consumer in background task
450        // TODO: Update route status to Failed when consumer crashes (requires Arc<Mutex<RouteStatus>> or channel)
451        let route_id_for_consumer = route_id.to_string();
452        let consumer_handle = tokio::spawn(async move {
453            if let Err(e) = consumer.start(consumer_ctx).await {
454                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
455            }
456        });
457
458        // Spawn pipeline task
459        let pipeline_cancel = child_token;
460        let pipeline_handle = match effective_concurrency {
461            ConcurrencyModel::Sequential => {
462                tokio::spawn(async move {
463                    // Clone pipeline from the Sync wrapper
464                    let mut pipeline = pipeline
465                        .lock()
466                        .expect("mutex poisoned: another thread panicked while holding this lock")
467                        .clone();
468                    while let Some(envelope) = rx.recv().await {
469                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
470
471                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
472                            if let Some(tx) = reply_tx {
473                                let _ = tx.send(Err(e));
474                            }
475                            return;
476                        }
477
478                        let result = pipeline.call(exchange).await;
479                        if let Some(tx) = reply_tx {
480                            let _ = tx.send(result);
481                        } else if let Err(ref e) = result
482                            && !matches!(e, CamelError::Stopped)
483                        {
484                            error!("Pipeline error: {e}");
485                        }
486                    }
487                })
488            }
489            ConcurrencyModel::Concurrent { max } => {
490                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
491                tokio::spawn(async move {
492                    while let Some(envelope) = rx.recv().await {
493                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
494                        let pipe_clone = Arc::clone(&pipeline);
495                        let sem = sem.clone();
496                        let cancel = pipeline_cancel.clone();
497                        tokio::spawn(async move {
498                            // Acquire semaphore permit if bounded
499                            let _permit = match &sem {
500                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
501                                None => None,
502                            };
503
504                            // Clone pipeline from the Sync wrapper
505                            let mut pipe = pipe_clone.lock().expect("mutex poisoned: another thread panicked while holding this lock").clone();
506
507                            // Wait for service ready with circuit breaker backoff
508                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
509                                if let Some(tx) = reply_tx {
510                                    let _ = tx.send(Err(e));
511                                }
512                                return;
513                            }
514
515                            let result = pipe.call(exchange).await;
516                            if let Some(tx) = reply_tx {
517                                let _ = tx.send(result);
518                            } else if let Err(ref e) = result
519                                && !matches!(e, CamelError::Stopped)
520                            {
521                                error!("Pipeline error: {e}");
522                            }
523                        });
524                    }
525                })
526            }
527        };
528
529        // Store handles and update status
530        let managed = self
531            .routes
532            .get_mut(route_id)
533            .expect("invariant: route must exist after prior existence check");
534        managed.consumer_handle = Some(consumer_handle);
535        managed.pipeline_handle = Some(pipeline_handle);
536        managed.status = RouteStatus::Started;
537
538        info!(route_id = %route_id, "Route started");
539        Ok(())
540    }
541
542    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
543        self.stop_route_internal(route_id).await
544    }
545
546    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
547        self.stop_route(route_id).await?;
548        tokio::time::sleep(Duration::from_millis(100)).await;
549        self.start_route(route_id).await
550    }
551
552    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
553        self.stop_route_internal(route_id).await?;
554        let managed = self
555            .routes
556            .get_mut(route_id)
557            .expect("invariant: route must exist after prior existence check");
558        managed.status = RouteStatus::Suspended;
559        info!(route_id = %route_id, "Route suspended");
560        Ok(())
561    }
562
563    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
564        // Resume only if Suspended
565        let is_suspended = self
566            .routes
567            .get(route_id)
568            .map(|r| r.status == RouteStatus::Suspended)
569            .unwrap_or(false);
570
571        if !is_suspended {
572            return Err(CamelError::RouteError(format!(
573                "Route '{}' is not suspended",
574                route_id
575            )));
576        }
577
578        self.start_route(route_id).await
579    }
580
581    fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
582        self.routes.get(route_id).map(|r| r.status.clone())
583    }
584
585    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
586        // Only start routes where auto_startup() == true
587        // Sort by startup_order() ascending before starting
588        let route_ids: Vec<String> = {
589            let mut pairs: Vec<_> = self
590                .routes
591                .iter()
592                .filter(|(_, r)| r.definition.auto_startup())
593                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
594                .collect();
595            pairs.sort_by_key(|(_, order)| *order);
596            pairs.into_iter().map(|(id, _)| id).collect()
597        };
598
599        info!("Starting {} auto-startup routes", route_ids.len());
600
601        // Collect errors but continue starting remaining routes
602        let mut errors: Vec<String> = Vec::new();
603        for route_id in route_ids {
604            if let Err(e) = self.start_route(&route_id).await {
605                errors.push(format!("Route '{}': {}", route_id, e));
606            }
607        }
608
609        if !errors.is_empty() {
610            return Err(CamelError::RouteError(format!(
611                "Failed to start routes: {}",
612                errors.join(", ")
613            )));
614        }
615
616        info!("All auto-startup routes started");
617        Ok(())
618    }
619
620    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
621        // Sort by startup_order descending (reverse order)
622        let route_ids: Vec<String> = {
623            let mut pairs: Vec<_> = self
624                .routes
625                .iter()
626                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
627                .collect();
628            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
629            pairs.into_iter().map(|(id, _)| id).collect()
630        };
631
632        info!("Stopping {} routes", route_ids.len());
633
634        for route_id in route_ids {
635            let _ = self.stop_route(&route_id).await;
636        }
637
638        info!("All routes stopped");
639        Ok(())
640    }
641}
642
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Build a controller wired with a dummy `self_ref`, which is all
    /// `add_route` needs to construct a `ProducerContext` during step
    /// resolution.
    fn controller_with_self_ref() -> DefaultRouteController {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        let mut controller = DefaultRouteController::new(registry);
        let dummy: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
        ));
        controller.set_self_ref(dummy);
        controller
    }

    #[tokio::test]
    async fn test_add_route_requires_id() {
        let mut controller = controller_with_self_ref();
        // A definition without a route ID must be rejected.
        let definition = crate::route::RouteDefinition::new("timer:tick", vec![]);
        assert!(controller.add_route(definition).is_err());
    }

    #[tokio::test]
    async fn test_add_route_with_id_succeeds() {
        let mut controller = controller_with_self_ref();
        // With an explicit route ID the route is accepted and stored.
        let definition =
            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
        assert!(controller.add_route(definition).is_ok());
        assert_eq!(controller.route_count(), 1);
    }
}