1use std::sync::Arc;
7use std::time::Instant;
8
9use camel_api::{MetricsCollector, RouteController, SupervisionConfig};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tracing::{error, info};
13
14pub(crate) use super::controller_actor_commands::RouteControllerCommand;
15pub use super::controller_actor_commands::RouteControllerHandle;
16use super::route_controller::DefaultRouteController;
17use super::route_helpers::CrashNotification;
18
19pub fn spawn_controller_actor(
20 controller: DefaultRouteController,
21) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
22 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
23 let handle = tokio::spawn(async move {
24 let mut controller = controller;
25 while let Some(cmd) = rx.recv().await {
26 match cmd {
27 RouteControllerCommand::StartRoute { route_id, reply } => {
28 let _ = reply.send(controller.start_route(&route_id).await);
29 }
30 RouteControllerCommand::StopRoute { route_id, reply } => {
31 let _ = reply.send(controller.stop_route(&route_id).await);
32 }
33 RouteControllerCommand::RestartRoute { route_id, reply } => {
34 let _ = reply.send(controller.restart_route(&route_id).await);
35 }
36 RouteControllerCommand::SuspendRoute { route_id, reply } => {
37 let _ = reply.send(controller.suspend_route(&route_id).await);
38 }
39 RouteControllerCommand::ResumeRoute { route_id, reply } => {
40 let _ = reply.send(controller.resume_route(&route_id).await);
41 }
42 RouteControllerCommand::StartAllRoutes { reply } => {
43 let _ = reply.send(controller.start_all_routes().await);
44 }
45 RouteControllerCommand::StopAllRoutes { reply } => {
46 let _ = reply.send(controller.stop_all_routes().await);
47 }
48 RouteControllerCommand::AddRoute { definition, reply } => {
49 let _ = reply.send(controller.add_route(definition).await);
50 }
51 RouteControllerCommand::RemoveRoute { route_id, reply } => {
52 let _ = reply.send(controller.remove_route(&route_id).await);
53 }
54 RouteControllerCommand::SwapPipeline {
55 route_id,
56 pipeline,
57 reply,
58 } => {
59 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
60 }
61 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
62 let _ = reply.send(controller.compile_route_definition(definition));
63 }
64 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
65 definition,
66 generation,
67 reply,
68 } => {
69 let _ = reply.send(
70 controller.compile_route_definition_with_generation(definition, generation),
71 );
72 }
73 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
74 definition,
75 generation,
76 reply,
77 } => {
78 let _ = reply.send(
79 controller.prepare_route_definition_with_generation(definition, generation),
80 );
81 }
82 RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
83 let _ = reply.send(controller.insert_prepared_route(prepared));
84 }
85 RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
86 let _ = reply.send(
87 controller
88 .remove_route_preserving_functions(&route_id)
89 .await,
90 );
91 }
92 RouteControllerCommand::RouteFromUri { route_id, reply } => {
93 let _ = reply.send(controller.route_from_uri(&route_id));
94 }
95 RouteControllerCommand::SetErrorHandler { config } => {
96 controller.set_error_handler(config);
97 }
98 RouteControllerCommand::SetTracerConfig { config } => {
99 controller.set_tracer_config(&config);
100 }
101 RouteControllerCommand::RouteCount { reply } => {
102 let _ = reply.send(controller.route_count());
103 }
104 RouteControllerCommand::InFlightCount { route_id, reply } => {
105 let _ = reply.send(controller.in_flight_count(&route_id));
106 }
107 RouteControllerCommand::RouteExists { route_id, reply } => {
108 let _ = reply.send(controller.route_exists(&route_id));
109 }
110 RouteControllerCommand::RouteIds { reply } => {
111 let _ = reply.send(controller.route_ids());
112 }
113 RouteControllerCommand::AutoStartupRouteIds { reply } => {
114 let _ = reply.send(controller.auto_startup_route_ids());
115 }
116 RouteControllerCommand::ShutdownRouteIds { reply } => {
117 let _ = reply.send(controller.shutdown_route_ids());
118 }
119 RouteControllerCommand::GetPipeline { route_id, reply } => {
120 let _ = reply.send(controller.get_pipeline(&route_id));
121 }
122 RouteControllerCommand::StartRouteReload { route_id, reply } => {
123 let _ = reply.send(controller.start_route_reload(&route_id).await);
124 }
125 RouteControllerCommand::StopRouteReload { route_id, reply } => {
126 let _ = reply.send(controller.stop_route_reload(&route_id).await);
127 }
128 RouteControllerCommand::SetRuntimeHandle { runtime } => {
129 controller.set_runtime_handle(runtime);
130 }
131 RouteControllerCommand::SetFunctionInvoker { invoker } => {
132 controller.set_function_invoker(invoker);
133 }
134 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
135 let _ = reply.send(controller.route_source_hash(&route_id));
136 }
137 RouteControllerCommand::Shutdown => {
138 break;
139 }
140 }
141 }
142 });
143 (RouteControllerHandle { tx }, handle)
144}
145
146pub fn spawn_supervision_task(
147 controller: RouteControllerHandle,
148 config: SupervisionConfig,
149 _metrics: Option<Arc<dyn MetricsCollector>>,
150 mut crash_rx: mpsc::Receiver<CrashNotification>,
151) -> JoinHandle<()> {
152 tokio::spawn(async move {
153 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
154 let mut last_restart_time: std::collections::HashMap<String, Instant> =
155 std::collections::HashMap::new();
156 let mut currently_restarting: std::collections::HashSet<String> =
157 std::collections::HashSet::new();
158
159 info!("Supervision loop started");
160
161 while let Some(notification) = crash_rx.recv().await {
162 let route_id = notification.route_id;
163 if currently_restarting.contains(&route_id) {
164 continue;
165 }
166
167 if let Some(last_time) = last_restart_time.get(&route_id)
168 && last_time.elapsed() >= config.initial_delay
169 {
170 attempts.insert(route_id.clone(), 0);
171 }
172
173 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
174 *current_attempt += 1;
175
176 if config
177 .max_attempts
178 .is_some_and(|max| *current_attempt > max)
179 {
180 error!(
182 route_id = %route_id,
183 attempts = *current_attempt,
184 "Route exceeded max restart attempts, giving up"
185 );
186 continue;
187 }
188
189 let delay = config.next_delay(*current_attempt);
190 currently_restarting.insert(route_id.clone());
191 tokio::time::sleep(delay).await;
192
193 match controller.restart_route(route_id.clone()).await {
194 Ok(()) => {
195 info!(route_id = %route_id, "Route restarted successfully");
196 last_restart_time.insert(route_id.clone(), Instant::now());
197 }
198 Err(err) => {
199 error!(route_id = %route_id, error = %err, "Failed to restart route");
201 }
202 }
203
204 currently_restarting.remove(&route_id);
205 }
206
207 info!("Supervision loop ended");
208 })
209}
210
211#[cfg(test)]
212mod tests {
213 use super::{
214 RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
215 spawn_supervision_task,
216 };
217 use crate::lifecycle::adapters::route_controller::DefaultRouteController;
218 use crate::lifecycle::adapters::route_helpers::CrashNotification;
219 use crate::lifecycle::application::route_definition::RouteDefinition;
220 use crate::shared::components::domain::Registry;
221 use crate::shared::observability::domain::TracerConfig;
222 use camel_api::function::PrepareToken;
223 use camel_api::{
224 CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
225 FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
226 RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
227 SupervisionConfig,
228 };
229 use std::sync::Arc;
230 use std::time::Duration;
231 use tokio::sync::mpsc;
232 use tokio::time::sleep;
233
234 fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
235 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
236 {
237 let mut guard = registry.lock().expect("lock");
238 guard.register(std::sync::Arc::new(
239 camel_component_timer::TimerComponent::new(),
240 ));
241 guard.register(std::sync::Arc::new(
242 camel_component_mock::MockComponent::new(),
243 ));
244 }
245 let controller = DefaultRouteController::new(
246 Arc::clone(®istry),
247 Arc::new(camel_api::NoopPlatformService::default()),
248 );
249 spawn_controller_actor(controller)
250 }
251
252 fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
253 let controller = DefaultRouteController::new(
254 Arc::new(std::sync::Mutex::new(Registry::new())),
255 Arc::new(camel_api::NoopPlatformService::default()),
256 );
257 spawn_controller_actor(controller)
258 }
259
260 fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
261 RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
262 }
263
264 struct NoopRuntime;
265 struct NoopInvoker;
266
267 #[async_trait::async_trait]
268 impl RuntimeCommandBus for NoopRuntime {
269 async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
270 Ok(RuntimeCommandResult::Accepted)
271 }
272 }
273
274 #[async_trait::async_trait]
275 impl RuntimeQueryBus for NoopRuntime {
276 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
277 Ok(match query {
278 RuntimeQuery::GetRouteStatus { route_id }
279 | RuntimeQuery::InFlightCount { route_id } => {
280 RuntimeQueryResult::RouteNotFound { route_id }
281 }
282 RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
283 route_ids: Vec::new(),
284 },
285 })
286 }
287 }
288
289 impl FunctionInvokerSync for NoopInvoker {
290 fn stage_pending(
291 &self,
292 _def: FunctionDefinition,
293 _route_id: Option<&str>,
294 _generation: u64,
295 ) {
296 }
297 fn discard_staging(&self, _generation: u64) {}
298 fn begin_reload(&self) -> u64 {
299 1
300 }
301 fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
302 vec![]
303 }
304 fn staged_refs_for_route(
305 &self,
306 _route_id: &str,
307 _generation: u64,
308 ) -> Vec<(FunctionId, Option<String>)> {
309 vec![]
310 }
311 fn staged_defs_for_route(
312 &self,
313 _route_id: &str,
314 _generation: u64,
315 ) -> Vec<(FunctionDefinition, Option<String>)> {
316 vec![]
317 }
318 }
319
320 #[async_trait::async_trait]
321 impl FunctionInvoker for NoopInvoker {
322 async fn register(
323 &self,
324 _def: FunctionDefinition,
325 _route_id: Option<&str>,
326 ) -> Result<(), FunctionInvocationError> {
327 Ok(())
328 }
329 async fn unregister(
330 &self,
331 _id: &FunctionId,
332 _route_id: Option<&str>,
333 ) -> Result<(), FunctionInvocationError> {
334 Ok(())
335 }
336 async fn invoke(
337 &self,
338 _id: &FunctionId,
339 _exchange: &Exchange,
340 ) -> Result<ExchangePatch, FunctionInvocationError> {
341 Ok(ExchangePatch::default())
342 }
343 async fn prepare_reload(
344 &self,
345 _diff: FunctionDiff,
346 _generation: u64,
347 ) -> Result<PrepareToken, FunctionInvocationError> {
348 Ok(PrepareToken::default())
349 }
350 async fn finalize_reload(
351 &self,
352 _diff: &FunctionDiff,
353 _generation: u64,
354 ) -> Result<(), FunctionInvocationError> {
355 Ok(())
356 }
357 async fn rollback_reload(
358 &self,
359 _token: PrepareToken,
360 _generation: u64,
361 ) -> Result<(), FunctionInvocationError> {
362 Ok(())
363 }
364 async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
365 Ok(())
366 }
367 }
368
369 #[tokio::test]
370 async fn start_route_sends_command_and_returns_reply() {
371 let (tx, mut rx) = mpsc::channel(1);
372 let handle = RouteControllerHandle { tx };
373
374 let task = tokio::spawn(async move { handle.start_route("route-a").await });
375
376 let command = rx.recv().await.expect("command should be received");
377 match command {
378 RouteControllerCommand::StartRoute { route_id, reply } => {
379 assert_eq!(route_id, "route-a");
380 let _ = reply.send(Ok(()));
381 }
382 _ => panic!("unexpected command variant"),
383 }
384
385 let result = task.await.expect("join should succeed");
386 assert!(result.is_ok());
387 }
388
389 #[tokio::test]
390 async fn start_route_returns_error_when_actor_stops() {
391 let (tx, rx) = mpsc::channel(1);
392 drop(rx);
393
394 let handle = RouteControllerHandle { tx };
395 let result = handle.start_route("route-a").await;
396
397 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
398 }
399
400 #[tokio::test]
401 async fn spawn_controller_actor_processes_commands_and_shutdown() {
402 let controller = DefaultRouteController::new(
403 Arc::new(std::sync::Mutex::new(Registry::new())),
404 Arc::new(camel_api::NoopPlatformService::default()),
405 );
406 let (handle, join_handle) = spawn_controller_actor(controller);
407
408 assert_eq!(handle.route_count().await.expect("route_count"), 0);
409 assert_eq!(
410 handle.route_ids().await.expect("route_ids"),
411 Vec::<String>::new()
412 );
413
414 handle.shutdown().await.expect("shutdown send");
415 join_handle.await.expect("actor join");
416 }
417
418 #[tokio::test]
419 async fn actor_handle_introspection_and_mutation_commands() {
420 let (handle, join_handle) = build_actor_with_components();
421 let definition = route_def("h-1", "timer:tick?period=100");
422
423 handle.add_route(definition).await.expect("add route");
424 assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
425 assert!(
426 !handle
427 .route_exists("no-such")
428 .await
429 .expect("route exists no-such")
430 );
431
432 let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
433 assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
434 assert_eq!(handle.route_count().await.expect("route_count"), 1);
435
436 let auto_ids = handle
437 .auto_startup_route_ids()
438 .await
439 .expect("auto_startup_route_ids");
440 assert!(auto_ids.iter().any(|id| id == "h-1"));
441
442 let shutdown_ids = handle
443 .shutdown_route_ids()
444 .await
445 .expect("shutdown_route_ids");
446 assert!(shutdown_ids.iter().any(|id| id == "h-1"));
447
448 let compiled = handle
449 .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
450 .await
451 .expect("compile_route_definition");
452
453 assert!(
454 handle
455 .get_pipeline("h-1")
456 .await
457 .expect("get_pipeline")
458 .is_some()
459 );
460 handle
461 .swap_pipeline("h-1", compiled)
462 .await
463 .expect("swap_pipeline");
464
465 let _ = handle
466 .in_flight_count("h-1")
467 .await
468 .expect("in_flight_count");
469 let _ = handle.route_source_hash("h-1").await;
470
471 handle
472 .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
473 .await
474 .expect("set_error_handler");
475 handle
476 .set_tracer_config(TracerConfig::default())
477 .await
478 .expect("set_tracer_config");
479 handle
480 .set_runtime_handle(Arc::new(NoopRuntime))
481 .await
482 .expect("set_runtime_handle");
483
484 handle.remove_route("h-1").await.expect("remove_route");
485 assert_eq!(
486 handle
487 .route_count()
488 .await
489 .expect("route_count after remove"),
490 0
491 );
492 handle
493 .stop_all_routes()
494 .await
495 .expect("stop_all_routes on empty");
496
497 handle.shutdown().await.expect("shutdown send");
498 join_handle.await.expect("actor join");
499 }
500
501 #[tokio::test]
502 async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
503 let (handle, join_handle) = build_actor_with_components();
504 handle
505 .add_route(route_def("lc-1", "timer:tick?period=50"))
506 .await
507 .expect("add route lc-1");
508
509 handle.start_route("lc-1").await.expect("start_route");
510 sleep(Duration::from_millis(20)).await;
511
512 handle.restart_route("lc-1").await.expect("restart_route");
513 sleep(Duration::from_millis(20)).await;
514
515 handle.suspend_route("lc-1").await.expect("suspend_route");
516 handle.resume_route("lc-1").await.expect("resume_route");
517 sleep(Duration::from_millis(20)).await;
518
519 handle.stop_route("lc-1").await.expect("stop_route");
520 handle.start_all_routes().await.expect("start_all_routes");
521 sleep(Duration::from_millis(20)).await;
522 handle.stop_all_routes().await.expect("stop_all_routes");
523
524 handle
525 .start_route_reload("lc-1")
526 .await
527 .expect("start_route_reload");
528 handle
529 .stop_route_reload("lc-1")
530 .await
531 .expect("stop_route_reload");
532
533 handle.shutdown().await.expect("shutdown send");
534 join_handle.await.expect("actor join");
535 }
536
537 #[tokio::test]
538 async fn spawn_supervision_restarts_route_on_crash() {
539 let (handle, join_handle) = build_actor_with_components();
540 handle
541 .add_route(route_def("sup-1", "timer:tick?period=100"))
542 .await
543 .expect("add route sup-1");
544 handle
545 .start_route("sup-1")
546 .await
547 .expect("start_route sup-1");
548
549 let (crash_tx, crash_rx) = mpsc::channel(8);
550 let supervision = spawn_supervision_task(
551 handle.clone(),
552 SupervisionConfig {
553 initial_delay: Duration::from_millis(10),
554 max_attempts: Some(2),
555 ..SupervisionConfig::default()
556 },
557 None,
558 crash_rx,
559 );
560
561 crash_tx
562 .send(CrashNotification {
563 route_id: "sup-1".to_string(),
564 error: "simulated".to_string(),
565 })
566 .await
567 .expect("send crash notification");
568
569 sleep(Duration::from_millis(150)).await;
570 drop(crash_tx);
571 supervision.await.expect("supervision join");
572
573 handle.shutdown().await.expect("shutdown send");
574 join_handle.await.expect("actor join");
575 }
576
577 #[tokio::test]
578 async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
579 let (handle, join_handle) = build_actor_with_components();
580 handle
581 .add_route(route_def("sup-2", "timer:tick?period=100"))
582 .await
583 .expect("add route sup-2");
584 handle
585 .start_route("sup-2")
586 .await
587 .expect("start_route sup-2");
588
589 let (crash_tx, crash_rx) = mpsc::channel(8);
590 let supervision = spawn_supervision_task(
591 handle.clone(),
592 SupervisionConfig {
593 initial_delay: Duration::from_millis(10),
594 max_attempts: Some(1),
595 ..SupervisionConfig::default()
596 },
597 None,
598 crash_rx,
599 );
600
601 crash_tx
602 .send(CrashNotification {
603 route_id: "sup-2".to_string(),
604 error: "attempt-1".to_string(),
605 })
606 .await
607 .expect("send crash attempt-1");
608 crash_tx
609 .send(CrashNotification {
610 route_id: "sup-2".to_string(),
611 error: "attempt-2".to_string(),
612 })
613 .await
614 .expect("send crash attempt-2");
615
616 sleep(Duration::from_millis(200)).await;
617 drop(crash_tx);
618 supervision.await.expect("supervision join");
619
620 handle.shutdown().await.expect("shutdown send");
621 join_handle.await.expect("actor join");
622 }
623
624 #[tokio::test]
625 async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
626 let (handle, join_handle) = build_empty_actor();
627
628 handle
629 .try_set_runtime_handle(Arc::new(NoopRuntime))
630 .expect("try_set_runtime_handle should succeed");
631
632 handle.shutdown().await.expect("shutdown send");
633 join_handle.await.expect("actor join");
634 }
635
636 #[tokio::test]
637 async fn shutdown_returns_error_when_actor_stopped() {
638 let (tx, rx) = mpsc::channel(1);
639 drop(rx);
640
641 let handle = RouteControllerHandle { tx };
642 let result = handle.shutdown().await;
643
644 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
645 }
646
647 #[tokio::test]
648 async fn handle_methods_send_expected_commands_and_receive_replies() {
649 let (tx, mut rx) = mpsc::channel(16);
650 let handle = RouteControllerHandle { tx };
651
652 let stop_task = tokio::spawn({
653 let h = handle.clone();
654 async move { h.stop_route("r-1").await }
655 });
656 let cmd = rx.recv().await.expect("stop command");
657 match cmd {
658 RouteControllerCommand::StopRoute { route_id, reply } => {
659 assert_eq!(route_id, "r-1");
660 let _ = reply.send(Ok(()));
661 }
662 _ => panic!("unexpected command"),
663 }
664 assert!(stop_task.await.expect("join").is_ok());
665
666 let exists_task = tokio::spawn({
667 let h = handle.clone();
668 async move { h.route_exists("r-2").await }
669 });
670 let cmd = rx.recv().await.expect("exists command");
671 match cmd {
672 RouteControllerCommand::RouteExists { route_id, reply } => {
673 assert_eq!(route_id, "r-2");
674 let _ = reply.send(true);
675 }
676 _ => panic!("unexpected command"),
677 }
678 assert!(exists_task.await.expect("join").expect("ok"));
679
680 let hash_task = tokio::spawn({
681 let h = handle.clone();
682 async move { h.route_source_hash("r-3").await }
683 });
684 let cmd = rx.recv().await.expect("hash command");
685 match cmd {
686 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
687 assert_eq!(route_id, "r-3");
688 let _ = reply.send(Some(77));
689 }
690 _ => panic!("unexpected command"),
691 }
692 assert_eq!(hash_task.await.expect("join"), Some(77));
693 }
694
695 #[tokio::test]
696 async fn handle_methods_error_on_dropped_reply_channel() {
697 let (tx, mut rx) = mpsc::channel(16);
698 let handle = RouteControllerHandle { tx };
699
700 let count_task = tokio::spawn({
701 let h = handle.clone();
702 async move { h.route_count().await }
703 });
704 let cmd = rx.recv().await.expect("route_count command");
705 match cmd {
706 RouteControllerCommand::RouteCount { reply } => drop(reply),
707 _ => panic!("unexpected command"),
708 }
709 assert!(matches!(
710 count_task.await.expect("join"),
711 Err(CamelError::ProcessorError(_))
712 ));
713
714 let stop_task = tokio::spawn({
715 let h = handle.clone();
716 async move { h.stop_route("x").await }
717 });
718 let cmd = rx.recv().await.expect("stop command");
719 match cmd {
720 RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
721 _ => panic!("unexpected command"),
722 }
723 assert!(matches!(
724 stop_task.await.expect("join"),
725 Err(CamelError::ProcessorError(_))
726 ));
727
728 let maybe_hash = tokio::spawn({
729 let h = handle.clone();
730 async move { h.route_source_hash("x").await }
731 });
732 let cmd = rx.recv().await.expect("hash command");
733 match cmd {
734 RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
735 _ => panic!("unexpected command"),
736 }
737 assert_eq!(maybe_hash.await.expect("join"), None);
738 }
739
740 #[test]
741 fn try_set_function_invoker_returns_mailbox_full() {
742 let (tx, mut rx) = mpsc::channel(1);
743 tx.try_send(RouteControllerCommand::Shutdown)
744 .expect("fill mailbox");
745 let handle = RouteControllerHandle { tx };
746
747 let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
748 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
749
750 rx.try_recv().expect("mailbox still has first message");
751 }
752}