Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1//! Actor loop and supervision — the async task that processes route control commands.
2//!
3//! Extracted from a monolithic file. The command enum and handle live
4//! in [`controller_actor_commands`](super::controller_actor_commands).
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use camel_api::{MetricsCollector, RouteController, SupervisionConfig};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tracing::{error, info};
13
14pub(crate) use super::controller_actor_commands::RouteControllerCommand;
15pub use super::controller_actor_commands::RouteControllerHandle;
16use super::route_controller::DefaultRouteController;
17use super::route_helpers::CrashNotification;
18
19pub fn spawn_controller_actor(
20    controller: DefaultRouteController,
21) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
22    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
23    let handle = tokio::spawn(async move {
24        let mut controller = controller;
25        while let Some(cmd) = rx.recv().await {
26            match cmd {
27                RouteControllerCommand::StartRoute { route_id, reply } => {
28                    let _ = reply.send(controller.start_route(&route_id).await);
29                }
30                RouteControllerCommand::StopRoute { route_id, reply } => {
31                    let _ = reply.send(controller.stop_route(&route_id).await);
32                }
33                RouteControllerCommand::RestartRoute { route_id, reply } => {
34                    let _ = reply.send(controller.restart_route(&route_id).await);
35                }
36                RouteControllerCommand::SuspendRoute { route_id, reply } => {
37                    let _ = reply.send(controller.suspend_route(&route_id).await);
38                }
39                RouteControllerCommand::ResumeRoute { route_id, reply } => {
40                    let _ = reply.send(controller.resume_route(&route_id).await);
41                }
42                RouteControllerCommand::StartAllRoutes { reply } => {
43                    let _ = reply.send(controller.start_all_routes().await);
44                }
45                RouteControllerCommand::StopAllRoutes { reply } => {
46                    let _ = reply.send(controller.stop_all_routes().await);
47                }
48                RouteControllerCommand::AddRoute { definition, reply } => {
49                    let _ = reply.send(controller.add_route(definition).await);
50                }
51                RouteControllerCommand::RemoveRoute { route_id, reply } => {
52                    let _ = reply.send(controller.remove_route(&route_id).await);
53                }
54                RouteControllerCommand::SwapPipeline {
55                    route_id,
56                    pipeline,
57                    reply,
58                } => {
59                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
60                }
61                RouteControllerCommand::SwapPipelineRaw {
62                    route_id,
63                    pipeline,
64                    lifecycle,
65                    reply,
66                } => {
67                    let _ =
68                        reply.send(controller.swap_pipeline_raw(&route_id, pipeline, lifecycle));
69                }
70                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
71                    let _ = reply.send(controller.compile_route_definition(definition));
72                }
73                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
74                    definition,
75                    generation,
76                    reply,
77                } => {
78                    let _ = reply.send(
79                        controller.compile_route_definition_with_generation(definition, generation),
80                    );
81                }
82                RouteControllerCommand::CompileRouteDefinitionPipeline {
83                    definition,
84                    generation,
85                    reply,
86                } => {
87                    let _ = reply
88                        .send(controller.compile_route_definition_pipeline(definition, generation));
89                }
90                RouteControllerCommand::CompileRouteDefinitionDryPipeline { definition, reply } => {
91                    let _ =
92                        reply.send(controller.compile_route_definition_dry_pipeline(definition));
93                }
94                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
95                    definition,
96                    generation,
97                    reply,
98                } => {
99                    let _ = reply.send(
100                        controller.prepare_route_definition_with_generation(definition, generation),
101                    );
102                }
103                RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
104                    let _ = reply.send(controller.insert_prepared_route(prepared));
105                }
106                RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
107                    let _ = reply.send(
108                        controller
109                            .remove_route_preserving_functions(&route_id)
110                            .await,
111                    );
112                }
113                RouteControllerCommand::RouteFromUri { route_id, reply } => {
114                    let _ = reply.send(controller.route_from_uri(&route_id));
115                }
116                RouteControllerCommand::SetErrorHandler { config } => {
117                    controller.set_error_handler(config);
118                }
119                RouteControllerCommand::SetTracerConfig { config } => {
120                    controller.set_tracer_config(&config);
121                }
122                RouteControllerCommand::RouteCount { reply } => {
123                    let _ = reply.send(controller.route_count());
124                }
125                RouteControllerCommand::InFlightCount { route_id, reply } => {
126                    let _ = reply.send(controller.in_flight_count(&route_id));
127                }
128                RouteControllerCommand::RouteExists { route_id, reply } => {
129                    let _ = reply.send(controller.route_exists(&route_id));
130                }
131                RouteControllerCommand::RouteIds { reply } => {
132                    let _ = reply.send(controller.route_ids());
133                }
134                RouteControllerCommand::AutoStartupRouteIds { reply } => {
135                    let _ = reply.send(controller.auto_startup_route_ids());
136                }
137                RouteControllerCommand::ShutdownRouteIds { reply } => {
138                    let _ = reply.send(controller.shutdown_route_ids());
139                }
140                RouteControllerCommand::GetPipeline { route_id, reply } => {
141                    let _ = reply.send(controller.get_pipeline(&route_id));
142                }
143                RouteControllerCommand::StartRouteReload { route_id, reply } => {
144                    let _ = reply.send(controller.start_route_reload(&route_id).await);
145                }
146                RouteControllerCommand::StopRouteReload { route_id, reply } => {
147                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
148                }
149                RouteControllerCommand::SetRuntimeHandle { runtime } => {
150                    controller.set_runtime_handle(runtime);
151                }
152                RouteControllerCommand::SetFunctionInvoker { invoker } => {
153                    controller.set_function_invoker(invoker);
154                }
155                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
156                    let _ = reply.send(controller.route_source_hash(&route_id));
157                }
158                RouteControllerCommand::RouteHasLifecycle { route_id, reply } => {
159                    let _ = reply.send(controller.route_has_lifecycle(&route_id));
160                }
161                RouteControllerCommand::Shutdown => {
162                    break;
163                }
164            }
165        }
166    });
167    (RouteControllerHandle { tx }, handle)
168}
169
170pub fn spawn_supervision_task(
171    controller: RouteControllerHandle,
172    config: SupervisionConfig,
173    _metrics: Option<Arc<dyn MetricsCollector>>,
174    mut crash_rx: mpsc::Receiver<CrashNotification>,
175) -> JoinHandle<()> {
176    tokio::spawn(async move {
177        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
178        let mut last_restart_time: std::collections::HashMap<String, Instant> =
179            std::collections::HashMap::new();
180        let mut currently_restarting: std::collections::HashSet<String> =
181            std::collections::HashSet::new();
182
183        info!("Supervision loop started");
184
185        while let Some(notification) = crash_rx.recv().await {
186            let route_id = notification.route_id;
187            if currently_restarting.contains(&route_id) {
188                continue;
189            }
190
191            if let Some(last_time) = last_restart_time.get(&route_id)
192                && last_time.elapsed() >= config.initial_delay
193            {
194                attempts.insert(route_id.clone(), 0);
195            }
196
197            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
198            *current_attempt += 1;
199
200            if config
201                .max_attempts
202                .is_some_and(|max| *current_attempt > max)
203            {
204                // log-policy: system-broken
205                error!(
206                    route_id = %route_id,
207                    attempts = *current_attempt,
208                    "Route exceeded max restart attempts, giving up"
209                );
210                continue;
211            }
212
213            let delay = config.next_delay(*current_attempt);
214            currently_restarting.insert(route_id.clone());
215            tokio::time::sleep(delay).await;
216
217            match controller.restart_route(route_id.clone()).await {
218                Ok(()) => {
219                    info!(route_id = %route_id, "Route restarted successfully");
220                    last_restart_time.insert(route_id.clone(), Instant::now());
221                }
222                Err(err) => {
223                    // log-policy: system-broken
224                    error!(route_id = %route_id, error = %err, "Failed to restart route");
225                }
226            }
227
228            currently_restarting.remove(&route_id);
229        }
230
231        info!("Supervision loop ended");
232    })
233}
234
235#[cfg(test)]
236mod tests {
237    use super::{
238        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
239        spawn_supervision_task,
240    };
241    use crate::lifecycle::adapters::route_controller::DefaultRouteController;
242    use crate::lifecycle::adapters::route_helpers::CrashNotification;
243    use crate::lifecycle::application::route_definition::RouteDefinition;
244    use crate::shared::components::domain::Registry;
245    use crate::shared::observability::domain::TracerConfig;
246    use camel_api::function::PrepareToken;
247    use camel_api::{
248        CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
249        FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
250        RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
251        SupervisionConfig,
252    };
253    use std::sync::Arc;
254    use std::time::Duration;
255    use tokio::sync::mpsc;
256    use tokio::time::sleep;
257
258    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
259        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
260        {
261            let mut guard = registry.lock().expect("lock");
262            guard.register(std::sync::Arc::new(
263                camel_component_timer::TimerComponent::new(),
264            ));
265            guard.register(std::sync::Arc::new(
266                camel_component_mock::MockComponent::new(),
267            ));
268        }
269        let controller = DefaultRouteController::new(
270            Arc::clone(&registry),
271            Arc::new(camel_api::NoopPlatformService::default()),
272        );
273        spawn_controller_actor(controller)
274    }
275
276    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
277        let controller = DefaultRouteController::new(
278            Arc::new(std::sync::Mutex::new(Registry::new())),
279            Arc::new(camel_api::NoopPlatformService::default()),
280        );
281        spawn_controller_actor(controller)
282    }
283
284    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
285        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
286    }
287
288    struct NoopRuntime;
289    struct NoopInvoker;
290
291    #[async_trait::async_trait]
292    impl RuntimeCommandBus for NoopRuntime {
293        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
294            Ok(RuntimeCommandResult::Accepted)
295        }
296    }
297
298    #[async_trait::async_trait]
299    impl RuntimeQueryBus for NoopRuntime {
300        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
301            Ok(match query {
302                RuntimeQuery::GetRouteStatus { route_id }
303                | RuntimeQuery::InFlightCount { route_id } => {
304                    RuntimeQueryResult::RouteNotFound { route_id }
305                }
306                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
307                    route_ids: Vec::new(),
308                },
309            })
310        }
311    }
312
313    impl FunctionInvokerSync for NoopInvoker {
314        fn stage_pending(
315            &self,
316            _def: FunctionDefinition,
317            _route_id: Option<&str>,
318            _generation: u64,
319        ) {
320        }
321        fn discard_staging(&self, _generation: u64) {}
322        fn begin_reload(&self) -> u64 {
323            1
324        }
325        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
326            vec![]
327        }
328        fn staged_refs_for_route(
329            &self,
330            _route_id: &str,
331            _generation: u64,
332        ) -> Vec<(FunctionId, Option<String>)> {
333            vec![]
334        }
335        fn staged_defs_for_route(
336            &self,
337            _route_id: &str,
338            _generation: u64,
339        ) -> Vec<(FunctionDefinition, Option<String>)> {
340            vec![]
341        }
342    }
343
344    #[async_trait::async_trait]
345    impl FunctionInvoker for NoopInvoker {
346        async fn register(
347            &self,
348            _def: FunctionDefinition,
349            _route_id: Option<&str>,
350        ) -> Result<(), FunctionInvocationError> {
351            Ok(())
352        }
353        async fn unregister(
354            &self,
355            _id: &FunctionId,
356            _route_id: Option<&str>,
357        ) -> Result<(), FunctionInvocationError> {
358            Ok(())
359        }
360        async fn invoke(
361            &self,
362            _id: &FunctionId,
363            _exchange: &Exchange,
364        ) -> Result<ExchangePatch, FunctionInvocationError> {
365            Ok(ExchangePatch::default())
366        }
367        async fn prepare_reload(
368            &self,
369            _diff: FunctionDiff,
370            _generation: u64,
371        ) -> Result<PrepareToken, FunctionInvocationError> {
372            Ok(PrepareToken::default())
373        }
374        async fn finalize_reload(
375            &self,
376            _diff: &FunctionDiff,
377            _generation: u64,
378        ) -> Result<(), FunctionInvocationError> {
379            Ok(())
380        }
381        async fn rollback_reload(
382            &self,
383            _token: PrepareToken,
384            _generation: u64,
385        ) -> Result<(), FunctionInvocationError> {
386            Ok(())
387        }
388        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
389            Ok(())
390        }
391    }
392
393    #[tokio::test]
394    async fn start_route_sends_command_and_returns_reply() {
395        let (tx, mut rx) = mpsc::channel(1);
396        let handle = RouteControllerHandle { tx };
397
398        let task = tokio::spawn(async move { handle.start_route("route-a").await });
399
400        let command = rx.recv().await.expect("command should be received");
401        match command {
402            RouteControllerCommand::StartRoute { route_id, reply } => {
403                assert_eq!(route_id, "route-a");
404                let _ = reply.send(Ok(()));
405            }
406            _ => panic!("unexpected command variant"),
407        }
408
409        let result = task.await.expect("join should succeed");
410        assert!(result.is_ok());
411    }
412
413    #[tokio::test]
414    async fn start_route_returns_error_when_actor_stops() {
415        let (tx, rx) = mpsc::channel(1);
416        drop(rx);
417
418        let handle = RouteControllerHandle { tx };
419        let result = handle.start_route("route-a").await;
420
421        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
422    }
423
424    #[tokio::test]
425    async fn spawn_controller_actor_processes_commands_and_shutdown() {
426        let controller = DefaultRouteController::new(
427            Arc::new(std::sync::Mutex::new(Registry::new())),
428            Arc::new(camel_api::NoopPlatformService::default()),
429        );
430        let (handle, join_handle) = spawn_controller_actor(controller);
431
432        assert_eq!(handle.route_count().await.expect("route_count"), 0);
433        assert_eq!(
434            handle.route_ids().await.expect("route_ids"),
435            Vec::<String>::new()
436        );
437
438        handle.shutdown().await.expect("shutdown send");
439        join_handle.await.expect("actor join");
440    }
441
442    #[tokio::test]
443    async fn actor_handle_introspection_and_mutation_commands() {
444        let (handle, join_handle) = build_actor_with_components();
445        let definition = route_def("h-1", "timer:tick?period=100");
446
447        handle.add_route(definition).await.expect("add route");
448        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
449        assert!(
450            !handle
451                .route_exists("no-such")
452                .await
453                .expect("route exists no-such")
454        );
455
456        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
457        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
458        assert_eq!(handle.route_count().await.expect("route_count"), 1);
459
460        let auto_ids = handle
461            .auto_startup_route_ids()
462            .await
463            .expect("auto_startup_route_ids");
464        assert!(auto_ids.iter().any(|id| id == "h-1"));
465
466        let shutdown_ids = handle
467            .shutdown_route_ids()
468            .await
469            .expect("shutdown_route_ids");
470        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
471
472        let compiled = handle
473            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
474            .await
475            .expect("compile_route_definition");
476
477        assert!(
478            handle
479                .get_pipeline("h-1")
480                .await
481                .expect("get_pipeline")
482                .is_some()
483        );
484        handle
485            .swap_pipeline("h-1", compiled)
486            .await
487            .expect("swap_pipeline");
488
489        let _ = handle
490            .in_flight_count("h-1")
491            .await
492            .expect("in_flight_count");
493        let _ = handle.route_source_hash("h-1").await;
494
495        handle
496            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
497            .await
498            .expect("set_error_handler");
499        handle
500            .set_tracer_config(TracerConfig::default())
501            .await
502            .expect("set_tracer_config");
503        handle
504            .set_runtime_handle(Arc::new(NoopRuntime))
505            .await
506            .expect("set_runtime_handle");
507
508        handle.remove_route("h-1").await.expect("remove_route");
509        assert_eq!(
510            handle
511                .route_count()
512                .await
513                .expect("route_count after remove"),
514            0
515        );
516        handle
517            .stop_all_routes()
518            .await
519            .expect("stop_all_routes on empty");
520
521        handle.shutdown().await.expect("shutdown send");
522        join_handle.await.expect("actor join");
523    }
524
525    #[tokio::test]
526    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
527        let (handle, join_handle) = build_actor_with_components();
528        handle
529            .add_route(route_def("lc-1", "timer:tick?period=50"))
530            .await
531            .expect("add route lc-1");
532
533        handle.start_route("lc-1").await.expect("start_route");
534        sleep(Duration::from_millis(20)).await;
535
536        handle.restart_route("lc-1").await.expect("restart_route");
537        sleep(Duration::from_millis(20)).await;
538
539        handle.suspend_route("lc-1").await.expect("suspend_route");
540        handle.resume_route("lc-1").await.expect("resume_route");
541        sleep(Duration::from_millis(20)).await;
542
543        handle.stop_route("lc-1").await.expect("stop_route");
544        handle.start_all_routes().await.expect("start_all_routes");
545        sleep(Duration::from_millis(20)).await;
546        handle.stop_all_routes().await.expect("stop_all_routes");
547
548        handle
549            .start_route_reload("lc-1")
550            .await
551            .expect("start_route_reload");
552        handle
553            .stop_route_reload("lc-1")
554            .await
555            .expect("stop_route_reload");
556
557        handle.shutdown().await.expect("shutdown send");
558        join_handle.await.expect("actor join");
559    }
560
561    #[tokio::test]
562    async fn spawn_supervision_restarts_route_on_crash() {
563        let (handle, join_handle) = build_actor_with_components();
564        handle
565            .add_route(route_def("sup-1", "timer:tick?period=100"))
566            .await
567            .expect("add route sup-1");
568        handle
569            .start_route("sup-1")
570            .await
571            .expect("start_route sup-1");
572
573        let (crash_tx, crash_rx) = mpsc::channel(8);
574        let supervision = spawn_supervision_task(
575            handle.clone(),
576            SupervisionConfig {
577                initial_delay: Duration::from_millis(10),
578                max_attempts: Some(2),
579                ..SupervisionConfig::default()
580            },
581            None,
582            crash_rx,
583        );
584
585        crash_tx
586            .send(CrashNotification {
587                route_id: "sup-1".to_string(),
588                error: "simulated".to_string(),
589            })
590            .await
591            .expect("send crash notification");
592
593        sleep(Duration::from_millis(150)).await;
594        drop(crash_tx);
595        supervision.await.expect("supervision join");
596
597        handle.shutdown().await.expect("shutdown send");
598        join_handle.await.expect("actor join");
599    }
600
601    #[tokio::test]
602    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
603        let (handle, join_handle) = build_actor_with_components();
604        handle
605            .add_route(route_def("sup-2", "timer:tick?period=100"))
606            .await
607            .expect("add route sup-2");
608        handle
609            .start_route("sup-2")
610            .await
611            .expect("start_route sup-2");
612
613        let (crash_tx, crash_rx) = mpsc::channel(8);
614        let supervision = spawn_supervision_task(
615            handle.clone(),
616            SupervisionConfig {
617                initial_delay: Duration::from_millis(10),
618                max_attempts: Some(1),
619                ..SupervisionConfig::default()
620            },
621            None,
622            crash_rx,
623        );
624
625        crash_tx
626            .send(CrashNotification {
627                route_id: "sup-2".to_string(),
628                error: "attempt-1".to_string(),
629            })
630            .await
631            .expect("send crash attempt-1");
632        crash_tx
633            .send(CrashNotification {
634                route_id: "sup-2".to_string(),
635                error: "attempt-2".to_string(),
636            })
637            .await
638            .expect("send crash attempt-2");
639
640        sleep(Duration::from_millis(200)).await;
641        drop(crash_tx);
642        supervision.await.expect("supervision join");
643
644        handle.shutdown().await.expect("shutdown send");
645        join_handle.await.expect("actor join");
646    }
647
648    #[tokio::test]
649    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
650        let (handle, join_handle) = build_empty_actor();
651
652        handle
653            .try_set_runtime_handle(Arc::new(NoopRuntime))
654            .expect("try_set_runtime_handle should succeed");
655
656        handle.shutdown().await.expect("shutdown send");
657        join_handle.await.expect("actor join");
658    }
659
660    #[tokio::test]
661    async fn shutdown_returns_error_when_actor_stopped() {
662        let (tx, rx) = mpsc::channel(1);
663        drop(rx);
664
665        let handle = RouteControllerHandle { tx };
666        let result = handle.shutdown().await;
667
668        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
669    }
670
671    #[tokio::test]
672    async fn handle_methods_send_expected_commands_and_receive_replies() {
673        let (tx, mut rx) = mpsc::channel(16);
674        let handle = RouteControllerHandle { tx };
675
676        let stop_task = tokio::spawn({
677            let h = handle.clone();
678            async move { h.stop_route("r-1").await }
679        });
680        let cmd = rx.recv().await.expect("stop command");
681        match cmd {
682            RouteControllerCommand::StopRoute { route_id, reply } => {
683                assert_eq!(route_id, "r-1");
684                let _ = reply.send(Ok(()));
685            }
686            _ => panic!("unexpected command"),
687        }
688        assert!(stop_task.await.expect("join").is_ok());
689
690        let exists_task = tokio::spawn({
691            let h = handle.clone();
692            async move { h.route_exists("r-2").await }
693        });
694        let cmd = rx.recv().await.expect("exists command");
695        match cmd {
696            RouteControllerCommand::RouteExists { route_id, reply } => {
697                assert_eq!(route_id, "r-2");
698                let _ = reply.send(true);
699            }
700            _ => panic!("unexpected command"),
701        }
702        assert!(exists_task.await.expect("join").expect("ok"));
703
704        let hash_task = tokio::spawn({
705            let h = handle.clone();
706            async move { h.route_source_hash("r-3").await }
707        });
708        let cmd = rx.recv().await.expect("hash command");
709        match cmd {
710            RouteControllerCommand::RouteSourceHash { route_id, reply } => {
711                assert_eq!(route_id, "r-3");
712                let _ = reply.send(Some(77));
713            }
714            _ => panic!("unexpected command"),
715        }
716        assert_eq!(hash_task.await.expect("join"), Some(77));
717    }
718
719    #[tokio::test]
720    async fn handle_methods_error_on_dropped_reply_channel() {
721        let (tx, mut rx) = mpsc::channel(16);
722        let handle = RouteControllerHandle { tx };
723
724        let count_task = tokio::spawn({
725            let h = handle.clone();
726            async move { h.route_count().await }
727        });
728        let cmd = rx.recv().await.expect("route_count command");
729        match cmd {
730            RouteControllerCommand::RouteCount { reply } => drop(reply),
731            _ => panic!("unexpected command"),
732        }
733        assert!(matches!(
734            count_task.await.expect("join"),
735            Err(CamelError::ProcessorError(_))
736        ));
737
738        let stop_task = tokio::spawn({
739            let h = handle.clone();
740            async move { h.stop_route("x").await }
741        });
742        let cmd = rx.recv().await.expect("stop command");
743        match cmd {
744            RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
745            _ => panic!("unexpected command"),
746        }
747        assert!(matches!(
748            stop_task.await.expect("join"),
749            Err(CamelError::ProcessorError(_))
750        ));
751
752        let maybe_hash = tokio::spawn({
753            let h = handle.clone();
754            async move { h.route_source_hash("x").await }
755        });
756        let cmd = rx.recv().await.expect("hash command");
757        match cmd {
758            RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
759            _ => panic!("unexpected command"),
760        }
761        assert_eq!(maybe_hash.await.expect("join"), None);
762    }
763
764    #[test]
765    fn try_set_function_invoker_returns_mailbox_full() {
766        let (tx, mut rx) = mpsc::channel(1);
767        tx.try_send(RouteControllerCommand::Shutdown)
768            .expect("fill mailbox");
769        let handle = RouteControllerHandle { tx };
770
771        let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
772        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
773
774        rx.try_recv().expect("mailbox still has first message");
775    }
776}