1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{
17 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
18 RouteController, RouteStatus, Value, body::Body,
19};
20use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
21use camel_endpoint::parse_uri;
22use camel_language_api::{Expression, Language, Predicate};
23use camel_processor::circuit_breaker::CircuitBreakerLayer;
24use camel_processor::error_handler::ErrorHandlerLayer;
25use camel_processor::{ChoiceService, WhenClause};
26
27use crate::config::{DetailLevel, TracerConfig};
28use crate::registry::Registry;
29use crate::route::{
30 BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
31 compose_pipeline, compose_traced_pipeline,
32};
33use arc_swap::ArcSwap;
34use camel_bean::BeanRegistry;
35
/// Message describing a route whose consumer terminated with an error.
///
/// The consumer task spawned by `start_route` sends one of these over the
/// channel registered through `set_crash_notifier` when `consumer.start`
/// returns an error.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route whose consumer failed.
    pub route_id: String,
    /// The consumer error, rendered with `to_string()`.
    pub error: String,
}
47
48pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
63unsafe impl Sync for SyncBoxProcessor {}
64
65type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
66pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
67
68#[async_trait::async_trait]
74pub trait RouteControllerInternal: RouteController + Send {
75 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;
77
78 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;
80
81 fn route_from_uri(&self, route_id: &str) -> Option<String>;
83
84 fn set_error_handler(&mut self, config: ErrorHandlerConfig);
86
87 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);
89
90 fn route_count(&self) -> usize;
92
93 fn route_ids(&self) -> Vec<String>;
95
96 fn set_tracer_config(&mut self, config: &TracerConfig);
98
99 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;
102
103 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;
105
106 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
108
109 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
111}
112
113struct ManagedRoute {
115 definition: RouteDefinitionInfo,
117 from_uri: String,
119 pipeline: SharedPipeline,
121 concurrency: Option<ConcurrencyModel>,
123 status: Arc<std::sync::Mutex<RouteStatus>>,
125 consumer_handle: Option<JoinHandle<()>>,
127 pipeline_handle: Option<JoinHandle<()>>,
129 consumer_cancel_token: CancellationToken,
132 pipeline_cancel_token: CancellationToken,
135 channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
138}
139
140async fn ready_with_backoff(
147 pipeline: &mut BoxProcessor,
148 cancel: &CancellationToken,
149) -> Result<(), CamelError> {
150 loop {
151 match pipeline.ready().await {
152 Ok(_) => return Ok(()),
153 Err(CamelError::CircuitOpen(ref msg)) => {
154 warn!("Circuit open, backing off: {msg}");
155 tokio::select! {
156 _ = tokio::time::sleep(Duration::from_secs(1)) => {
157 continue;
158 }
159 _ = cancel.cancelled() => {
160 return Err(CamelError::CircuitOpen(msg.clone()));
162 }
163 }
164 }
165 Err(e) => {
166 error!("Pipeline not ready: {e}");
167 return Err(e);
168 }
169 }
170 }
171}
172
173pub struct DefaultRouteController {
181 routes: HashMap<String, ManagedRoute>,
183 registry: Arc<std::sync::Mutex<Registry>>,
185 languages: SharedLanguageRegistry,
187 beans: Arc<std::sync::Mutex<BeanRegistry>>,
189 self_ref: Option<Arc<Mutex<dyn RouteController>>>,
192 global_error_handler: Option<ErrorHandlerConfig>,
194 crash_notifier: Option<mpsc::Sender<CrashNotification>>,
196 tracing_enabled: bool,
198 tracer_detail_level: DetailLevel,
200}
201
202impl DefaultRouteController {
203 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
205 Self::with_beans(
206 registry,
207 Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
208 )
209 }
210
211 pub fn with_beans(
213 registry: Arc<std::sync::Mutex<Registry>>,
214 beans: Arc<std::sync::Mutex<BeanRegistry>>,
215 ) -> Self {
216 Self {
217 routes: HashMap::new(),
218 registry,
219 languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
220 beans,
221 self_ref: None,
222 global_error_handler: None,
223 crash_notifier: None,
224 tracing_enabled: false,
225 tracer_detail_level: DetailLevel::Minimal,
226 }
227 }
228
229 pub fn with_languages(
231 registry: Arc<std::sync::Mutex<Registry>>,
232 languages: SharedLanguageRegistry,
233 ) -> Self {
234 Self {
235 routes: HashMap::new(),
236 registry,
237 languages,
238 beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
239 self_ref: None,
240 global_error_handler: None,
241 crash_notifier: None,
242 tracing_enabled: false,
243 tracer_detail_level: DetailLevel::Minimal,
244 }
245 }
246
247 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
251 self.self_ref = Some(self_ref);
252 }
253
254 pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
259 self.self_ref.clone()
260 }
261
262 pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
267 self.crash_notifier = Some(tx);
268 }
269
270 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
272 self.global_error_handler = Some(config);
273 }
274
275 pub fn set_tracer_config(&mut self, config: &TracerConfig) {
277 self.tracing_enabled = config.enabled;
278 self.tracer_detail_level = config.detail_level.clone();
279 }
280
281 fn resolve_error_handler(
283 &self,
284 config: ErrorHandlerConfig,
285 producer_ctx: &ProducerContext,
286 registry: &Registry,
287 ) -> Result<ErrorHandlerLayer, CamelError> {
288 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
290 let parsed = parse_uri(uri)?;
291 let component = registry.get_or_err(&parsed.scheme)?;
292 let endpoint = component.create_endpoint(uri)?;
293 Some(endpoint.create_producer(producer_ctx)?)
294 } else {
295 None
296 };
297
298 let mut resolved_policies = Vec::new();
300 for policy in config.policies {
301 let handler_producer = if let Some(ref uri) = policy.handled_by {
302 let parsed = parse_uri(uri)?;
303 let component = registry.get_or_err(&parsed.scheme)?;
304 let endpoint = component.create_endpoint(uri)?;
305 Some(endpoint.create_producer(producer_ctx)?)
306 } else {
307 None
308 };
309 resolved_policies.push((policy, handler_producer));
310 }
311
312 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
313 }
314
315 fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
316 let guard = self
317 .languages
318 .lock()
319 .expect("mutex poisoned: another thread panicked while holding this lock");
320 guard.get(language).cloned().ok_or_else(|| {
321 CamelError::RouteError(format!(
322 "language `{language}` is not registered in CamelContext"
323 ))
324 })
325 }
326
327 fn compile_language_expression(
328 &self,
329 expression: &LanguageExpressionDef,
330 ) -> Result<Arc<dyn Expression>, CamelError> {
331 let language = self.resolve_language(&expression.language)?;
332 let compiled = language
333 .create_expression(&expression.source)
334 .map_err(|e| {
335 CamelError::RouteError(format!(
336 "failed to compile {} expression `{}`: {e}",
337 expression.language, expression.source
338 ))
339 })?;
340 Ok(Arc::from(compiled))
341 }
342
343 fn compile_language_predicate(
344 &self,
345 expression: &LanguageExpressionDef,
346 ) -> Result<Arc<dyn Predicate>, CamelError> {
347 let language = self.resolve_language(&expression.language)?;
348 let compiled = language.create_predicate(&expression.source).map_err(|e| {
349 CamelError::RouteError(format!(
350 "failed to compile {} predicate `{}`: {e}",
351 expression.language, expression.source
352 ))
353 })?;
354 Ok(Arc::from(compiled))
355 }
356
357 fn compile_filter_predicate(
358 &self,
359 expression: &LanguageExpressionDef,
360 ) -> Result<FilterPredicate, CamelError> {
361 let predicate = self.compile_language_predicate(expression)?;
362 Ok(Arc::new(move |exchange: &Exchange| {
363 predicate.matches(exchange).unwrap_or(false)
364 }))
365 }
366
367 fn value_to_body(value: Value) -> Body {
368 match value {
369 Value::Null => Body::Empty,
370 Value::String(text) => Body::Text(text),
371 other => Body::Json(other),
372 }
373 }
374
375 pub(crate) fn resolve_steps(
377 &self,
378 steps: Vec<BuilderStep>,
379 producer_ctx: &ProducerContext,
380 registry: &Registry,
381 ) -> Result<Vec<BoxProcessor>, CamelError> {
382 let mut processors: Vec<BoxProcessor> = Vec::new();
383 for step in steps {
384 match step {
385 BuilderStep::Processor(svc) => {
386 processors.push(svc);
387 }
388 BuilderStep::To(uri) => {
389 let parsed = parse_uri(&uri)?;
390 let component = registry.get_or_err(&parsed.scheme)?;
391 let endpoint = component.create_endpoint(&uri)?;
392 let producer = endpoint.create_producer(producer_ctx)?;
393 processors.push(producer);
394 }
395 BuilderStep::DeclarativeSetHeader { key, value } => match value {
396 ValueSourceDef::Literal(value) => {
397 let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
398 processors.push(BoxProcessor::new(svc));
399 }
400 ValueSourceDef::Expression(expression) => {
401 let expression = self.compile_language_expression(&expression)?;
402 let svc = camel_processor::DynamicSetHeader::new(
403 IdentityProcessor,
404 key,
405 move |exchange: &Exchange| {
406 expression.evaluate(exchange).unwrap_or(Value::Null)
407 },
408 );
409 processors.push(BoxProcessor::new(svc));
410 }
411 },
412 BuilderStep::DeclarativeSetBody { value } => match value {
413 ValueSourceDef::Literal(value) => {
414 let body = Self::value_to_body(value);
415 let svc = camel_processor::SetBody::new(
416 IdentityProcessor,
417 move |_exchange: &Exchange| body.clone(),
418 );
419 processors.push(BoxProcessor::new(svc));
420 }
421 ValueSourceDef::Expression(expression) => {
422 let expression = self.compile_language_expression(&expression)?;
423 let svc = camel_processor::SetBody::new(
424 IdentityProcessor,
425 move |exchange: &Exchange| {
426 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
427 Self::value_to_body(value)
428 },
429 );
430 processors.push(BoxProcessor::new(svc));
431 }
432 },
433 BuilderStep::DeclarativeFilter { predicate, steps } => {
434 let predicate = self.compile_filter_predicate(&predicate)?;
435 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
436 let sub_pipeline = compose_pipeline(sub_processors);
437 let svc =
438 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
439 processors.push(BoxProcessor::new(svc));
440 }
441 BuilderStep::DeclarativeChoice { whens, otherwise } => {
442 let mut when_clauses = Vec::new();
443 for when_step in whens {
444 let predicate = self.compile_filter_predicate(&when_step.predicate)?;
445 let sub_processors =
446 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
447 let pipeline = compose_pipeline(sub_processors);
448 when_clauses.push(WhenClause {
449 predicate,
450 pipeline,
451 });
452 }
453 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
454 let sub_processors =
455 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
456 Some(compose_pipeline(sub_processors))
457 } else {
458 None
459 };
460 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
461 processors.push(BoxProcessor::new(svc));
462 }
463 BuilderStep::DeclarativeScript { expression } => {
464 let expression = self.compile_language_expression(&expression)?;
465 let svc = camel_processor::SetBody::new(
466 IdentityProcessor,
467 move |exchange: &Exchange| {
468 let value = expression.evaluate(exchange).unwrap_or(Value::Null);
469 Self::value_to_body(value)
470 },
471 );
472 processors.push(BoxProcessor::new(svc));
473 }
474 BuilderStep::Split { config, steps } => {
475 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
476 let sub_pipeline = compose_pipeline(sub_processors);
477 let splitter =
478 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
479 processors.push(BoxProcessor::new(splitter));
480 }
481 BuilderStep::DeclarativeSplit {
482 expression,
483 aggregation,
484 parallel,
485 parallel_limit,
486 stop_on_exception,
487 steps,
488 } => {
489 let lang_expr = self.compile_language_expression(&expression)?;
490 let split_fn = move |exchange: &Exchange| {
491 let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
492 match value {
493 Value::String(s) => s
494 .lines()
495 .filter(|line| !line.is_empty())
496 .map(|line| {
497 let mut fragment = exchange.clone();
498 fragment.input.body = Body::from(line.to_string());
499 fragment
500 })
501 .collect(),
502 Value::Array(arr) => arr
503 .into_iter()
504 .map(|v| {
505 let mut fragment = exchange.clone();
506 fragment.input.body = Body::from(v);
507 fragment
508 })
509 .collect(),
510 _ => vec![exchange.clone()],
511 }
512 };
513
514 let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
515 .aggregation(aggregation)
516 .parallel(parallel)
517 .stop_on_exception(stop_on_exception);
518 if let Some(limit) = parallel_limit {
519 config = config.parallel_limit(limit);
520 }
521
522 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
523 let sub_pipeline = compose_pipeline(sub_processors);
524 let splitter =
525 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
526 processors.push(BoxProcessor::new(splitter));
527 }
528 BuilderStep::Aggregate { config } => {
529 let svc = camel_processor::AggregatorService::new(config);
530 processors.push(BoxProcessor::new(svc));
531 }
532 BuilderStep::Filter { predicate, steps } => {
533 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
534 let sub_pipeline = compose_pipeline(sub_processors);
535 let svc =
536 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
537 processors.push(BoxProcessor::new(svc));
538 }
539 BuilderStep::Choice { whens, otherwise } => {
540 let mut when_clauses = Vec::new();
542 for when_step in whens {
543 let sub_processors =
544 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
545 let pipeline = compose_pipeline(sub_processors);
546 when_clauses.push(WhenClause {
547 predicate: when_step.predicate,
548 pipeline,
549 });
550 }
551 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
553 let sub_processors =
554 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
555 Some(compose_pipeline(sub_processors))
556 } else {
557 None
558 };
559 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
560 processors.push(BoxProcessor::new(svc));
561 }
562 BuilderStep::WireTap { uri } => {
563 let parsed = parse_uri(&uri)?;
564 let component = registry.get_or_err(&parsed.scheme)?;
565 let endpoint = component.create_endpoint(&uri)?;
566 let producer = endpoint.create_producer(producer_ctx)?;
567 let svc = camel_processor::WireTapService::new(producer);
568 processors.push(BoxProcessor::new(svc));
569 }
570 BuilderStep::Multicast { config, steps } => {
571 let mut endpoints = Vec::new();
573 for step in steps {
574 let sub_processors =
575 self.resolve_steps(vec![step], producer_ctx, registry)?;
576 let endpoint = compose_pipeline(sub_processors);
577 endpoints.push(endpoint);
578 }
579 let svc = camel_processor::MulticastService::new(endpoints, config);
580 processors.push(BoxProcessor::new(svc));
581 }
582 BuilderStep::DeclarativeLog { level, message } => {
583 let ValueSourceDef::Expression(expression) = message else {
584 unreachable!(
587 "DeclarativeLog with Literal should have been compiled to a Processor"
588 );
589 };
590 let expression = self.compile_language_expression(&expression)?;
591 let svc =
592 camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
593 expression
594 .evaluate(exchange)
595 .unwrap_or_else(|e| {
596 warn!(error = %e, "log expression evaluation failed");
597 Value::Null
598 })
599 .to_string()
600 });
601 processors.push(BoxProcessor::new(svc));
602 }
603 BuilderStep::Bean { name, method } => {
604 let beans = self.beans.lock().expect(
606 "beans mutex poisoned: another thread panicked while holding this lock",
607 );
608
609 let bean = beans.get(&name).ok_or_else(|| {
611 CamelError::ProcessorError(format!("Bean not found: {}", name))
612 })?;
613
614 let bean_clone = Arc::clone(&bean);
616 let method = method.clone();
617
618 let processor = tower::service_fn(move |mut exchange: Exchange| {
620 let bean = Arc::clone(&bean_clone);
621 let method = method.clone();
622
623 async move {
624 bean.call(&method, &mut exchange).await?;
625 Ok(exchange)
626 }
627 });
628
629 processors.push(BoxProcessor::new(processor));
630 }
631 }
632 }
633 Ok(processors)
634 }
635
636 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
646 let route_id = definition.route_id().to_string();
647
648 if self.routes.contains_key(&route_id) {
649 return Err(CamelError::RouteError(format!(
650 "Route '{}' already exists",
651 route_id
652 )));
653 }
654
655 info!(route_id = %route_id, "Adding route to controller");
656
657 let definition_info = definition.to_info();
659 let from_uri = definition.from_uri.to_string();
660 let concurrency = definition.concurrency;
661
662 let producer_ctx = self
664 .self_ref
665 .clone()
666 .map(ProducerContext::new)
667 .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
668
669 let registry = self
671 .registry
672 .lock()
673 .expect("mutex poisoned: another thread panicked while holding this lock");
674
675 let processors = self.resolve_steps(definition.steps, &producer_ctx, ®istry)?;
677 let route_id_for_tracing = route_id.clone();
678 let mut pipeline = compose_traced_pipeline(
679 processors,
680 &route_id_for_tracing,
681 self.tracing_enabled,
682 self.tracer_detail_level.clone(),
683 );
684
685 if let Some(cb_config) = definition.circuit_breaker {
687 let cb_layer = CircuitBreakerLayer::new(cb_config);
688 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
689 }
690
691 let eh_config = definition
693 .error_handler
694 .or_else(|| self.global_error_handler.clone());
695
696 if let Some(config) = eh_config {
697 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
698 pipeline = BoxProcessor::new(layer.layer(pipeline));
699 }
700
701 drop(registry);
703
704 self.routes.insert(
705 route_id.clone(),
706 ManagedRoute {
707 definition: definition_info,
708 from_uri,
709 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
710 concurrency,
711 status: Arc::new(std::sync::Mutex::new(RouteStatus::Stopped)),
712 consumer_handle: None,
713 pipeline_handle: None,
714 consumer_cancel_token: CancellationToken::new(),
715 pipeline_cancel_token: CancellationToken::new(),
716 channel_sender: None,
717 },
718 );
719
720 Ok(())
721 }
722
723 pub fn compile_route_definition(
728 &self,
729 def: RouteDefinition,
730 ) -> Result<BoxProcessor, CamelError> {
731 let route_id = def.route_id().to_string();
732
733 let producer_ctx = self
734 .self_ref
735 .clone()
736 .map(ProducerContext::new)
737 .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
738
739 let registry = self
740 .registry
741 .lock()
742 .expect("mutex poisoned: registry lock in compile_route_definition");
743
744 let processors = self.resolve_steps(def.steps, &producer_ctx, ®istry)?;
745 let mut pipeline = compose_traced_pipeline(
746 processors,
747 &route_id,
748 self.tracing_enabled,
749 self.tracer_detail_level.clone(),
750 );
751
752 if let Some(cb_config) = def.circuit_breaker {
753 let cb_layer = CircuitBreakerLayer::new(cb_config);
754 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
755 }
756
757 let eh_config = def
758 .error_handler
759 .or_else(|| self.global_error_handler.clone());
760 if let Some(config) = eh_config {
761 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
762 pipeline = BoxProcessor::new(layer.layer(pipeline));
763 }
764
765 drop(registry);
766 Ok(pipeline)
767 }
768
769 pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
775 let managed = self.routes.get(route_id).ok_or_else(|| {
776 CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
777 })?;
778 let status = managed
779 .status
780 .lock()
781 .expect("status mutex poisoned")
782 .clone();
783 match status {
784 RouteStatus::Stopped | RouteStatus::Failed(_) => {}
785 other => {
786 return Err(CamelError::RouteError(format!(
787 "Route '{}' must be stopped before removal (current status: {:?})",
788 route_id, other
789 )));
790 }
791 }
792 self.routes.remove(route_id);
793 info!(route_id = %route_id, "Route removed from controller");
794 Ok(())
795 }
796
797 pub fn route_count(&self) -> usize {
799 self.routes.len()
800 }
801
802 #[doc(hidden)]
804 pub fn force_route_status(&mut self, route_id: &str, status: RouteStatus) {
805 if let Some(managed) = self.routes.get(route_id) {
806 *managed.status.lock().expect("status mutex poisoned") = status;
807 }
808 }
809
810 pub fn route_ids(&self) -> Vec<String> {
812 self.routes.keys().cloned().collect()
813 }
814
815 pub fn swap_pipeline(
820 &self,
821 route_id: &str,
822 new_pipeline: BoxProcessor,
823 ) -> Result<(), CamelError> {
824 let managed = self
825 .routes
826 .get(route_id)
827 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
828
829 managed
830 .pipeline
831 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
832 info!(route_id = %route_id, "Pipeline swapped atomically");
833 Ok(())
834 }
835
836 pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
838 self.routes.get(route_id).map(|r| r.from_uri.clone())
839 }
840
841 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
846 self.routes
847 .get(route_id)
848 .map(|r| r.pipeline.load().0.clone())
849 }
850
851 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
853 let managed = self
854 .routes
855 .get_mut(route_id)
856 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
857
858 let current_status = managed
859 .status
860 .lock()
861 .expect("status mutex poisoned")
862 .clone();
863 if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
864 return Ok(()); }
866
867 info!(route_id = %route_id, "Stopping route");
868 *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopping;
869
870 let managed = self
872 .routes
873 .get_mut(route_id)
874 .expect("invariant: route must exist after prior existence check");
875 managed.consumer_cancel_token.cancel();
876 managed.pipeline_cancel_token.cancel();
877
878 let managed = self
880 .routes
881 .get_mut(route_id)
882 .expect("invariant: route must exist after prior existence check");
883 let consumer_handle = managed.consumer_handle.take();
884 let pipeline_handle = managed.pipeline_handle.take();
885
886 let managed = self
889 .routes
890 .get_mut(route_id)
891 .expect("invariant: route must exist after prior existence check");
892 managed.channel_sender = None;
893
894 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
898 match (consumer_handle, pipeline_handle) {
899 (Some(c), Some(p)) => {
900 let _ = tokio::join!(c, p);
901 }
902 (Some(c), None) => {
903 let _ = c.await;
904 }
905 (None, Some(p)) => {
906 let _ = p.await;
907 }
908 (None, None) => {}
909 }
910 })
911 .await;
912
913 if timeout_result.is_err() {
914 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
915 }
916
917 let managed = self
919 .routes
920 .get_mut(route_id)
921 .expect("invariant: route must exist after prior existence check");
922
923 managed.consumer_cancel_token = CancellationToken::new();
925 managed.pipeline_cancel_token = CancellationToken::new();
926 *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopped;
927
928 info!(route_id = %route_id, "Route stopped");
929 Ok(())
930 }
931}
932
933#[async_trait::async_trait]
934impl RouteController for DefaultRouteController {
935 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
936 {
938 let managed = self
939 .routes
940 .get_mut(route_id)
941 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
942
943 let current_status = managed
944 .status
945 .lock()
946 .expect("status mutex poisoned")
947 .clone();
948 match current_status {
949 RouteStatus::Started => return Ok(()), RouteStatus::Starting => {
951 return Err(CamelError::RouteError(format!(
952 "Route '{}' is already starting",
953 route_id
954 )));
955 }
956 RouteStatus::Stopped | RouteStatus::Failed(_) => {} RouteStatus::Stopping => {
958 return Err(CamelError::RouteError(format!(
959 "Route '{}' is stopping",
960 route_id
961 )));
962 }
963 RouteStatus::Suspended => {
964 return Err(CamelError::RouteError(format!(
965 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
966 route_id
967 )));
968 }
969 }
970 *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Starting;
971 }
972
973 info!(route_id = %route_id, "Starting route");
974
975 let (from_uri, pipeline, concurrency, status_for_consumer) = {
977 let managed = self
978 .routes
979 .get(route_id)
980 .expect("invariant: route must exist after prior existence check");
981 (
982 managed.from_uri.clone(),
983 Arc::clone(&managed.pipeline),
984 managed.concurrency.clone(),
985 Arc::clone(&managed.status),
986 )
987 };
988
989 let crash_notifier = self.crash_notifier.clone();
991
992 let parsed = parse_uri(&from_uri)?;
994 let registry = self
995 .registry
996 .lock()
997 .expect("mutex poisoned: another thread panicked while holding this lock");
998 let component = registry.get_or_err(&parsed.scheme)?;
999 let endpoint = component.create_endpoint(&from_uri)?;
1000 let mut consumer = endpoint.create_consumer()?;
1001 let consumer_concurrency = consumer.concurrency_model();
1002 drop(registry);
1004
1005 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1007
1008 let managed = self
1010 .routes
1011 .get_mut(route_id)
1012 .expect("invariant: route must exist after prior existence check");
1013
1014 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1016 let consumer_cancel = managed.consumer_cancel_token.child_token();
1018 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1019 let tx_for_storage = tx.clone();
1021 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1022
1023 let route_id_for_consumer = route_id.to_string();
1026 let consumer_handle = tokio::spawn(async move {
1027 if let Err(e) = consumer.start(consumer_ctx).await {
1028 error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1029 let error_msg = e.to_string();
1030 *status_for_consumer.lock().expect("status mutex poisoned") =
1031 RouteStatus::Failed(error_msg.clone());
1032
1033 if let Some(tx) = crash_notifier {
1035 let _ = tx
1036 .send(CrashNotification {
1037 route_id: route_id_for_consumer.clone(),
1038 error: error_msg,
1039 })
1040 .await;
1041 }
1042 }
1043 });
1044
1045 let pipeline_handle = match effective_concurrency {
1047 ConcurrencyModel::Sequential => {
1048 tokio::spawn(async move {
1049 loop {
1050 let envelope = tokio::select! {
1052 envelope = rx.recv() => match envelope {
1053 Some(e) => e,
1054 None => return, },
1056 _ = pipeline_cancel.cancelled() => {
1057 return;
1059 }
1060 };
1061 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1062
1063 let mut pipeline = pipeline.load().0.clone();
1065
1066 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1067 if let Some(tx) = reply_tx {
1068 let _ = tx.send(Err(e));
1069 }
1070 return;
1071 }
1072
1073 let result = pipeline.call(exchange).await;
1074 if let Some(tx) = reply_tx {
1075 let _ = tx.send(result);
1076 } else if let Err(ref e) = result
1077 && !matches!(e, CamelError::Stopped)
1078 {
1079 error!("Pipeline error: {e}");
1080 }
1081 }
1082 })
1083 }
1084 ConcurrencyModel::Concurrent { max } => {
1085 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1086 tokio::spawn(async move {
1087 loop {
1088 let envelope = tokio::select! {
1090 envelope = rx.recv() => match envelope {
1091 Some(e) => e,
1092 None => return, },
1094 _ = pipeline_cancel.cancelled() => {
1095 return;
1097 }
1098 };
1099 let ExchangeEnvelope { exchange, reply_tx } = envelope;
1100 let pipe_ref = Arc::clone(&pipeline);
1101 let sem = sem.clone();
1102 let cancel = pipeline_cancel.clone();
1103 tokio::spawn(async move {
1104 let _permit = match &sem {
1106 Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1107 None => None,
1108 };
1109
1110 let mut pipe = pipe_ref.load().0.clone();
1112
1113 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1115 if let Some(tx) = reply_tx {
1116 let _ = tx.send(Err(e));
1117 }
1118 return;
1119 }
1120
1121 let result = pipe.call(exchange).await;
1122 if let Some(tx) = reply_tx {
1123 let _ = tx.send(result);
1124 } else if let Err(ref e) = result
1125 && !matches!(e, CamelError::Stopped)
1126 {
1127 error!("Pipeline error: {e}");
1128 }
1129 });
1130 }
1131 })
1132 }
1133 };
1134
1135 let managed = self
1137 .routes
1138 .get_mut(route_id)
1139 .expect("invariant: route must exist after prior existence check");
1140 managed.consumer_handle = Some(consumer_handle);
1141 managed.pipeline_handle = Some(pipeline_handle);
1142 managed.channel_sender = Some(tx_for_storage);
1143 let mut status_guard = managed.status.lock().expect("status mutex poisoned");
1145 if !matches!(*status_guard, RouteStatus::Failed(_)) {
1146 *status_guard = RouteStatus::Started;
1147 }
1148 drop(status_guard);
1149
1150 info!(route_id = %route_id, "Route started");
1151 Ok(())
1152 }
1153
1154 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1155 self.stop_route_internal(route_id).await
1156 }
1157
1158 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1159 self.stop_route(route_id).await?;
1160 tokio::time::sleep(Duration::from_millis(100)).await;
1161 self.start_route(route_id).await
1162 }
1163
1164 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1165 let managed = self
1167 .routes
1168 .get_mut(route_id)
1169 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1170
1171 let current_status = managed
1172 .status
1173 .lock()
1174 .expect("status mutex poisoned")
1175 .clone();
1176
1177 if current_status != RouteStatus::Started {
1179 return Err(CamelError::RouteError(format!(
1180 "Cannot suspend route '{}' with status {:?}",
1181 route_id, current_status
1182 )));
1183 }
1184
1185 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1186
1187 *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopping;
1189
1190 let managed = self
1192 .routes
1193 .get_mut(route_id)
1194 .expect("invariant: route must exist after prior existence check");
1195 managed.consumer_cancel_token.cancel();
1196
1197 let managed = self
1199 .routes
1200 .get_mut(route_id)
1201 .expect("invariant: route must exist after prior existence check");
1202 let consumer_handle = managed.consumer_handle.take();
1203
1204 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1206 if let Some(handle) = consumer_handle {
1207 let _ = handle.await;
1208 }
1209 })
1210 .await;
1211
1212 if timeout_result.is_err() {
1213 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1214 }
1215
1216 let managed = self
1218 .routes
1219 .get_mut(route_id)
1220 .expect("invariant: route must exist after prior existence check");
1221
1222 managed.consumer_cancel_token = CancellationToken::new();
1224
1225 *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Suspended;
1227
1228 info!(route_id = %route_id, "Route suspended (pipeline still running)");
1229 Ok(())
1230 }
1231
1232 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1233 let managed = self
1235 .routes
1236 .get(route_id)
1237 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1238
1239 let current_status = managed
1240 .status
1241 .lock()
1242 .expect("status mutex poisoned")
1243 .clone();
1244
1245 if current_status != RouteStatus::Suspended {
1246 return Err(CamelError::RouteError(format!(
1247 "Cannot resume route '{}' with status {:?} (expected Suspended)",
1248 route_id, current_status
1249 )));
1250 }
1251
1252 let sender = managed.channel_sender.clone().ok_or_else(|| {
1254 CamelError::RouteError("Suspended route has no channel sender".into())
1255 })?;
1256
1257 let from_uri = managed.from_uri.clone();
1259
1260 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1261
1262 let parsed = parse_uri(&from_uri)?;
1264 let registry = self
1265 .registry
1266 .lock()
1267 .expect("mutex poisoned: another thread panicked while holding this lock");
1268 let component = registry.get_or_err(&parsed.scheme)?;
1269 let endpoint = component.create_endpoint(&from_uri)?;
1270 let mut consumer = endpoint.create_consumer()?;
1271 drop(registry);
1273
1274 let managed = self
1276 .routes
1277 .get_mut(route_id)
1278 .expect("invariant: route must exist after prior existence check");
1279
1280 let consumer_cancel = managed.consumer_cancel_token.child_token();
1282
1283 let status_for_consumer = Arc::clone(&managed.status);
1285 let crash_notifier = self.crash_notifier.clone();
1286
1287 let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1289
1290 let route_id_for_consumer = route_id.to_string();
1292 let consumer_handle = tokio::spawn(async move {
1293 if let Err(e) = consumer.start(consumer_ctx).await {
1294 error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1295 let error_msg = e.to_string();
1296 *status_for_consumer.lock().expect("status mutex poisoned") =
1297 RouteStatus::Failed(error_msg.clone());
1298
1299 if let Some(tx) = crash_notifier {
1301 let _ = tx
1302 .send(CrashNotification {
1303 route_id: route_id_for_consumer.clone(),
1304 error: error_msg,
1305 })
1306 .await;
1307 }
1308 }
1309 });
1310
1311 let managed = self
1313 .routes
1314 .get_mut(route_id)
1315 .expect("invariant: route must exist after prior existence check");
1316 managed.consumer_handle = Some(consumer_handle);
1317
1318 let mut status_guard = managed.status.lock().expect("status mutex poisoned");
1320 if !matches!(*status_guard, RouteStatus::Failed(_)) {
1321 *status_guard = RouteStatus::Started;
1322 } else {
1323 let failed_status = status_guard.clone();
1325 drop(status_guard);
1326 return Err(CamelError::RouteError(format!(
1327 "Resume failed: {:?}",
1328 failed_status
1329 )));
1330 }
1331 drop(status_guard);
1332
1333 info!(route_id = %route_id, "Route resumed");
1334 Ok(())
1335 }
1336
1337 fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
1338 self.routes
1339 .get(route_id)
1340 .map(|r| r.status.lock().expect("status mutex poisoned").clone())
1341 }
1342
1343 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1344 let route_ids: Vec<String> = {
1347 let mut pairs: Vec<_> = self
1348 .routes
1349 .iter()
1350 .filter(|(_, r)| r.definition.auto_startup())
1351 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1352 .collect();
1353 pairs.sort_by_key(|(_, order)| *order);
1354 pairs.into_iter().map(|(id, _)| id).collect()
1355 };
1356
1357 info!("Starting {} auto-startup routes", route_ids.len());
1358
1359 let mut errors: Vec<String> = Vec::new();
1361 for route_id in route_ids {
1362 if let Err(e) = self.start_route(&route_id).await {
1363 errors.push(format!("Route '{}': {}", route_id, e));
1364 }
1365 }
1366
1367 if !errors.is_empty() {
1368 return Err(CamelError::RouteError(format!(
1369 "Failed to start routes: {}",
1370 errors.join(", ")
1371 )));
1372 }
1373
1374 info!("All auto-startup routes started");
1375 Ok(())
1376 }
1377
1378 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1379 let route_ids: Vec<String> = {
1381 let mut pairs: Vec<_> = self
1382 .routes
1383 .iter()
1384 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1385 .collect();
1386 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1387 pairs.into_iter().map(|(id, _)| id).collect()
1388 };
1389
1390 info!("Stopping {} routes", route_ids.len());
1391
1392 for route_id in route_ids {
1393 let _ = self.stop_route(&route_id).await;
1394 }
1395
1396 info!("All routes stopped");
1397 Ok(())
1398 }
1399}
1400
1401#[async_trait::async_trait]
1402impl RouteControllerInternal for DefaultRouteController {
1403 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1404 DefaultRouteController::add_route(self, def)
1405 }
1406
1407 fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1408 DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1409 }
1410
1411 fn route_from_uri(&self, route_id: &str) -> Option<String> {
1412 DefaultRouteController::route_from_uri(self, route_id)
1414 }
1415
1416 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1417 DefaultRouteController::set_error_handler(self, config)
1418 }
1419
1420 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1421 DefaultRouteController::set_self_ref(self, self_ref)
1422 }
1423
1424 fn route_count(&self) -> usize {
1425 DefaultRouteController::route_count(self)
1426 }
1427
1428 fn route_ids(&self) -> Vec<String> {
1429 DefaultRouteController::route_ids(self)
1430 }
1431
1432 fn set_tracer_config(&mut self, config: &TracerConfig) {
1433 DefaultRouteController::set_tracer_config(self, config)
1434 }
1435
1436 fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1437 DefaultRouteController::compile_route_definition(self, def)
1438 }
1439
1440 fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1441 DefaultRouteController::remove_route(self, route_id)
1442 }
1443
1444 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1445 DefaultRouteController::start_route(self, route_id).await
1446 }
1447
1448 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1449 DefaultRouteController::stop_route(self, route_id).await
1450 }
1451}
1452
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Registry handle in the shape `DefaultRouteController::new` is given in these tests.
    type SharedRegistry = Arc<std::sync::Mutex<Registry>>;

    /// Creates an empty, shareable component registry.
    fn new_registry() -> SharedRegistry {
        Arc::new(std::sync::Mutex::new(Registry::new()))
    }

    /// Builds a controller over `registry` and wires a self reference into it.
    ///
    /// The self reference points at a second, otherwise unused controller that
    /// shares the same registry.
    fn new_controller(registry: &SharedRegistry) -> DefaultRouteController {
        let mut controller = DefaultRouteController::new(Arc::clone(registry));
        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
            DefaultRouteController::new(Arc::clone(registry)),
        ));
        controller.set_self_ref(self_ref);
        controller
    }

    /// Registers a route without steps that consumes from `from_uri`.
    fn add_empty_route(controller: &mut DefaultRouteController, from_uri: &str, route_id: &str) {
        let def = RouteDefinition::new(from_uri, vec![]).with_route_id(route_id);
        controller.add_route(def).unwrap();
    }

    /// Asserts that `route_id` currently reports `expected`.
    #[track_caller]
    fn assert_status(controller: &DefaultRouteController, route_id: &str, expected: RouteStatus) {
        assert_eq!(controller.route_status(route_id), Some(expected));
    }

    /// Shared observation point for what a `ProbeConsumer` has done.
    #[derive(Clone, Default)]
    struct ConsumerProbe {
        /// Number of times a consumer's `start` was entered.
        started: Arc<AtomicUsize>,
        /// `true` once the most recent consumer has seen its cancellation.
        cancelled: Arc<AtomicBool>,
    }

    impl ConsumerProbe {
        fn start_count(&self) -> usize {
            self.started.load(Ordering::SeqCst)
        }

        fn saw_cancel(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }

        fn reset_cancel(&self) {
            self.cancelled.store(false, Ordering::SeqCst);
        }
    }

    /// Consumer that records its start, waits for cancellation and records that too.
    struct ProbeConsumer {
        probe: ConsumerProbe,
    }

    #[async_trait::async_trait]
    impl Consumer for ProbeConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            self.probe.started.fetch_add(1, Ordering::SeqCst);
            // A freshly started consumer has not been cancelled yet.
            self.probe.cancelled.store(false, Ordering::SeqCst);
            ctx.cancelled().await;
            self.probe.cancelled.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    /// Endpoint producing `ProbeConsumer`s and identity producers.
    struct ProbeEndpoint {
        uri: String,
        probe: ConsumerProbe,
    }

    impl Endpoint for ProbeEndpoint {
        fn uri(&self) -> &str {
            &self.uri
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(ProbeConsumer {
                probe: self.probe.clone(),
            }))
        }

        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
            Ok(BoxProcessor::new(IdentityProcessor))
        }
    }

    /// Component registered under a caller-chosen scheme that hands out `ProbeEndpoint`s.
    struct ProbeComponent {
        scheme: &'static str,
        probe: ConsumerProbe,
    }

    impl Component for ProbeComponent {
        fn scheme(&self) -> &str {
            self.scheme
        }

        fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(ProbeEndpoint {
                uri: uri.to_string(),
                probe: self.probe.clone(),
            }))
        }
    }

    /// Builds a controller whose registry contains a `ProbeComponent` for `scheme`.
    fn probe_controller(scheme: &'static str) -> (DefaultRouteController, ConsumerProbe) {
        let probe = ConsumerProbe::default();
        let registry = new_registry();
        registry.lock().unwrap().register(ProbeComponent {
            scheme,
            probe: probe.clone(),
        });
        (new_controller(&registry), probe)
    }

    #[test]
    fn test_route_controller_internal_is_object_safe() {
        // Compiles only if the trait can be used as a trait object.
        let _: Option<Box<dyn RouteControllerInternal>> = None;
    }

    #[tokio::test]
    async fn test_swap_pipeline_updates_stored_pipeline() {
        let mut controller = new_controller(&new_registry());
        add_empty_route(&mut controller, "timer:tick", "swap-test");

        let swapped = controller.swap_pipeline("swap-test", BoxProcessor::new(IdentityProcessor));
        assert!(swapped.is_ok());

        let missing = controller.swap_pipeline("nonexistent", BoxProcessor::new(IdentityProcessor));
        assert!(missing.is_err());
    }

    #[tokio::test]
    async fn test_add_route_duplicate_id_fails() {
        let mut controller = new_controller(&new_registry());

        let first = RouteDefinition::new("timer:tick", vec![]).with_route_id("duplicate-route");
        assert!(controller.add_route(first).is_ok());

        let second = RouteDefinition::new("timer:tock", vec![]).with_route_id("duplicate-route");
        let result = controller.add_route(second);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("already exists"),
            "error should mention 'already exists', got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_add_route_with_id_succeeds() {
        let mut controller = new_controller(&new_registry());

        let definition = RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
        assert!(controller.add_route(definition).is_ok());
        assert_eq!(controller.route_count(), 1);
    }

    #[tokio::test]
    async fn test_crashed_consumer_sets_failed_status() {
        /// Consumer whose `start` fails immediately.
        struct CrashingConsumer;

        #[async_trait::async_trait]
        impl Consumer for CrashingConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("boom".into()))
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }

            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }

        struct CrashingEndpoint;

        impl Endpoint for CrashingEndpoint {
            fn uri(&self) -> &str {
                "crash:test"
            }

            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(CrashingConsumer))
            }

            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }

        struct CrashingComponent;

        impl Component for CrashingComponent {
            fn scheme(&self) -> &str {
                "crash"
            }

            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(CrashingEndpoint))
            }
        }

        let registry = new_registry();
        registry.lock().unwrap().register(CrashingComponent);
        let mut controller = new_controller(&registry);

        add_empty_route(&mut controller, "crash:test", "crash-route");
        controller.start_route("crash-route").await.unwrap();

        // Give the spawned consumer task time to run and report its failure.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let status = controller.route_status("crash-route").unwrap();
        assert!(
            matches!(status, RouteStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    #[test]
    fn test_managed_route_has_independent_cancel_tokens() {
        let consumer_token = CancellationToken::new();
        let pipeline_token = CancellationToken::new();

        assert!(
            !consumer_token.is_cancelled(),
            "consumer token should start uncancelled"
        );
        assert!(
            !pipeline_token.is_cancelled(),
            "pipeline token should start uncancelled"
        );

        consumer_token.cancel();
        assert!(
            consumer_token.is_cancelled(),
            "consumer token should be cancelled"
        );
        assert!(
            !pipeline_token.is_cancelled(),
            "pipeline token should NOT be cancelled when consumer is cancelled"
        );

        let consumer_token2 = CancellationToken::new();
        let pipeline_token2 = CancellationToken::new();

        pipeline_token2.cancel();
        assert!(
            !consumer_token2.is_cancelled(),
            "consumer token should NOT be cancelled when pipeline is cancelled"
        );
        assert!(
            pipeline_token2.is_cancelled(),
            "pipeline token should be cancelled"
        );
    }

    #[tokio::test]
    async fn test_stop_cancels_both_consumer_and_pipeline_tokens() {
        let (mut controller, probe) = probe_controller("tracking");
        add_empty_route(&mut controller, "tracking:test", "tracking-route");

        controller.start_route("tracking-route").await.unwrap();
        assert_status(&controller, "tracking-route", RouteStatus::Started);

        controller.stop_route("tracking-route").await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            probe.saw_cancel(),
            "consumer should have been cancelled via consumer_cancel_token"
        );
        assert_status(&controller, "tracking-route", RouteStatus::Stopped);

        // A stopped route must be startable again.
        controller.start_route("tracking-route").await.unwrap();
        assert_status(&controller, "tracking-route", RouteStatus::Started);
        controller.stop_route("tracking-route").await.unwrap();
    }

    #[tokio::test]
    async fn test_suspend() {
        let (mut controller, probe) = probe_controller("suspend");
        add_empty_route(&mut controller, "suspend:test", "suspend-route");

        controller.start_route("suspend-route").await.unwrap();
        assert_status(&controller, "suspend-route", RouteStatus::Started);

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(probe.start_count(), 1);

        controller.suspend_route("suspend-route").await.unwrap();
        assert_status(&controller, "suspend-route", RouteStatus::Suspended);

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            probe.saw_cancel(),
            "consumer should have been cancelled during suspend"
        );

        // Starting a suspended route is rejected; it has to be resumed instead.
        let result = controller.start_route("suspend-route").await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("suspended"),
            "error should mention 'suspended', got: {}",
            err
        );

        controller.stop_route("suspend-route").await.unwrap();
        assert_status(&controller, "suspend-route", RouteStatus::Stopped);
    }

    #[tokio::test]
    async fn test_resume() {
        let (mut controller, probe) = probe_controller("resume");
        add_empty_route(&mut controller, "resume:test", "resume-route");

        controller.start_route("resume-route").await.unwrap();
        assert_status(&controller, "resume-route", RouteStatus::Started);

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(probe.start_count(), 1);

        controller.suspend_route("resume-route").await.unwrap();
        assert_status(&controller, "resume-route", RouteStatus::Suspended);

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(probe.saw_cancel());

        controller.resume_route("resume-route").await.unwrap();
        assert_status(&controller, "resume-route", RouteStatus::Started);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(
            probe.start_count(),
            2,
            "consumer should have been started twice (initial + resume)"
        );
        assert!(
            !probe.saw_cancel(),
            "new consumer should not be cancelled after resume"
        );

        // Resuming a route that is already running is an error.
        let result = controller.resume_route("resume-route").await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not suspended") || err.contains("expected Suspended"),
            "error should indicate route is not suspended, got: {}",
            err
        );

        controller.stop_route("resume-route").await.unwrap();
        assert_status(&controller, "resume-route", RouteStatus::Stopped);

        // Resuming a stopped route is an error as well.
        let result = controller.resume_route("resume-route").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_resume_non_existent_route() {
        let mut controller = new_controller(&new_registry());

        let result = controller.resume_route("non-existent").await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "error should mention 'not found', got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_suspend_non_started_route() {
        let mut controller = new_controller(&new_registry());
        add_empty_route(&mut controller, "timer:tick", "test-route");

        let result = controller.suspend_route("test-route").await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Cannot suspend") || err.contains("Started"),
            "error should indicate cannot suspend, got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_stop_from_suspended_cancels_consumer_and_pipeline() {
        const ROUTE: &str = "stop-from-suspended-route";

        let (mut controller, probe) = probe_controller("cancellable");
        add_empty_route(&mut controller, "cancellable:test", ROUTE);

        controller.start_route(ROUTE).await.unwrap();
        assert_status(&controller, ROUTE, RouteStatus::Started);

        controller.suspend_route(ROUTE).await.unwrap();
        assert_status(&controller, ROUTE, RouteStatus::Suspended);

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            probe.saw_cancel(),
            "consumer should be cancelled after suspend"
        );

        probe.reset_cancel();

        controller.stop_route(ROUTE).await.unwrap();

        assert_eq!(
            controller.route_status(ROUTE),
            Some(RouteStatus::Stopped),
            "route must be Stopped after stop_route from Suspended"
        );

        tokio::time::sleep(Duration::from_millis(100)).await;

        // The route must be fully restartable after being stopped from Suspended.
        controller.start_route(ROUTE).await.unwrap();
        assert_status(&controller, ROUTE, RouteStatus::Started);

        controller.stop_route(ROUTE).await.unwrap();
    }

    #[tokio::test]
    async fn test_suspend_resume_stop_cycle() {
        let (mut controller, _probe) = probe_controller("simple");
        add_empty_route(&mut controller, "simple:test", "cycle-route");

        controller.start_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Started);

        tokio::time::sleep(Duration::from_millis(50)).await;

        controller.suspend_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Suspended);

        tokio::time::sleep(Duration::from_millis(50)).await;

        controller.resume_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Started);

        tokio::time::sleep(Duration::from_millis(50)).await;

        controller.suspend_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Suspended);

        tokio::time::sleep(Duration::from_millis(50)).await;

        controller.stop_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Stopped);

        controller.start_route("cycle-route").await.unwrap();
        assert_status(&controller, "cycle-route", RouteStatus::Started);

        controller.stop_route("cycle-route").await.unwrap();
    }
}