1use std::sync::Arc;
7use std::time::Instant;
8
9use camel_api::{MetricsCollector, RouteController, SupervisionConfig};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tracing::{error, info};
13
14pub(crate) use super::controller_actor_commands::RouteControllerCommand;
15pub use super::controller_actor_commands::RouteControllerHandle;
16use super::route_controller::DefaultRouteController;
17use super::route_helpers::CrashNotification;
18
19pub fn spawn_controller_actor(
20 controller: DefaultRouteController,
21) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
22 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
23 let handle = tokio::spawn(async move {
24 let mut controller = controller;
25 while let Some(cmd) = rx.recv().await {
26 match cmd {
27 RouteControllerCommand::StartRoute { route_id, reply } => {
28 let _ = reply.send(controller.start_route(&route_id).await);
29 }
30 RouteControllerCommand::StopRoute { route_id, reply } => {
31 let _ = reply.send(controller.stop_route(&route_id).await);
32 }
33 RouteControllerCommand::RestartRoute { route_id, reply } => {
34 let _ = reply.send(controller.restart_route(&route_id).await);
35 }
36 RouteControllerCommand::SuspendRoute { route_id, reply } => {
37 let _ = reply.send(controller.suspend_route(&route_id).await);
38 }
39 RouteControllerCommand::ResumeRoute { route_id, reply } => {
40 let _ = reply.send(controller.resume_route(&route_id).await);
41 }
42 RouteControllerCommand::StartAllRoutes { reply } => {
43 let _ = reply.send(controller.start_all_routes().await);
44 }
45 RouteControllerCommand::StopAllRoutes { reply } => {
46 let _ = reply.send(controller.stop_all_routes().await);
47 }
48 RouteControllerCommand::AddRoute { definition, reply } => {
49 let _ = reply.send(controller.add_route(definition).await);
50 }
51 RouteControllerCommand::RemoveRoute { route_id, reply } => {
52 let _ = reply.send(controller.remove_route(&route_id).await);
53 }
54 RouteControllerCommand::SwapPipeline {
55 route_id,
56 pipeline,
57 reply,
58 } => {
59 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
60 }
61 RouteControllerCommand::SwapPipelineRaw {
62 route_id,
63 pipeline,
64 lifecycle,
65 reply,
66 } => {
67 let _ =
68 reply.send(controller.swap_pipeline_raw(&route_id, pipeline, lifecycle));
69 }
70 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
71 let _ = reply.send(controller.compile_route_definition(definition));
72 }
73 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
74 definition,
75 generation,
76 reply,
77 } => {
78 let _ = reply.send(
79 controller.compile_route_definition_with_generation(definition, generation),
80 );
81 }
82 RouteControllerCommand::CompileRouteDefinitionPipeline {
83 definition,
84 generation,
85 reply,
86 } => {
87 let _ = reply
88 .send(controller.compile_route_definition_pipeline(definition, generation));
89 }
90 RouteControllerCommand::CompileRouteDefinitionDryPipeline { definition, reply } => {
91 let _ =
92 reply.send(controller.compile_route_definition_dry_pipeline(definition));
93 }
94 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
95 definition,
96 generation,
97 reply,
98 } => {
99 let _ = reply.send(
100 controller.prepare_route_definition_with_generation(definition, generation),
101 );
102 }
103 RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
104 let _ = reply.send(controller.insert_prepared_route(prepared));
105 }
106 RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
107 let _ = reply.send(
108 controller
109 .remove_route_preserving_functions(&route_id)
110 .await,
111 );
112 }
113 RouteControllerCommand::RouteFromUri { route_id, reply } => {
114 let _ = reply.send(controller.route_from_uri(&route_id));
115 }
116 RouteControllerCommand::SetErrorHandler { config } => {
117 controller.set_error_handler(config);
118 }
119 RouteControllerCommand::SetTracerConfig { config } => {
120 controller.set_tracer_config(&config);
121 }
122 RouteControllerCommand::RouteCount { reply } => {
123 let _ = reply.send(controller.route_count());
124 }
125 RouteControllerCommand::InFlightCount { route_id, reply } => {
126 let _ = reply.send(controller.in_flight_count(&route_id));
127 }
128 RouteControllerCommand::RouteExists { route_id, reply } => {
129 let _ = reply.send(controller.route_exists(&route_id));
130 }
131 RouteControllerCommand::RouteIds { reply } => {
132 let _ = reply.send(controller.route_ids());
133 }
134 RouteControllerCommand::AutoStartupRouteIds { reply } => {
135 let _ = reply.send(controller.auto_startup_route_ids());
136 }
137 RouteControllerCommand::ShutdownRouteIds { reply } => {
138 let _ = reply.send(controller.shutdown_route_ids());
139 }
140 RouteControllerCommand::GetPipeline { route_id, reply } => {
141 let _ = reply.send(controller.get_pipeline(&route_id));
142 }
143 RouteControllerCommand::StartRouteReload { route_id, reply } => {
144 let _ = reply.send(controller.start_route_reload(&route_id).await);
145 }
146 RouteControllerCommand::StopRouteReload { route_id, reply } => {
147 let _ = reply.send(controller.stop_route_reload(&route_id).await);
148 }
149 RouteControllerCommand::SetRuntimeHandle { runtime } => {
150 controller.set_runtime_handle(runtime);
151 }
152 RouteControllerCommand::SetFunctionInvoker { invoker } => {
153 controller.set_function_invoker(invoker);
154 }
155 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
156 let _ = reply.send(controller.route_source_hash(&route_id));
157 }
158 RouteControllerCommand::RouteHasLifecycle { route_id, reply } => {
159 let _ = reply.send(controller.route_has_lifecycle(&route_id));
160 }
161 RouteControllerCommand::Shutdown => {
162 break;
163 }
164 }
165 }
166 });
167 (RouteControllerHandle { tx }, handle)
168}
169
170pub fn spawn_supervision_task(
171 controller: RouteControllerHandle,
172 config: SupervisionConfig,
173 _metrics: Option<Arc<dyn MetricsCollector>>,
174 mut crash_rx: mpsc::Receiver<CrashNotification>,
175) -> JoinHandle<()> {
176 tokio::spawn(async move {
177 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
178 let mut last_restart_time: std::collections::HashMap<String, Instant> =
179 std::collections::HashMap::new();
180 let mut currently_restarting: std::collections::HashSet<String> =
181 std::collections::HashSet::new();
182
183 info!("Supervision loop started");
184
185 while let Some(notification) = crash_rx.recv().await {
186 let route_id = notification.route_id;
187 if currently_restarting.contains(&route_id) {
188 continue;
189 }
190
191 if let Some(last_time) = last_restart_time.get(&route_id)
192 && last_time.elapsed() >= config.initial_delay
193 {
194 attempts.insert(route_id.clone(), 0);
195 }
196
197 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
198 *current_attempt += 1;
199
200 if config
201 .max_attempts
202 .is_some_and(|max| *current_attempt > max)
203 {
204 error!(
206 route_id = %route_id,
207 attempts = *current_attempt,
208 "Route exceeded max restart attempts, giving up"
209 );
210 continue;
211 }
212
213 let delay = config.next_delay(*current_attempt);
214 currently_restarting.insert(route_id.clone());
215 tokio::time::sleep(delay).await;
216
217 match controller.restart_route(route_id.clone()).await {
218 Ok(()) => {
219 info!(route_id = %route_id, "Route restarted successfully");
220 last_restart_time.insert(route_id.clone(), Instant::now());
221 }
222 Err(err) => {
223 error!(route_id = %route_id, error = %err, "Failed to restart route");
225 }
226 }
227
228 currently_restarting.remove(&route_id);
229 }
230
231 info!("Supervision loop ended");
232 })
233}
234
#[cfg(test)]
mod tests {
    //! Tests for the controller actor, its handle and the supervision task.
    //!
    //! Two styles are used: tests that drive a real actor spawned with
    //! `spawn_controller_actor`, and tests that build a `RouteControllerHandle`
    //! around a bare channel so the commands it emits can be inspected directly.

    use super::{
        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
        spawn_supervision_task,
    };
    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
    use crate::lifecycle::adapters::route_helpers::CrashNotification;
    use crate::lifecycle::application::route_definition::RouteDefinition;
    use crate::shared::components::domain::Registry;
    use crate::shared::observability::domain::TracerConfig;
    use camel_api::function::PrepareToken;
    use camel_api::{
        CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
        FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
        RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
        SupervisionConfig,
    };
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::sleep;

    /// Spawns an actor whose registry has the timer and mock components registered.
    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        {
            // Scope the guard so the lock is released before the registry is shared.
            let mut guard = registry.lock().expect("lock");
            guard.register(Arc::new(camel_component_timer::TimerComponent::new()));
            guard.register(Arc::new(camel_component_mock::MockComponent::new()));
        }
        let controller = DefaultRouteController::new(
            Arc::clone(&registry),
            Arc::new(camel_api::NoopPlatformService::default()),
        );
        spawn_controller_actor(controller)
    }

    /// Spawns an actor backed by an empty component registry.
    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
        let controller = DefaultRouteController::new(
            Arc::new(std::sync::Mutex::new(Registry::new())),
            Arc::new(camel_api::NoopPlatformService::default()),
        );
        spawn_controller_actor(controller)
    }

    /// Builds a step-less route definition with the given id and source URI.
    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
    }

    /// Runtime stub: accepts every command and reports no routes.
    struct NoopRuntime;
    /// Function-invoker stub: every operation succeeds and does nothing.
    struct NoopInvoker;

    #[async_trait::async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait::async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id }
                | RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::RouteNotFound { route_id }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: Vec::new(),
                },
            })
        }
    }

    impl FunctionInvokerSync for NoopInvoker {
        fn stage_pending(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
            _generation: u64,
        ) {
        }
        fn discard_staging(&self, _generation: u64) {}
        fn begin_reload(&self) -> u64 {
            1
        }
        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_refs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_defs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionDefinition, Option<String>)> {
            vec![]
        }
    }

    #[async_trait::async_trait]
    impl FunctionInvoker for NoopInvoker {
        async fn register(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn unregister(
            &self,
            _id: &FunctionId,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn invoke(
            &self,
            _id: &FunctionId,
            _exchange: &Exchange,
        ) -> Result<ExchangePatch, FunctionInvocationError> {
            Ok(ExchangePatch::default())
        }
        async fn prepare_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<PrepareToken, FunctionInvocationError> {
            Ok(PrepareToken::default())
        }
        async fn finalize_reload(
            &self,
            _diff: &FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn rollback_reload(
            &self,
            _token: PrepareToken,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
    }

    /// `start_route` emits a `StartRoute` command and returns the reply it is given.
    #[tokio::test]
    async fn start_route_sends_command_and_returns_reply() {
        let (tx, mut rx) = mpsc::channel(1);
        let handle = RouteControllerHandle { tx };

        let task = tokio::spawn(async move { handle.start_route("route-a").await });

        let command = rx.recv().await.expect("command should be received");
        match command {
            RouteControllerCommand::StartRoute { route_id, reply } => {
                assert_eq!(route_id, "route-a");
                let _ = reply.send(Ok(()));
            }
            _ => panic!("unexpected command variant"),
        }

        let result = task.await.expect("join should succeed");
        assert!(result.is_ok());
    }

    /// With the receiving side dropped, `start_route` reports a `ProcessorError`.
    #[tokio::test]
    async fn start_route_returns_error_when_actor_stops() {
        let (tx, rx) = mpsc::channel(1);
        drop(rx);

        let handle = RouteControllerHandle { tx };
        let result = handle.start_route("route-a").await;

        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
    }

    /// A freshly spawned actor answers queries and terminates on shutdown.
    #[tokio::test]
    async fn spawn_controller_actor_processes_commands_and_shutdown() {
        let (handle, join_handle) = build_empty_actor();

        assert_eq!(handle.route_count().await.expect("route_count"), 0);
        assert_eq!(
            handle.route_ids().await.expect("route_ids"),
            Vec::<String>::new()
        );

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// Exercises the query, compile, swap, configuration and removal commands
    /// against a real actor holding a single timer route.
    #[tokio::test]
    async fn actor_handle_introspection_and_mutation_commands() {
        let (handle, join_handle) = build_actor_with_components();
        let definition = route_def("h-1", "timer:tick?period=100");

        handle.add_route(definition).await.expect("add route");
        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
        assert!(
            !handle
                .route_exists("no-such")
                .await
                .expect("route exists no-such")
        );

        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
        assert_eq!(handle.route_count().await.expect("route_count"), 1);

        let auto_ids = handle
            .auto_startup_route_ids()
            .await
            .expect("auto_startup_route_ids");
        assert!(auto_ids.iter().any(|id| id == "h-1"));

        let shutdown_ids = handle
            .shutdown_route_ids()
            .await
            .expect("shutdown_route_ids");
        assert!(shutdown_ids.iter().any(|id| id == "h-1"));

        let compiled = handle
            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
            .await
            .expect("compile_route_definition");

        assert!(
            handle
                .get_pipeline("h-1")
                .await
                .expect("get_pipeline")
                .is_some()
        );
        handle
            .swap_pipeline("h-1", compiled)
            .await
            .expect("swap_pipeline");

        // Only checks that these round-trips complete; the values are not asserted.
        let _ = handle
            .in_flight_count("h-1")
            .await
            .expect("in_flight_count");
        let _ = handle.route_source_hash("h-1").await;

        handle
            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
            .await
            .expect("set_error_handler");
        handle
            .set_tracer_config(TracerConfig::default())
            .await
            .expect("set_tracer_config");
        handle
            .set_runtime_handle(Arc::new(NoopRuntime))
            .await
            .expect("set_runtime_handle");

        handle.remove_route("h-1").await.expect("remove_route");
        assert_eq!(
            handle
                .route_count()
                .await
                .expect("route_count after remove"),
            0
        );
        handle
            .stop_all_routes()
            .await
            .expect("stop_all_routes on empty");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// Walks one timer route through start, restart, suspend, resume, stop,
    /// the start-all/stop-all commands and the reload variants.
    #[tokio::test]
    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("lc-1", "timer:tick?period=50"))
            .await
            .expect("add route lc-1");

        handle.start_route("lc-1").await.expect("start_route");
        sleep(Duration::from_millis(20)).await;

        handle.restart_route("lc-1").await.expect("restart_route");
        sleep(Duration::from_millis(20)).await;

        handle.suspend_route("lc-1").await.expect("suspend_route");
        handle.resume_route("lc-1").await.expect("resume_route");
        sleep(Duration::from_millis(20)).await;

        handle.stop_route("lc-1").await.expect("stop_route");
        handle.start_all_routes().await.expect("start_all_routes");
        sleep(Duration::from_millis(20)).await;
        handle.stop_all_routes().await.expect("stop_all_routes");

        handle
            .start_route_reload("lc-1")
            .await
            .expect("start_route_reload");
        handle
            .stop_route_reload("lc-1")
            .await
            .expect("stop_route_reload");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// Sends one crash notification and checks the supervision task and the
    /// actor both shut down cleanly afterwards.
    #[tokio::test]
    async fn spawn_supervision_restarts_route_on_crash() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("sup-1", "timer:tick?period=100"))
            .await
            .expect("add route sup-1");
        handle
            .start_route("sup-1")
            .await
            .expect("start_route sup-1");

        let (crash_tx, crash_rx) = mpsc::channel(8);
        let supervision = spawn_supervision_task(
            handle.clone(),
            SupervisionConfig {
                initial_delay: Duration::from_millis(10),
                max_attempts: Some(2),
                ..SupervisionConfig::default()
            },
            None,
            crash_rx,
        );

        crash_tx
            .send(CrashNotification {
                route_id: "sup-1".to_string(),
                error: "simulated".to_string(),
            })
            .await
            .expect("send crash notification");

        sleep(Duration::from_millis(150)).await;
        // Dropping the only sender lets the supervision loop finish.
        drop(crash_tx);
        supervision.await.expect("supervision join");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// Sends two crash notifications for the same route with `max_attempts`
    /// set to 1 and checks that supervision still terminates cleanly.
    #[tokio::test]
    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("sup-2", "timer:tick?period=100"))
            .await
            .expect("add route sup-2");
        handle
            .start_route("sup-2")
            .await
            .expect("start_route sup-2");

        let (crash_tx, crash_rx) = mpsc::channel(8);
        let supervision = spawn_supervision_task(
            handle.clone(),
            SupervisionConfig {
                initial_delay: Duration::from_millis(10),
                max_attempts: Some(1),
                ..SupervisionConfig::default()
            },
            None,
            crash_rx,
        );

        crash_tx
            .send(CrashNotification {
                route_id: "sup-2".to_string(),
                error: "attempt-1".to_string(),
            })
            .await
            .expect("send crash attempt-1");
        crash_tx
            .send(CrashNotification {
                route_id: "sup-2".to_string(),
                error: "attempt-2".to_string(),
            })
            .await
            .expect("send crash attempt-2");

        sleep(Duration::from_millis(200)).await;
        drop(crash_tx);
        supervision.await.expect("supervision join");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// The non-blocking setter succeeds when the mailbox has free capacity.
    #[tokio::test]
    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
        let (handle, join_handle) = build_empty_actor();

        handle
            .try_set_runtime_handle(Arc::new(NoopRuntime))
            .expect("try_set_runtime_handle should succeed");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    /// With the receiving side dropped, `shutdown` reports a `ProcessorError`.
    #[tokio::test]
    async fn shutdown_returns_error_when_actor_stopped() {
        let (tx, rx) = mpsc::channel(1);
        drop(rx);

        let handle = RouteControllerHandle { tx };
        let result = handle.shutdown().await;

        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
    }

    /// Each handle method emits the matching command variant and returns the
    /// value sent back on its reply channel.
    #[tokio::test]
    async fn handle_methods_send_expected_commands_and_receive_replies() {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = RouteControllerHandle { tx };

        let stop_task = tokio::spawn({
            let h = handle.clone();
            async move { h.stop_route("r-1").await }
        });
        let cmd = rx.recv().await.expect("stop command");
        match cmd {
            RouteControllerCommand::StopRoute { route_id, reply } => {
                assert_eq!(route_id, "r-1");
                let _ = reply.send(Ok(()));
            }
            _ => panic!("unexpected command"),
        }
        assert!(stop_task.await.expect("join").is_ok());

        let exists_task = tokio::spawn({
            let h = handle.clone();
            async move { h.route_exists("r-2").await }
        });
        let cmd = rx.recv().await.expect("exists command");
        match cmd {
            RouteControllerCommand::RouteExists { route_id, reply } => {
                assert_eq!(route_id, "r-2");
                let _ = reply.send(true);
            }
            _ => panic!("unexpected command"),
        }
        assert!(exists_task.await.expect("join").expect("ok"));

        let hash_task = tokio::spawn({
            let h = handle.clone();
            async move { h.route_source_hash("r-3").await }
        });
        let cmd = rx.recv().await.expect("hash command");
        match cmd {
            RouteControllerCommand::RouteSourceHash { route_id, reply } => {
                assert_eq!(route_id, "r-3");
                let _ = reply.send(Some(77));
            }
            _ => panic!("unexpected command"),
        }
        assert_eq!(hash_task.await.expect("join"), Some(77));
    }

    /// Dropping the reply channel without answering surfaces as a
    /// `ProcessorError` (or `None` for `route_source_hash`).
    #[tokio::test]
    async fn handle_methods_error_on_dropped_reply_channel() {
        let (tx, mut rx) = mpsc::channel(16);
        let handle = RouteControllerHandle { tx };

        let count_task = tokio::spawn({
            let h = handle.clone();
            async move { h.route_count().await }
        });
        let cmd = rx.recv().await.expect("route_count command");
        match cmd {
            RouteControllerCommand::RouteCount { reply } => drop(reply),
            _ => panic!("unexpected command"),
        }
        assert!(matches!(
            count_task.await.expect("join"),
            Err(CamelError::ProcessorError(_))
        ));

        let stop_task = tokio::spawn({
            let h = handle.clone();
            async move { h.stop_route("x").await }
        });
        let cmd = rx.recv().await.expect("stop command");
        match cmd {
            RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
            _ => panic!("unexpected command"),
        }
        assert!(matches!(
            stop_task.await.expect("join"),
            Err(CamelError::ProcessorError(_))
        ));

        let maybe_hash = tokio::spawn({
            let h = handle.clone();
            async move { h.route_source_hash("x").await }
        });
        let cmd = rx.recv().await.expect("hash command");
        match cmd {
            RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
            _ => panic!("unexpected command"),
        }
        assert_eq!(maybe_hash.await.expect("join"), None);
    }

    /// The non-blocking setter fails with a `ProcessorError` when the mailbox
    /// is already full, and leaves the queued message in place.
    #[test]
    fn try_set_function_invoker_returns_mailbox_full() {
        let (tx, mut rx) = mpsc::channel(1);
        // Occupy the single mailbox slot so the next try-send cannot succeed.
        tx.try_send(RouteControllerCommand::Shutdown)
            .expect("fill mailbox");
        let handle = RouteControllerHandle { tx };

        let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
        assert!(matches!(result, Err(CamelError::ProcessorError(_))));

        rx.try_recv().expect("mailbox still has first message");
    }
}