Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6    BoxProcessor, CamelError, MetricsCollector, RouteController, RuntimeHandle, SupervisionConfig,
7};
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10use tracing::{error, info};
11
12use super::route_controller::{CrashNotification, DefaultRouteController};
13use crate::lifecycle::application::route_definition::RouteDefinition;
14use crate::shared::observability::domain::TracerConfig;
15
16pub(crate) enum RouteControllerCommand {
17    StartRoute {
18        route_id: String,
19        reply: oneshot::Sender<Result<(), CamelError>>,
20    },
21    StopRoute {
22        route_id: String,
23        reply: oneshot::Sender<Result<(), CamelError>>,
24    },
25    RestartRoute {
26        route_id: String,
27        reply: oneshot::Sender<Result<(), CamelError>>,
28    },
29    SuspendRoute {
30        route_id: String,
31        reply: oneshot::Sender<Result<(), CamelError>>,
32    },
33    ResumeRoute {
34        route_id: String,
35        reply: oneshot::Sender<Result<(), CamelError>>,
36    },
37    StartAllRoutes {
38        reply: oneshot::Sender<Result<(), CamelError>>,
39    },
40    StopAllRoutes {
41        reply: oneshot::Sender<Result<(), CamelError>>,
42    },
43    AddRoute {
44        definition: RouteDefinition,
45        reply: oneshot::Sender<Result<(), CamelError>>,
46    },
47    RemoveRoute {
48        route_id: String,
49        reply: oneshot::Sender<Result<(), CamelError>>,
50    },
51    SwapPipeline {
52        route_id: String,
53        pipeline: BoxProcessor,
54        reply: oneshot::Sender<Result<(), CamelError>>,
55    },
56    CompileRouteDefinition {
57        definition: RouteDefinition,
58        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
59    },
60    RouteFromUri {
61        route_id: String,
62        reply: oneshot::Sender<Option<String>>,
63    },
64    SetErrorHandler {
65        config: ErrorHandlerConfig,
66    },
67    SetTracerConfig {
68        config: TracerConfig,
69    },
70    RouteCount {
71        reply: oneshot::Sender<usize>,
72    },
73    InFlightCount {
74        route_id: String,
75        reply: oneshot::Sender<Option<u64>>,
76    },
77    RouteExists {
78        route_id: String,
79        reply: oneshot::Sender<bool>,
80    },
81    RouteIds {
82        reply: oneshot::Sender<Vec<String>>,
83    },
84    AutoStartupRouteIds {
85        reply: oneshot::Sender<Vec<String>>,
86    },
87    ShutdownRouteIds {
88        reply: oneshot::Sender<Vec<String>>,
89    },
90    GetPipeline {
91        route_id: String,
92        reply: oneshot::Sender<Option<BoxProcessor>>,
93    },
94    StartRouteReload {
95        route_id: String,
96        reply: oneshot::Sender<Result<(), CamelError>>,
97    },
98    StopRouteReload {
99        route_id: String,
100        reply: oneshot::Sender<Result<(), CamelError>>,
101    },
102    SetRuntimeHandle {
103        runtime: Arc<dyn RuntimeHandle>,
104    },
105    RouteSourceHash {
106        route_id: String,
107        reply: oneshot::Sender<Option<u64>>,
108    },
109    Shutdown,
110}
111
112#[derive(Clone)]
113pub struct RouteControllerHandle {
114    tx: mpsc::Sender<RouteControllerCommand>,
115}
116
117impl RouteControllerHandle {
118    pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
119        let (reply_tx, reply_rx) = oneshot::channel();
120        self.tx
121            .send(RouteControllerCommand::StartRoute {
122                route_id: route_id.into(),
123                reply: reply_tx,
124            })
125            .await
126            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
127        reply_rx
128            .await
129            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
130    }
131
132    pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
133        let (reply_tx, reply_rx) = oneshot::channel();
134        self.tx
135            .send(RouteControllerCommand::StopRoute {
136                route_id: route_id.into(),
137                reply: reply_tx,
138            })
139            .await
140            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
141        reply_rx
142            .await
143            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
144    }
145
146    pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
147        let (reply_tx, reply_rx) = oneshot::channel();
148        self.tx
149            .send(RouteControllerCommand::RestartRoute {
150                route_id: route_id.into(),
151                reply: reply_tx,
152            })
153            .await
154            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
155        reply_rx
156            .await
157            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
158    }
159
160    pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
161        let (reply_tx, reply_rx) = oneshot::channel();
162        self.tx
163            .send(RouteControllerCommand::SuspendRoute {
164                route_id: route_id.into(),
165                reply: reply_tx,
166            })
167            .await
168            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
169        reply_rx
170            .await
171            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
172    }
173
174    pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
175        let (reply_tx, reply_rx) = oneshot::channel();
176        self.tx
177            .send(RouteControllerCommand::ResumeRoute {
178                route_id: route_id.into(),
179                reply: reply_tx,
180            })
181            .await
182            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
183        reply_rx
184            .await
185            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
186    }
187
188    pub async fn start_all_routes(&self) -> Result<(), CamelError> {
189        let (reply_tx, reply_rx) = oneshot::channel();
190        self.tx
191            .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
192            .await
193            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
194        reply_rx
195            .await
196            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
197    }
198
199    pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
200        let (reply_tx, reply_rx) = oneshot::channel();
201        self.tx
202            .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
203            .await
204            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205        reply_rx
206            .await
207            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208    }
209
210    pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
211        let (reply_tx, reply_rx) = oneshot::channel();
212        self.tx
213            .send(RouteControllerCommand::AddRoute {
214                definition,
215                reply: reply_tx,
216            })
217            .await
218            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
219        reply_rx
220            .await
221            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
222    }
223
224    pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
225        let (reply_tx, reply_rx) = oneshot::channel();
226        self.tx
227            .send(RouteControllerCommand::RemoveRoute {
228                route_id: route_id.into(),
229                reply: reply_tx,
230            })
231            .await
232            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
233        reply_rx
234            .await
235            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
236    }
237
238    pub async fn swap_pipeline(
239        &self,
240        route_id: impl Into<String>,
241        pipeline: BoxProcessor,
242    ) -> Result<(), CamelError> {
243        let (reply_tx, reply_rx) = oneshot::channel();
244        self.tx
245            .send(RouteControllerCommand::SwapPipeline {
246                route_id: route_id.into(),
247                pipeline,
248                reply: reply_tx,
249            })
250            .await
251            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
252        reply_rx
253            .await
254            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
255    }
256
257    pub async fn compile_route_definition(
258        &self,
259        definition: RouteDefinition,
260    ) -> Result<BoxProcessor, CamelError> {
261        let (reply_tx, reply_rx) = oneshot::channel();
262        self.tx
263            .send(RouteControllerCommand::CompileRouteDefinition {
264                definition,
265                reply: reply_tx,
266            })
267            .await
268            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
269        reply_rx
270            .await
271            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
272    }
273
274    pub async fn route_from_uri(
275        &self,
276        route_id: impl Into<String>,
277    ) -> Result<Option<String>, CamelError> {
278        let (reply_tx, reply_rx) = oneshot::channel();
279        self.tx
280            .send(RouteControllerCommand::RouteFromUri {
281                route_id: route_id.into(),
282                reply: reply_tx,
283            })
284            .await
285            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
286        reply_rx
287            .await
288            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
289    }
290
291    pub async fn route_count(&self) -> Result<usize, CamelError> {
292        let (reply_tx, reply_rx) = oneshot::channel();
293        self.tx
294            .send(RouteControllerCommand::RouteCount { reply: reply_tx })
295            .await
296            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
297        reply_rx
298            .await
299            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
300    }
301
302    pub async fn in_flight_count(
303        &self,
304        route_id: impl Into<String>,
305    ) -> Result<Option<u64>, CamelError> {
306        let (reply_tx, reply_rx) = oneshot::channel();
307        self.tx
308            .send(RouteControllerCommand::InFlightCount {
309                route_id: route_id.into(),
310                reply: reply_tx,
311            })
312            .await
313            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
314        reply_rx
315            .await
316            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
317    }
318
319    pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
320        let (reply_tx, reply_rx) = oneshot::channel();
321        self.tx
322            .send(RouteControllerCommand::RouteExists {
323                route_id: route_id.into(),
324                reply: reply_tx,
325            })
326            .await
327            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
328        reply_rx
329            .await
330            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
331    }
332
333    pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
334        let (reply_tx, reply_rx) = oneshot::channel();
335        self.tx
336            .send(RouteControllerCommand::RouteIds { reply: reply_tx })
337            .await
338            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
339        reply_rx
340            .await
341            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
342    }
343
344    pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
345        let (reply_tx, reply_rx) = oneshot::channel();
346        self.tx
347            .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
348            .await
349            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350        reply_rx
351            .await
352            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
353    }
354
355    pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
356        let (reply_tx, reply_rx) = oneshot::channel();
357        self.tx
358            .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
359            .await
360            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
361        reply_rx
362            .await
363            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
364    }
365
366    pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
367        let (reply_tx, reply_rx) = oneshot::channel();
368        self.tx
369            .send(RouteControllerCommand::StartRouteReload {
370                route_id: route_id.into(),
371                reply: reply_tx,
372            })
373            .await
374            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
375        reply_rx
376            .await
377            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
378    }
379
380    pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
381        let (reply_tx, reply_rx) = oneshot::channel();
382        self.tx
383            .send(RouteControllerCommand::StopRouteReload {
384                route_id: route_id.into(),
385                reply: reply_tx,
386            })
387            .await
388            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
389        reply_rx
390            .await
391            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
392    }
393
394    pub async fn get_pipeline(
395        &self,
396        route_id: impl Into<String>,
397    ) -> Result<Option<BoxProcessor>, CamelError> {
398        let (reply_tx, reply_rx) = oneshot::channel();
399        self.tx
400            .send(RouteControllerCommand::GetPipeline {
401                route_id: route_id.into(),
402                reply: reply_tx,
403            })
404            .await
405            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
406        reply_rx
407            .await
408            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
409    }
410
411    pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
412        self.tx
413            .send(RouteControllerCommand::SetErrorHandler { config })
414            .await
415            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
416    }
417
418    pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
419        self.tx
420            .send(RouteControllerCommand::SetTracerConfig { config })
421            .await
422            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
423    }
424
425    pub async fn set_runtime_handle(
426        &self,
427        runtime: Arc<dyn RuntimeHandle>,
428    ) -> Result<(), CamelError> {
429        self.tx
430            .send(RouteControllerCommand::SetRuntimeHandle { runtime })
431            .await
432            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
433    }
434
435    pub fn try_set_runtime_handle(
436        &self,
437        runtime: Arc<dyn RuntimeHandle>,
438    ) -> Result<(), CamelError> {
439        self.tx
440            .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
441            .map_err(|err| {
442                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
443            })
444    }
445
446    pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
447        let (reply_tx, reply_rx) = oneshot::channel();
448        self.tx
449            .send(RouteControllerCommand::RouteSourceHash {
450                route_id: route_id.into(),
451                reply: reply_tx,
452            })
453            .await
454            .ok()?;
455        reply_rx.await.ok()?
456    }
457
458    pub async fn shutdown(&self) -> Result<(), CamelError> {
459        self.tx
460            .send(RouteControllerCommand::Shutdown)
461            .await
462            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
463    }
464}
465
466pub fn spawn_controller_actor(
467    controller: DefaultRouteController,
468) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
469    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
470    let handle = tokio::spawn(async move {
471        let mut controller = controller;
472        while let Some(cmd) = rx.recv().await {
473            match cmd {
474                RouteControllerCommand::StartRoute { route_id, reply } => {
475                    let _ = reply.send(controller.start_route(&route_id).await);
476                }
477                RouteControllerCommand::StopRoute { route_id, reply } => {
478                    let _ = reply.send(controller.stop_route(&route_id).await);
479                }
480                RouteControllerCommand::RestartRoute { route_id, reply } => {
481                    let _ = reply.send(controller.restart_route(&route_id).await);
482                }
483                RouteControllerCommand::SuspendRoute { route_id, reply } => {
484                    let _ = reply.send(controller.suspend_route(&route_id).await);
485                }
486                RouteControllerCommand::ResumeRoute { route_id, reply } => {
487                    let _ = reply.send(controller.resume_route(&route_id).await);
488                }
489                RouteControllerCommand::StartAllRoutes { reply } => {
490                    let _ = reply.send(controller.start_all_routes().await);
491                }
492                RouteControllerCommand::StopAllRoutes { reply } => {
493                    let _ = reply.send(controller.stop_all_routes().await);
494                }
495                RouteControllerCommand::AddRoute { definition, reply } => {
496                    let _ = reply.send(controller.add_route(definition));
497                }
498                RouteControllerCommand::RemoveRoute { route_id, reply } => {
499                    let _ = reply.send(controller.remove_route(&route_id));
500                }
501                RouteControllerCommand::SwapPipeline {
502                    route_id,
503                    pipeline,
504                    reply,
505                } => {
506                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
507                }
508                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
509                    let _ = reply.send(controller.compile_route_definition(definition));
510                }
511                RouteControllerCommand::RouteFromUri { route_id, reply } => {
512                    let _ = reply.send(controller.route_from_uri(&route_id));
513                }
514                RouteControllerCommand::SetErrorHandler { config } => {
515                    controller.set_error_handler(config);
516                }
517                RouteControllerCommand::SetTracerConfig { config } => {
518                    controller.set_tracer_config(&config);
519                }
520                RouteControllerCommand::RouteCount { reply } => {
521                    let _ = reply.send(controller.route_count());
522                }
523                RouteControllerCommand::InFlightCount { route_id, reply } => {
524                    let _ = reply.send(controller.in_flight_count(&route_id));
525                }
526                RouteControllerCommand::RouteExists { route_id, reply } => {
527                    let _ = reply.send(controller.route_exists(&route_id));
528                }
529                RouteControllerCommand::RouteIds { reply } => {
530                    let _ = reply.send(controller.route_ids());
531                }
532                RouteControllerCommand::AutoStartupRouteIds { reply } => {
533                    let _ = reply.send(controller.auto_startup_route_ids());
534                }
535                RouteControllerCommand::ShutdownRouteIds { reply } => {
536                    let _ = reply.send(controller.shutdown_route_ids());
537                }
538                RouteControllerCommand::GetPipeline { route_id, reply } => {
539                    let _ = reply.send(controller.get_pipeline(&route_id));
540                }
541                RouteControllerCommand::StartRouteReload { route_id, reply } => {
542                    let _ = reply.send(controller.start_route_reload(&route_id).await);
543                }
544                RouteControllerCommand::StopRouteReload { route_id, reply } => {
545                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
546                }
547                RouteControllerCommand::SetRuntimeHandle { runtime } => {
548                    controller.set_runtime_handle(runtime);
549                }
550                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
551                    let _ = reply.send(controller.route_source_hash(&route_id));
552                }
553                RouteControllerCommand::Shutdown => {
554                    break;
555                }
556            }
557        }
558    });
559    (RouteControllerHandle { tx }, handle)
560}
561
562pub fn spawn_supervision_task(
563    controller: RouteControllerHandle,
564    config: SupervisionConfig,
565    _metrics: Option<Arc<dyn MetricsCollector>>,
566    mut crash_rx: mpsc::Receiver<CrashNotification>,
567) -> JoinHandle<()> {
568    tokio::spawn(async move {
569        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
570        let mut last_restart_time: std::collections::HashMap<String, Instant> =
571            std::collections::HashMap::new();
572        let mut currently_restarting: std::collections::HashSet<String> =
573            std::collections::HashSet::new();
574
575        info!("Supervision loop started");
576
577        while let Some(notification) = crash_rx.recv().await {
578            let route_id = notification.route_id;
579            if currently_restarting.contains(&route_id) {
580                continue;
581            }
582
583            if let Some(last_time) = last_restart_time.get(&route_id)
584                && last_time.elapsed() >= config.initial_delay
585            {
586                attempts.insert(route_id.clone(), 0);
587            }
588
589            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
590            *current_attempt += 1;
591
592            if config
593                .max_attempts
594                .is_some_and(|max| *current_attempt > max)
595            {
596                error!(
597                    route_id = %route_id,
598                    attempts = *current_attempt,
599                    "Route exceeded max restart attempts, giving up"
600                );
601                continue;
602            }
603
604            let delay = config.next_delay(*current_attempt);
605            currently_restarting.insert(route_id.clone());
606            tokio::time::sleep(delay).await;
607
608            match controller.restart_route(route_id.clone()).await {
609                Ok(()) => {
610                    info!(route_id = %route_id, "Route restarted successfully");
611                    last_restart_time.insert(route_id.clone(), Instant::now());
612                }
613                Err(err) => {
614                    error!(route_id = %route_id, error = %err, "Failed to restart route");
615                }
616            }
617
618            currently_restarting.remove(&route_id);
619        }
620
621        info!("Supervision loop ended");
622    })
623}
624
625#[cfg(test)]
626mod tests {
627    use super::{
628        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
629        spawn_supervision_task,
630    };
631    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
632    use crate::lifecycle::application::route_definition::RouteDefinition;
633    use crate::shared::components::domain::Registry;
634    use crate::shared::observability::domain::TracerConfig;
635    use camel_api::{
636        CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
637        RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
638    };
639    use std::sync::Arc;
640    use std::time::Duration;
641    use tokio::sync::mpsc;
642    use tokio::time::sleep;
643
644    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
645        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
646        {
647            let mut guard = registry.lock().expect("lock");
648            guard.register(std::sync::Arc::new(
649                camel_component_timer::TimerComponent::new(),
650            ));
651            guard.register(std::sync::Arc::new(
652                camel_component_mock::MockComponent::new(),
653            ));
654        }
655        let controller = DefaultRouteController::new(Arc::clone(&registry));
656        spawn_controller_actor(controller)
657    }
658
659    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
660        let controller =
661            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
662        spawn_controller_actor(controller)
663    }
664
665    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
666        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
667    }
668
669    struct NoopRuntime;
670
671    #[async_trait::async_trait]
672    impl RuntimeCommandBus for NoopRuntime {
673        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
674            Ok(RuntimeCommandResult::Accepted)
675        }
676    }
677
678    #[async_trait::async_trait]
679    impl RuntimeQueryBus for NoopRuntime {
680        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
681            Ok(match query {
682                RuntimeQuery::GetRouteStatus { route_id }
683                | RuntimeQuery::InFlightCount { route_id } => {
684                    RuntimeQueryResult::RouteNotFound { route_id }
685                }
686                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
687                    route_ids: Vec::new(),
688                },
689            })
690        }
691    }
692
693    #[tokio::test]
694    async fn start_route_sends_command_and_returns_reply() {
695        let (tx, mut rx) = mpsc::channel(1);
696        let handle = RouteControllerHandle { tx };
697
698        let task = tokio::spawn(async move { handle.start_route("route-a").await });
699
700        let command = rx.recv().await.expect("command should be received");
701        match command {
702            RouteControllerCommand::StartRoute { route_id, reply } => {
703                assert_eq!(route_id, "route-a");
704                let _ = reply.send(Ok(()));
705            }
706            _ => panic!("unexpected command variant"),
707        }
708
709        let result = task.await.expect("join should succeed");
710        assert!(result.is_ok());
711    }
712
713    #[tokio::test]
714    async fn start_route_returns_error_when_actor_stops() {
715        let (tx, rx) = mpsc::channel(1);
716        drop(rx);
717
718        let handle = RouteControllerHandle { tx };
719        let result = handle.start_route("route-a").await;
720
721        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
722    }
723
724    #[tokio::test]
725    async fn spawn_controller_actor_processes_commands_and_shutdown() {
726        let controller =
727            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
728        let (handle, join_handle) = spawn_controller_actor(controller);
729
730        assert_eq!(handle.route_count().await.expect("route_count"), 0);
731        assert_eq!(
732            handle.route_ids().await.expect("route_ids"),
733            Vec::<String>::new()
734        );
735
736        handle.shutdown().await.expect("shutdown send");
737        join_handle.await.expect("actor join");
738    }
739
740    #[tokio::test]
741    async fn actor_handle_introspection_and_mutation_commands() {
742        let (handle, join_handle) = build_actor_with_components();
743        let definition = route_def("h-1", "timer:tick?period=100");
744
745        handle.add_route(definition).await.expect("add route");
746        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
747        assert!(
748            !handle
749                .route_exists("no-such")
750                .await
751                .expect("route exists no-such")
752        );
753
754        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
755        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
756        assert_eq!(handle.route_count().await.expect("route_count"), 1);
757
758        let auto_ids = handle
759            .auto_startup_route_ids()
760            .await
761            .expect("auto_startup_route_ids");
762        assert!(auto_ids.iter().any(|id| id == "h-1"));
763
764        let shutdown_ids = handle
765            .shutdown_route_ids()
766            .await
767            .expect("shutdown_route_ids");
768        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
769
770        let compiled = handle
771            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
772            .await
773            .expect("compile_route_definition");
774
775        assert!(
776            handle
777                .get_pipeline("h-1")
778                .await
779                .expect("get_pipeline")
780                .is_some()
781        );
782        handle
783            .swap_pipeline("h-1", compiled)
784            .await
785            .expect("swap_pipeline");
786
787        let _ = handle
788            .in_flight_count("h-1")
789            .await
790            .expect("in_flight_count");
791        let _ = handle.route_source_hash("h-1").await;
792
793        handle
794            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
795            .await
796            .expect("set_error_handler");
797        handle
798            .set_tracer_config(TracerConfig::default())
799            .await
800            .expect("set_tracer_config");
801        handle
802            .set_runtime_handle(Arc::new(NoopRuntime))
803            .await
804            .expect("set_runtime_handle");
805
806        handle.remove_route("h-1").await.expect("remove_route");
807        assert_eq!(
808            handle
809                .route_count()
810                .await
811                .expect("route_count after remove"),
812            0
813        );
814        handle
815            .stop_all_routes()
816            .await
817            .expect("stop_all_routes on empty");
818
819        handle.shutdown().await.expect("shutdown send");
820        join_handle.await.expect("actor join");
821    }
822
823    #[tokio::test]
824    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
825        let (handle, join_handle) = build_actor_with_components();
826        handle
827            .add_route(route_def("lc-1", "timer:tick?period=50"))
828            .await
829            .expect("add route lc-1");
830
831        handle.start_route("lc-1").await.expect("start_route");
832        sleep(Duration::from_millis(20)).await;
833
834        handle.restart_route("lc-1").await.expect("restart_route");
835        sleep(Duration::from_millis(20)).await;
836
837        handle.suspend_route("lc-1").await.expect("suspend_route");
838        handle.resume_route("lc-1").await.expect("resume_route");
839        sleep(Duration::from_millis(20)).await;
840
841        handle.stop_route("lc-1").await.expect("stop_route");
842        handle.start_all_routes().await.expect("start_all_routes");
843        sleep(Duration::from_millis(20)).await;
844        handle.stop_all_routes().await.expect("stop_all_routes");
845
846        handle
847            .start_route_reload("lc-1")
848            .await
849            .expect("start_route_reload");
850        handle
851            .stop_route_reload("lc-1")
852            .await
853            .expect("stop_route_reload");
854
855        handle.shutdown().await.expect("shutdown send");
856        join_handle.await.expect("actor join");
857    }
858
859    #[tokio::test]
860    async fn spawn_supervision_restarts_route_on_crash() {
861        let (handle, join_handle) = build_actor_with_components();
862        handle
863            .add_route(route_def("sup-1", "timer:tick?period=100"))
864            .await
865            .expect("add route sup-1");
866        handle
867            .start_route("sup-1")
868            .await
869            .expect("start_route sup-1");
870
871        let (crash_tx, crash_rx) = mpsc::channel(8);
872        let supervision = spawn_supervision_task(
873            handle.clone(),
874            SupervisionConfig {
875                initial_delay: Duration::from_millis(10),
876                max_attempts: Some(2),
877                ..SupervisionConfig::default()
878            },
879            None,
880            crash_rx,
881        );
882
883        crash_tx
884            .send(CrashNotification {
885                route_id: "sup-1".to_string(),
886                error: "simulated".to_string(),
887            })
888            .await
889            .expect("send crash notification");
890
891        sleep(Duration::from_millis(150)).await;
892        drop(crash_tx);
893        supervision.await.expect("supervision join");
894
895        handle.shutdown().await.expect("shutdown send");
896        join_handle.await.expect("actor join");
897    }
898
899    #[tokio::test]
900    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
901        let (handle, join_handle) = build_actor_with_components();
902        handle
903            .add_route(route_def("sup-2", "timer:tick?period=100"))
904            .await
905            .expect("add route sup-2");
906        handle
907            .start_route("sup-2")
908            .await
909            .expect("start_route sup-2");
910
911        let (crash_tx, crash_rx) = mpsc::channel(8);
912        let supervision = spawn_supervision_task(
913            handle.clone(),
914            SupervisionConfig {
915                initial_delay: Duration::from_millis(10),
916                max_attempts: Some(1),
917                ..SupervisionConfig::default()
918            },
919            None,
920            crash_rx,
921        );
922
923        crash_tx
924            .send(CrashNotification {
925                route_id: "sup-2".to_string(),
926                error: "attempt-1".to_string(),
927            })
928            .await
929            .expect("send crash attempt-1");
930        crash_tx
931            .send(CrashNotification {
932                route_id: "sup-2".to_string(),
933                error: "attempt-2".to_string(),
934            })
935            .await
936            .expect("send crash attempt-2");
937
938        sleep(Duration::from_millis(200)).await;
939        drop(crash_tx);
940        supervision.await.expect("supervision join");
941
942        handle.shutdown().await.expect("shutdown send");
943        join_handle.await.expect("actor join");
944    }
945
946    #[tokio::test]
947    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
948        let (handle, join_handle) = build_empty_actor();
949
950        handle
951            .try_set_runtime_handle(Arc::new(NoopRuntime))
952            .expect("try_set_runtime_handle should succeed");
953
954        handle.shutdown().await.expect("shutdown send");
955        join_handle.await.expect("actor join");
956    }
957
958    #[tokio::test]
959    async fn shutdown_returns_error_when_actor_stopped() {
960        let (tx, rx) = mpsc::channel(1);
961        drop(rx);
962
963        let handle = RouteControllerHandle { tx };
964        let result = handle.shutdown().await;
965
966        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
967    }
968}