Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1//! Actor loop and supervision — the async task that processes route control commands.
2//!
3//! Extracted from a monolithic file. The command enum and handle live
4//! in [`controller_actor_commands`](super::controller_actor_commands).
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use camel_api::{MetricsCollector, RouteController, SupervisionConfig};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tracing::{error, info};
13
14pub(crate) use super::controller_actor_commands::RouteControllerCommand;
15pub use super::controller_actor_commands::RouteControllerHandle;
16use super::route_controller::DefaultRouteController;
17use super::route_helpers::CrashNotification;
18
19pub fn spawn_controller_actor(
20    controller: DefaultRouteController,
21) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
22    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
23    let handle = tokio::spawn(async move {
24        let mut controller = controller;
25        while let Some(cmd) = rx.recv().await {
26            match cmd {
27                RouteControllerCommand::StartRoute { route_id, reply } => {
28                    let _ = reply.send(controller.start_route(&route_id).await);
29                }
30                RouteControllerCommand::StopRoute { route_id, reply } => {
31                    let _ = reply.send(controller.stop_route(&route_id).await);
32                }
33                RouteControllerCommand::RestartRoute { route_id, reply } => {
34                    let _ = reply.send(controller.restart_route(&route_id).await);
35                }
36                RouteControllerCommand::SuspendRoute { route_id, reply } => {
37                    let _ = reply.send(controller.suspend_route(&route_id).await);
38                }
39                RouteControllerCommand::ResumeRoute { route_id, reply } => {
40                    let _ = reply.send(controller.resume_route(&route_id).await);
41                }
42                RouteControllerCommand::StartAllRoutes { reply } => {
43                    let _ = reply.send(controller.start_all_routes().await);
44                }
45                RouteControllerCommand::StopAllRoutes { reply } => {
46                    let _ = reply.send(controller.stop_all_routes().await);
47                }
48                RouteControllerCommand::AddRoute { definition, reply } => {
49                    let _ = reply.send(controller.add_route(definition).await);
50                }
51                RouteControllerCommand::RemoveRoute { route_id, reply } => {
52                    let _ = reply.send(controller.remove_route(&route_id).await);
53                }
54                RouteControllerCommand::SwapPipeline {
55                    route_id,
56                    pipeline,
57                    reply,
58                } => {
59                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
60                }
61                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
62                    let _ = reply.send(controller.compile_route_definition(definition));
63                }
64                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
65                    definition,
66                    generation,
67                    reply,
68                } => {
69                    let _ = reply.send(
70                        controller.compile_route_definition_with_generation(definition, generation),
71                    );
72                }
73                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
74                    definition,
75                    generation,
76                    reply,
77                } => {
78                    let _ = reply.send(
79                        controller.prepare_route_definition_with_generation(definition, generation),
80                    );
81                }
82                RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
83                    let _ = reply.send(controller.insert_prepared_route(prepared));
84                }
85                RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
86                    let _ = reply.send(
87                        controller
88                            .remove_route_preserving_functions(&route_id)
89                            .await,
90                    );
91                }
92                RouteControllerCommand::RouteFromUri { route_id, reply } => {
93                    let _ = reply.send(controller.route_from_uri(&route_id));
94                }
95                RouteControllerCommand::SetErrorHandler { config } => {
96                    controller.set_error_handler(config);
97                }
98                RouteControllerCommand::SetTracerConfig { config } => {
99                    controller.set_tracer_config(&config);
100                }
101                RouteControllerCommand::RouteCount { reply } => {
102                    let _ = reply.send(controller.route_count());
103                }
104                RouteControllerCommand::InFlightCount { route_id, reply } => {
105                    let _ = reply.send(controller.in_flight_count(&route_id));
106                }
107                RouteControllerCommand::RouteExists { route_id, reply } => {
108                    let _ = reply.send(controller.route_exists(&route_id));
109                }
110                RouteControllerCommand::RouteIds { reply } => {
111                    let _ = reply.send(controller.route_ids());
112                }
113                RouteControllerCommand::AutoStartupRouteIds { reply } => {
114                    let _ = reply.send(controller.auto_startup_route_ids());
115                }
116                RouteControllerCommand::ShutdownRouteIds { reply } => {
117                    let _ = reply.send(controller.shutdown_route_ids());
118                }
119                RouteControllerCommand::GetPipeline { route_id, reply } => {
120                    let _ = reply.send(controller.get_pipeline(&route_id));
121                }
122                RouteControllerCommand::StartRouteReload { route_id, reply } => {
123                    let _ = reply.send(controller.start_route_reload(&route_id).await);
124                }
125                RouteControllerCommand::StopRouteReload { route_id, reply } => {
126                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
127                }
128                RouteControllerCommand::SetRuntimeHandle { runtime } => {
129                    controller.set_runtime_handle(runtime);
130                }
131                RouteControllerCommand::SetFunctionInvoker { invoker } => {
132                    controller.set_function_invoker(invoker);
133                }
134                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
135                    let _ = reply.send(controller.route_source_hash(&route_id));
136                }
137                RouteControllerCommand::Shutdown => {
138                    break;
139                }
140            }
141        }
142    });
143    (RouteControllerHandle { tx }, handle)
144}
145
146pub fn spawn_supervision_task(
147    controller: RouteControllerHandle,
148    config: SupervisionConfig,
149    _metrics: Option<Arc<dyn MetricsCollector>>,
150    mut crash_rx: mpsc::Receiver<CrashNotification>,
151) -> JoinHandle<()> {
152    tokio::spawn(async move {
153        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
154        let mut last_restart_time: std::collections::HashMap<String, Instant> =
155            std::collections::HashMap::new();
156        let mut currently_restarting: std::collections::HashSet<String> =
157            std::collections::HashSet::new();
158
159        info!("Supervision loop started");
160
161        while let Some(notification) = crash_rx.recv().await {
162            let route_id = notification.route_id;
163            if currently_restarting.contains(&route_id) {
164                continue;
165            }
166
167            if let Some(last_time) = last_restart_time.get(&route_id)
168                && last_time.elapsed() >= config.initial_delay
169            {
170                attempts.insert(route_id.clone(), 0);
171            }
172
173            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
174            *current_attempt += 1;
175
176            if config
177                .max_attempts
178                .is_some_and(|max| *current_attempt > max)
179            {
180                // log-policy: system-broken
181                error!(
182                    route_id = %route_id,
183                    attempts = *current_attempt,
184                    "Route exceeded max restart attempts, giving up"
185                );
186                continue;
187            }
188
189            let delay = config.next_delay(*current_attempt);
190            currently_restarting.insert(route_id.clone());
191            tokio::time::sleep(delay).await;
192
193            match controller.restart_route(route_id.clone()).await {
194                Ok(()) => {
195                    info!(route_id = %route_id, "Route restarted successfully");
196                    last_restart_time.insert(route_id.clone(), Instant::now());
197                }
198                Err(err) => {
199                    // log-policy: system-broken
200                    error!(route_id = %route_id, error = %err, "Failed to restart route");
201                }
202            }
203
204            currently_restarting.remove(&route_id);
205        }
206
207        info!("Supervision loop ended");
208    })
209}
210
211#[cfg(test)]
212mod tests {
213    use super::{
214        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
215        spawn_supervision_task,
216    };
217    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
218    use crate::lifecycle::adapters::route_helpers::CrashNotification;
219    use crate::lifecycle::application::route_definition::RouteDefinition;
220    use crate::shared::components::domain::Registry;
221    use crate::shared::observability::domain::TracerConfig;
222    use camel_api::function::PrepareToken;
223    use camel_api::{
224        CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
225        FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
226        RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
227        SupervisionConfig,
228    };
229    use std::sync::Arc;
230    use std::time::Duration;
231    use tokio::sync::mpsc;
232    use tokio::time::sleep;
233
234    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
235        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
236        {
237            let mut guard = registry.lock().expect("lock");
238            guard.register(std::sync::Arc::new(
239                camel_component_timer::TimerComponent::new(),
240            ));
241            guard.register(std::sync::Arc::new(
242                camel_component_mock::MockComponent::new(),
243            ));
244        }
245        let controller = DefaultRouteController::new(
246            Arc::clone(&registry),
247            Arc::new(camel_api::NoopPlatformService::default()),
248        );
249        spawn_controller_actor(controller)
250    }
251
252    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
253        let controller = DefaultRouteController::new(
254            Arc::new(std::sync::Mutex::new(Registry::new())),
255            Arc::new(camel_api::NoopPlatformService::default()),
256        );
257        spawn_controller_actor(controller)
258    }
259
260    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
261        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
262    }
263
264    struct NoopRuntime;
265    struct NoopInvoker;
266
267    #[async_trait::async_trait]
268    impl RuntimeCommandBus for NoopRuntime {
269        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
270            Ok(RuntimeCommandResult::Accepted)
271        }
272    }
273
274    #[async_trait::async_trait]
275    impl RuntimeQueryBus for NoopRuntime {
276        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
277            Ok(match query {
278                RuntimeQuery::GetRouteStatus { route_id }
279                | RuntimeQuery::InFlightCount { route_id } => {
280                    RuntimeQueryResult::RouteNotFound { route_id }
281                }
282                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
283                    route_ids: Vec::new(),
284                },
285            })
286        }
287    }
288
289    impl FunctionInvokerSync for NoopInvoker {
290        fn stage_pending(
291            &self,
292            _def: FunctionDefinition,
293            _route_id: Option<&str>,
294            _generation: u64,
295        ) {
296        }
297        fn discard_staging(&self, _generation: u64) {}
298        fn begin_reload(&self) -> u64 {
299            1
300        }
301        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
302            vec![]
303        }
304        fn staged_refs_for_route(
305            &self,
306            _route_id: &str,
307            _generation: u64,
308        ) -> Vec<(FunctionId, Option<String>)> {
309            vec![]
310        }
311        fn staged_defs_for_route(
312            &self,
313            _route_id: &str,
314            _generation: u64,
315        ) -> Vec<(FunctionDefinition, Option<String>)> {
316            vec![]
317        }
318    }
319
320    #[async_trait::async_trait]
321    impl FunctionInvoker for NoopInvoker {
322        async fn register(
323            &self,
324            _def: FunctionDefinition,
325            _route_id: Option<&str>,
326        ) -> Result<(), FunctionInvocationError> {
327            Ok(())
328        }
329        async fn unregister(
330            &self,
331            _id: &FunctionId,
332            _route_id: Option<&str>,
333        ) -> Result<(), FunctionInvocationError> {
334            Ok(())
335        }
336        async fn invoke(
337            &self,
338            _id: &FunctionId,
339            _exchange: &Exchange,
340        ) -> Result<ExchangePatch, FunctionInvocationError> {
341            Ok(ExchangePatch::default())
342        }
343        async fn prepare_reload(
344            &self,
345            _diff: FunctionDiff,
346            _generation: u64,
347        ) -> Result<PrepareToken, FunctionInvocationError> {
348            Ok(PrepareToken::default())
349        }
350        async fn finalize_reload(
351            &self,
352            _diff: &FunctionDiff,
353            _generation: u64,
354        ) -> Result<(), FunctionInvocationError> {
355            Ok(())
356        }
357        async fn rollback_reload(
358            &self,
359            _token: PrepareToken,
360            _generation: u64,
361        ) -> Result<(), FunctionInvocationError> {
362            Ok(())
363        }
364        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
365            Ok(())
366        }
367    }
368
369    #[tokio::test]
370    async fn start_route_sends_command_and_returns_reply() {
371        let (tx, mut rx) = mpsc::channel(1);
372        let handle = RouteControllerHandle { tx };
373
374        let task = tokio::spawn(async move { handle.start_route("route-a").await });
375
376        let command = rx.recv().await.expect("command should be received");
377        match command {
378            RouteControllerCommand::StartRoute { route_id, reply } => {
379                assert_eq!(route_id, "route-a");
380                let _ = reply.send(Ok(()));
381            }
382            _ => panic!("unexpected command variant"),
383        }
384
385        let result = task.await.expect("join should succeed");
386        assert!(result.is_ok());
387    }
388
389    #[tokio::test]
390    async fn start_route_returns_error_when_actor_stops() {
391        let (tx, rx) = mpsc::channel(1);
392        drop(rx);
393
394        let handle = RouteControllerHandle { tx };
395        let result = handle.start_route("route-a").await;
396
397        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
398    }
399
400    #[tokio::test]
401    async fn spawn_controller_actor_processes_commands_and_shutdown() {
402        let controller = DefaultRouteController::new(
403            Arc::new(std::sync::Mutex::new(Registry::new())),
404            Arc::new(camel_api::NoopPlatformService::default()),
405        );
406        let (handle, join_handle) = spawn_controller_actor(controller);
407
408        assert_eq!(handle.route_count().await.expect("route_count"), 0);
409        assert_eq!(
410            handle.route_ids().await.expect("route_ids"),
411            Vec::<String>::new()
412        );
413
414        handle.shutdown().await.expect("shutdown send");
415        join_handle.await.expect("actor join");
416    }
417
418    #[tokio::test]
419    async fn actor_handle_introspection_and_mutation_commands() {
420        let (handle, join_handle) = build_actor_with_components();
421        let definition = route_def("h-1", "timer:tick?period=100");
422
423        handle.add_route(definition).await.expect("add route");
424        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
425        assert!(
426            !handle
427                .route_exists("no-such")
428                .await
429                .expect("route exists no-such")
430        );
431
432        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
433        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
434        assert_eq!(handle.route_count().await.expect("route_count"), 1);
435
436        let auto_ids = handle
437            .auto_startup_route_ids()
438            .await
439            .expect("auto_startup_route_ids");
440        assert!(auto_ids.iter().any(|id| id == "h-1"));
441
442        let shutdown_ids = handle
443            .shutdown_route_ids()
444            .await
445            .expect("shutdown_route_ids");
446        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
447
448        let compiled = handle
449            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
450            .await
451            .expect("compile_route_definition");
452
453        assert!(
454            handle
455                .get_pipeline("h-1")
456                .await
457                .expect("get_pipeline")
458                .is_some()
459        );
460        handle
461            .swap_pipeline("h-1", compiled)
462            .await
463            .expect("swap_pipeline");
464
465        let _ = handle
466            .in_flight_count("h-1")
467            .await
468            .expect("in_flight_count");
469        let _ = handle.route_source_hash("h-1").await;
470
471        handle
472            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
473            .await
474            .expect("set_error_handler");
475        handle
476            .set_tracer_config(TracerConfig::default())
477            .await
478            .expect("set_tracer_config");
479        handle
480            .set_runtime_handle(Arc::new(NoopRuntime))
481            .await
482            .expect("set_runtime_handle");
483
484        handle.remove_route("h-1").await.expect("remove_route");
485        assert_eq!(
486            handle
487                .route_count()
488                .await
489                .expect("route_count after remove"),
490            0
491        );
492        handle
493            .stop_all_routes()
494            .await
495            .expect("stop_all_routes on empty");
496
497        handle.shutdown().await.expect("shutdown send");
498        join_handle.await.expect("actor join");
499    }
500
501    #[tokio::test]
502    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
503        let (handle, join_handle) = build_actor_with_components();
504        handle
505            .add_route(route_def("lc-1", "timer:tick?period=50"))
506            .await
507            .expect("add route lc-1");
508
509        handle.start_route("lc-1").await.expect("start_route");
510        sleep(Duration::from_millis(20)).await;
511
512        handle.restart_route("lc-1").await.expect("restart_route");
513        sleep(Duration::from_millis(20)).await;
514
515        handle.suspend_route("lc-1").await.expect("suspend_route");
516        handle.resume_route("lc-1").await.expect("resume_route");
517        sleep(Duration::from_millis(20)).await;
518
519        handle.stop_route("lc-1").await.expect("stop_route");
520        handle.start_all_routes().await.expect("start_all_routes");
521        sleep(Duration::from_millis(20)).await;
522        handle.stop_all_routes().await.expect("stop_all_routes");
523
524        handle
525            .start_route_reload("lc-1")
526            .await
527            .expect("start_route_reload");
528        handle
529            .stop_route_reload("lc-1")
530            .await
531            .expect("stop_route_reload");
532
533        handle.shutdown().await.expect("shutdown send");
534        join_handle.await.expect("actor join");
535    }
536
537    #[tokio::test]
538    async fn spawn_supervision_restarts_route_on_crash() {
539        let (handle, join_handle) = build_actor_with_components();
540        handle
541            .add_route(route_def("sup-1", "timer:tick?period=100"))
542            .await
543            .expect("add route sup-1");
544        handle
545            .start_route("sup-1")
546            .await
547            .expect("start_route sup-1");
548
549        let (crash_tx, crash_rx) = mpsc::channel(8);
550        let supervision = spawn_supervision_task(
551            handle.clone(),
552            SupervisionConfig {
553                initial_delay: Duration::from_millis(10),
554                max_attempts: Some(2),
555                ..SupervisionConfig::default()
556            },
557            None,
558            crash_rx,
559        );
560
561        crash_tx
562            .send(CrashNotification {
563                route_id: "sup-1".to_string(),
564                error: "simulated".to_string(),
565            })
566            .await
567            .expect("send crash notification");
568
569        sleep(Duration::from_millis(150)).await;
570        drop(crash_tx);
571        supervision.await.expect("supervision join");
572
573        handle.shutdown().await.expect("shutdown send");
574        join_handle.await.expect("actor join");
575    }
576
577    #[tokio::test]
578    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
579        let (handle, join_handle) = build_actor_with_components();
580        handle
581            .add_route(route_def("sup-2", "timer:tick?period=100"))
582            .await
583            .expect("add route sup-2");
584        handle
585            .start_route("sup-2")
586            .await
587            .expect("start_route sup-2");
588
589        let (crash_tx, crash_rx) = mpsc::channel(8);
590        let supervision = spawn_supervision_task(
591            handle.clone(),
592            SupervisionConfig {
593                initial_delay: Duration::from_millis(10),
594                max_attempts: Some(1),
595                ..SupervisionConfig::default()
596            },
597            None,
598            crash_rx,
599        );
600
601        crash_tx
602            .send(CrashNotification {
603                route_id: "sup-2".to_string(),
604                error: "attempt-1".to_string(),
605            })
606            .await
607            .expect("send crash attempt-1");
608        crash_tx
609            .send(CrashNotification {
610                route_id: "sup-2".to_string(),
611                error: "attempt-2".to_string(),
612            })
613            .await
614            .expect("send crash attempt-2");
615
616        sleep(Duration::from_millis(200)).await;
617        drop(crash_tx);
618        supervision.await.expect("supervision join");
619
620        handle.shutdown().await.expect("shutdown send");
621        join_handle.await.expect("actor join");
622    }
623
624    #[tokio::test]
625    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
626        let (handle, join_handle) = build_empty_actor();
627
628        handle
629            .try_set_runtime_handle(Arc::new(NoopRuntime))
630            .expect("try_set_runtime_handle should succeed");
631
632        handle.shutdown().await.expect("shutdown send");
633        join_handle.await.expect("actor join");
634    }
635
636    #[tokio::test]
637    async fn shutdown_returns_error_when_actor_stopped() {
638        let (tx, rx) = mpsc::channel(1);
639        drop(rx);
640
641        let handle = RouteControllerHandle { tx };
642        let result = handle.shutdown().await;
643
644        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
645    }
646
647    #[tokio::test]
648    async fn handle_methods_send_expected_commands_and_receive_replies() {
649        let (tx, mut rx) = mpsc::channel(16);
650        let handle = RouteControllerHandle { tx };
651
652        let stop_task = tokio::spawn({
653            let h = handle.clone();
654            async move { h.stop_route("r-1").await }
655        });
656        let cmd = rx.recv().await.expect("stop command");
657        match cmd {
658            RouteControllerCommand::StopRoute { route_id, reply } => {
659                assert_eq!(route_id, "r-1");
660                let _ = reply.send(Ok(()));
661            }
662            _ => panic!("unexpected command"),
663        }
664        assert!(stop_task.await.expect("join").is_ok());
665
666        let exists_task = tokio::spawn({
667            let h = handle.clone();
668            async move { h.route_exists("r-2").await }
669        });
670        let cmd = rx.recv().await.expect("exists command");
671        match cmd {
672            RouteControllerCommand::RouteExists { route_id, reply } => {
673                assert_eq!(route_id, "r-2");
674                let _ = reply.send(true);
675            }
676            _ => panic!("unexpected command"),
677        }
678        assert!(exists_task.await.expect("join").expect("ok"));
679
680        let hash_task = tokio::spawn({
681            let h = handle.clone();
682            async move { h.route_source_hash("r-3").await }
683        });
684        let cmd = rx.recv().await.expect("hash command");
685        match cmd {
686            RouteControllerCommand::RouteSourceHash { route_id, reply } => {
687                assert_eq!(route_id, "r-3");
688                let _ = reply.send(Some(77));
689            }
690            _ => panic!("unexpected command"),
691        }
692        assert_eq!(hash_task.await.expect("join"), Some(77));
693    }
694
695    #[tokio::test]
696    async fn handle_methods_error_on_dropped_reply_channel() {
697        let (tx, mut rx) = mpsc::channel(16);
698        let handle = RouteControllerHandle { tx };
699
700        let count_task = tokio::spawn({
701            let h = handle.clone();
702            async move { h.route_count().await }
703        });
704        let cmd = rx.recv().await.expect("route_count command");
705        match cmd {
706            RouteControllerCommand::RouteCount { reply } => drop(reply),
707            _ => panic!("unexpected command"),
708        }
709        assert!(matches!(
710            count_task.await.expect("join"),
711            Err(CamelError::ProcessorError(_))
712        ));
713
714        let stop_task = tokio::spawn({
715            let h = handle.clone();
716            async move { h.stop_route("x").await }
717        });
718        let cmd = rx.recv().await.expect("stop command");
719        match cmd {
720            RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
721            _ => panic!("unexpected command"),
722        }
723        assert!(matches!(
724            stop_task.await.expect("join"),
725            Err(CamelError::ProcessorError(_))
726        ));
727
728        let maybe_hash = tokio::spawn({
729            let h = handle.clone();
730            async move { h.route_source_hash("x").await }
731        });
732        let cmd = rx.recv().await.expect("hash command");
733        match cmd {
734            RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
735            _ => panic!("unexpected command"),
736        }
737        assert_eq!(maybe_hash.await.expect("join"), None);
738    }
739
740    #[test]
741    fn try_set_function_invoker_returns_mailbox_full() {
742        let (tx, mut rx) = mpsc::channel(1);
743        tx.try_send(RouteControllerCommand::Shutdown)
744            .expect("fill mailbox");
745        let handle = RouteControllerHandle { tx };
746
747        let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
748        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
749
750        rx.try_recv().expect("mailbox still has first message");
751    }
752}