// camel_core/lifecycle/adapters/controller_actor.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6    BoxProcessor, CamelError, MetricsCollector, RouteController, RuntimeHandle, SupervisionConfig,
7};
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10use tracing::{error, info};
11
12use super::route_controller::{CrashNotification, DefaultRouteController};
13use crate::lifecycle::application::route_definition::RouteDefinition;
14use crate::shared::observability::domain::TracerConfig;
15
16pub(crate) enum RouteControllerCommand {
17    StartRoute {
18        route_id: String,
19        reply: oneshot::Sender<Result<(), CamelError>>,
20    },
21    StopRoute {
22        route_id: String,
23        reply: oneshot::Sender<Result<(), CamelError>>,
24    },
25    RestartRoute {
26        route_id: String,
27        reply: oneshot::Sender<Result<(), CamelError>>,
28    },
29    SuspendRoute {
30        route_id: String,
31        reply: oneshot::Sender<Result<(), CamelError>>,
32    },
33    ResumeRoute {
34        route_id: String,
35        reply: oneshot::Sender<Result<(), CamelError>>,
36    },
37    StartAllRoutes {
38        reply: oneshot::Sender<Result<(), CamelError>>,
39    },
40    StopAllRoutes {
41        reply: oneshot::Sender<Result<(), CamelError>>,
42    },
43    AddRoute {
44        definition: RouteDefinition,
45        reply: oneshot::Sender<Result<(), CamelError>>,
46    },
47    RemoveRoute {
48        route_id: String,
49        reply: oneshot::Sender<Result<(), CamelError>>,
50    },
51    SwapPipeline {
52        route_id: String,
53        pipeline: BoxProcessor,
54        reply: oneshot::Sender<Result<(), CamelError>>,
55    },
56    CompileRouteDefinition {
57        definition: RouteDefinition,
58        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
59    },
60    RouteFromUri {
61        route_id: String,
62        reply: oneshot::Sender<Option<String>>,
63    },
64    SetErrorHandler {
65        config: ErrorHandlerConfig,
66    },
67    SetTracerConfig {
68        config: TracerConfig,
69    },
70    RouteCount {
71        reply: oneshot::Sender<usize>,
72    },
73    InFlightCount {
74        route_id: String,
75        reply: oneshot::Sender<Option<u64>>,
76    },
77    RouteExists {
78        route_id: String,
79        reply: oneshot::Sender<bool>,
80    },
81    RouteIds {
82        reply: oneshot::Sender<Vec<String>>,
83    },
84    AutoStartupRouteIds {
85        reply: oneshot::Sender<Vec<String>>,
86    },
87    ShutdownRouteIds {
88        reply: oneshot::Sender<Vec<String>>,
89    },
90    GetPipeline {
91        route_id: String,
92        reply: oneshot::Sender<Option<BoxProcessor>>,
93    },
94    StartRouteReload {
95        route_id: String,
96        reply: oneshot::Sender<Result<(), CamelError>>,
97    },
98    StopRouteReload {
99        route_id: String,
100        reply: oneshot::Sender<Result<(), CamelError>>,
101    },
102    SetRuntimeHandle {
103        runtime: Arc<dyn RuntimeHandle>,
104    },
105    RouteSourceHash {
106        route_id: String,
107        reply: oneshot::Sender<Option<u64>>,
108    },
109    Shutdown,
110}
111
112#[derive(Clone)]
113pub struct RouteControllerHandle {
114    tx: mpsc::Sender<RouteControllerCommand>,
115}
116
117impl RouteControllerHandle {
118    pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
119        let (reply_tx, reply_rx) = oneshot::channel();
120        self.tx
121            .send(RouteControllerCommand::StartRoute {
122                route_id: route_id.into(),
123                reply: reply_tx,
124            })
125            .await
126            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
127        reply_rx
128            .await
129            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
130    }
131
132    pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
133        let (reply_tx, reply_rx) = oneshot::channel();
134        self.tx
135            .send(RouteControllerCommand::StopRoute {
136                route_id: route_id.into(),
137                reply: reply_tx,
138            })
139            .await
140            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
141        reply_rx
142            .await
143            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
144    }
145
146    pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
147        let (reply_tx, reply_rx) = oneshot::channel();
148        self.tx
149            .send(RouteControllerCommand::RestartRoute {
150                route_id: route_id.into(),
151                reply: reply_tx,
152            })
153            .await
154            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
155        reply_rx
156            .await
157            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
158    }
159
160    pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
161        let (reply_tx, reply_rx) = oneshot::channel();
162        self.tx
163            .send(RouteControllerCommand::SuspendRoute {
164                route_id: route_id.into(),
165                reply: reply_tx,
166            })
167            .await
168            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
169        reply_rx
170            .await
171            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
172    }
173
174    pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
175        let (reply_tx, reply_rx) = oneshot::channel();
176        self.tx
177            .send(RouteControllerCommand::ResumeRoute {
178                route_id: route_id.into(),
179                reply: reply_tx,
180            })
181            .await
182            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
183        reply_rx
184            .await
185            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
186    }
187
188    pub async fn start_all_routes(&self) -> Result<(), CamelError> {
189        let (reply_tx, reply_rx) = oneshot::channel();
190        self.tx
191            .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
192            .await
193            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
194        reply_rx
195            .await
196            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
197    }
198
199    pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
200        let (reply_tx, reply_rx) = oneshot::channel();
201        self.tx
202            .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
203            .await
204            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205        reply_rx
206            .await
207            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208    }
209
210    pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
211        let (reply_tx, reply_rx) = oneshot::channel();
212        self.tx
213            .send(RouteControllerCommand::AddRoute {
214                definition,
215                reply: reply_tx,
216            })
217            .await
218            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
219        reply_rx
220            .await
221            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
222    }
223
224    pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
225        let (reply_tx, reply_rx) = oneshot::channel();
226        self.tx
227            .send(RouteControllerCommand::RemoveRoute {
228                route_id: route_id.into(),
229                reply: reply_tx,
230            })
231            .await
232            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
233        reply_rx
234            .await
235            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
236    }
237
238    pub async fn swap_pipeline(
239        &self,
240        route_id: impl Into<String>,
241        pipeline: BoxProcessor,
242    ) -> Result<(), CamelError> {
243        let (reply_tx, reply_rx) = oneshot::channel();
244        self.tx
245            .send(RouteControllerCommand::SwapPipeline {
246                route_id: route_id.into(),
247                pipeline,
248                reply: reply_tx,
249            })
250            .await
251            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
252        reply_rx
253            .await
254            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
255    }
256
257    pub async fn compile_route_definition(
258        &self,
259        definition: RouteDefinition,
260    ) -> Result<BoxProcessor, CamelError> {
261        let (reply_tx, reply_rx) = oneshot::channel();
262        self.tx
263            .send(RouteControllerCommand::CompileRouteDefinition {
264                definition,
265                reply: reply_tx,
266            })
267            .await
268            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
269        reply_rx
270            .await
271            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
272    }
273
274    pub async fn route_from_uri(
275        &self,
276        route_id: impl Into<String>,
277    ) -> Result<Option<String>, CamelError> {
278        let (reply_tx, reply_rx) = oneshot::channel();
279        self.tx
280            .send(RouteControllerCommand::RouteFromUri {
281                route_id: route_id.into(),
282                reply: reply_tx,
283            })
284            .await
285            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
286        reply_rx
287            .await
288            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
289    }
290
291    pub async fn route_count(&self) -> Result<usize, CamelError> {
292        let (reply_tx, reply_rx) = oneshot::channel();
293        self.tx
294            .send(RouteControllerCommand::RouteCount { reply: reply_tx })
295            .await
296            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
297        reply_rx
298            .await
299            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
300    }
301
302    pub async fn in_flight_count(
303        &self,
304        route_id: impl Into<String>,
305    ) -> Result<Option<u64>, CamelError> {
306        let (reply_tx, reply_rx) = oneshot::channel();
307        self.tx
308            .send(RouteControllerCommand::InFlightCount {
309                route_id: route_id.into(),
310                reply: reply_tx,
311            })
312            .await
313            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
314        reply_rx
315            .await
316            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
317    }
318
319    pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
320        let (reply_tx, reply_rx) = oneshot::channel();
321        self.tx
322            .send(RouteControllerCommand::RouteExists {
323                route_id: route_id.into(),
324                reply: reply_tx,
325            })
326            .await
327            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
328        reply_rx
329            .await
330            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
331    }
332
333    pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
334        let (reply_tx, reply_rx) = oneshot::channel();
335        self.tx
336            .send(RouteControllerCommand::RouteIds { reply: reply_tx })
337            .await
338            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
339        reply_rx
340            .await
341            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
342    }
343
344    pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
345        let (reply_tx, reply_rx) = oneshot::channel();
346        self.tx
347            .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
348            .await
349            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350        reply_rx
351            .await
352            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
353    }
354
355    pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
356        let (reply_tx, reply_rx) = oneshot::channel();
357        self.tx
358            .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
359            .await
360            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
361        reply_rx
362            .await
363            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
364    }
365
366    pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
367        let (reply_tx, reply_rx) = oneshot::channel();
368        self.tx
369            .send(RouteControllerCommand::StartRouteReload {
370                route_id: route_id.into(),
371                reply: reply_tx,
372            })
373            .await
374            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
375        reply_rx
376            .await
377            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
378    }
379
380    pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
381        let (reply_tx, reply_rx) = oneshot::channel();
382        self.tx
383            .send(RouteControllerCommand::StopRouteReload {
384                route_id: route_id.into(),
385                reply: reply_tx,
386            })
387            .await
388            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
389        reply_rx
390            .await
391            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
392    }
393
394    pub async fn get_pipeline(
395        &self,
396        route_id: impl Into<String>,
397    ) -> Result<Option<BoxProcessor>, CamelError> {
398        let (reply_tx, reply_rx) = oneshot::channel();
399        self.tx
400            .send(RouteControllerCommand::GetPipeline {
401                route_id: route_id.into(),
402                reply: reply_tx,
403            })
404            .await
405            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
406        reply_rx
407            .await
408            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
409    }
410
411    pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
412        self.tx
413            .send(RouteControllerCommand::SetErrorHandler { config })
414            .await
415            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
416    }
417
418    pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
419        self.tx
420            .send(RouteControllerCommand::SetTracerConfig { config })
421            .await
422            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
423    }
424
425    pub async fn set_runtime_handle(
426        &self,
427        runtime: Arc<dyn RuntimeHandle>,
428    ) -> Result<(), CamelError> {
429        self.tx
430            .send(RouteControllerCommand::SetRuntimeHandle { runtime })
431            .await
432            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
433    }
434
435    pub fn try_set_runtime_handle(
436        &self,
437        runtime: Arc<dyn RuntimeHandle>,
438    ) -> Result<(), CamelError> {
439        self.tx
440            .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
441            .map_err(|err| {
442                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
443            })
444    }
445
446    pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
447        let (reply_tx, reply_rx) = oneshot::channel();
448        self.tx
449            .send(RouteControllerCommand::RouteSourceHash {
450                route_id: route_id.into(),
451                reply: reply_tx,
452            })
453            .await
454            .ok()?;
455        reply_rx.await.ok()?
456    }
457
458    pub async fn shutdown(&self) -> Result<(), CamelError> {
459        self.tx
460            .send(RouteControllerCommand::Shutdown)
461            .await
462            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
463    }
464}
465
466pub fn spawn_controller_actor(
467    controller: DefaultRouteController,
468) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
469    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
470    let handle = tokio::spawn(async move {
471        let mut controller = controller;
472        while let Some(cmd) = rx.recv().await {
473            match cmd {
474                RouteControllerCommand::StartRoute { route_id, reply } => {
475                    let _ = reply.send(controller.start_route(&route_id).await);
476                }
477                RouteControllerCommand::StopRoute { route_id, reply } => {
478                    let _ = reply.send(controller.stop_route(&route_id).await);
479                }
480                RouteControllerCommand::RestartRoute { route_id, reply } => {
481                    let _ = reply.send(controller.restart_route(&route_id).await);
482                }
483                RouteControllerCommand::SuspendRoute { route_id, reply } => {
484                    let _ = reply.send(controller.suspend_route(&route_id).await);
485                }
486                RouteControllerCommand::ResumeRoute { route_id, reply } => {
487                    let _ = reply.send(controller.resume_route(&route_id).await);
488                }
489                RouteControllerCommand::StartAllRoutes { reply } => {
490                    let _ = reply.send(controller.start_all_routes().await);
491                }
492                RouteControllerCommand::StopAllRoutes { reply } => {
493                    let _ = reply.send(controller.stop_all_routes().await);
494                }
495                RouteControllerCommand::AddRoute { definition, reply } => {
496                    let _ = reply.send(controller.add_route(definition));
497                }
498                RouteControllerCommand::RemoveRoute { route_id, reply } => {
499                    let _ = reply.send(controller.remove_route(&route_id));
500                }
501                RouteControllerCommand::SwapPipeline {
502                    route_id,
503                    pipeline,
504                    reply,
505                } => {
506                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
507                }
508                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
509                    let _ = reply.send(controller.compile_route_definition(definition));
510                }
511                RouteControllerCommand::RouteFromUri { route_id, reply } => {
512                    let _ = reply.send(controller.route_from_uri(&route_id));
513                }
514                RouteControllerCommand::SetErrorHandler { config } => {
515                    controller.set_error_handler(config);
516                }
517                RouteControllerCommand::SetTracerConfig { config } => {
518                    controller.set_tracer_config(&config);
519                }
520                RouteControllerCommand::RouteCount { reply } => {
521                    let _ = reply.send(controller.route_count());
522                }
523                RouteControllerCommand::InFlightCount { route_id, reply } => {
524                    let _ = reply.send(controller.in_flight_count(&route_id));
525                }
526                RouteControllerCommand::RouteExists { route_id, reply } => {
527                    let _ = reply.send(controller.route_exists(&route_id));
528                }
529                RouteControllerCommand::RouteIds { reply } => {
530                    let _ = reply.send(controller.route_ids());
531                }
532                RouteControllerCommand::AutoStartupRouteIds { reply } => {
533                    let _ = reply.send(controller.auto_startup_route_ids());
534                }
535                RouteControllerCommand::ShutdownRouteIds { reply } => {
536                    let _ = reply.send(controller.shutdown_route_ids());
537                }
538                RouteControllerCommand::GetPipeline { route_id, reply } => {
539                    let _ = reply.send(controller.get_pipeline(&route_id));
540                }
541                RouteControllerCommand::StartRouteReload { route_id, reply } => {
542                    let _ = reply.send(controller.start_route_reload(&route_id).await);
543                }
544                RouteControllerCommand::StopRouteReload { route_id, reply } => {
545                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
546                }
547                RouteControllerCommand::SetRuntimeHandle { runtime } => {
548                    controller.set_runtime_handle(runtime);
549                }
550                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
551                    let _ = reply.send(controller.route_source_hash(&route_id));
552                }
553                RouteControllerCommand::Shutdown => {
554                    break;
555                }
556            }
557        }
558    });
559    (RouteControllerHandle { tx }, handle)
560}
561
562pub fn spawn_supervision_task(
563    controller: RouteControllerHandle,
564    config: SupervisionConfig,
565    _metrics: Option<Arc<dyn MetricsCollector>>,
566    mut crash_rx: mpsc::Receiver<CrashNotification>,
567) -> JoinHandle<()> {
568    tokio::spawn(async move {
569        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
570        let mut last_restart_time: std::collections::HashMap<String, Instant> =
571            std::collections::HashMap::new();
572        let mut currently_restarting: std::collections::HashSet<String> =
573            std::collections::HashSet::new();
574
575        info!("Supervision loop started");
576
577        while let Some(notification) = crash_rx.recv().await {
578            let route_id = notification.route_id;
579            if currently_restarting.contains(&route_id) {
580                continue;
581            }
582
583            if let Some(last_time) = last_restart_time.get(&route_id)
584                && last_time.elapsed() >= config.initial_delay
585            {
586                attempts.insert(route_id.clone(), 0);
587            }
588
589            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
590            *current_attempt += 1;
591
592            if config
593                .max_attempts
594                .is_some_and(|max| *current_attempt > max)
595            {
596                error!(
597                    route_id = %route_id,
598                    attempts = *current_attempt,
599                    "Route exceeded max restart attempts, giving up"
600                );
601                continue;
602            }
603
604            let delay = config.next_delay(*current_attempt);
605            currently_restarting.insert(route_id.clone());
606            tokio::time::sleep(delay).await;
607
608            match controller.restart_route(route_id.clone()).await {
609                Ok(()) => {
610                    info!(route_id = %route_id, "Route restarted successfully");
611                    last_restart_time.insert(route_id.clone(), Instant::now());
612                }
613                Err(err) => {
614                    error!(route_id = %route_id, error = %err, "Failed to restart route");
615                }
616            }
617
618            currently_restarting.remove(&route_id);
619        }
620
621        info!("Supervision loop ended");
622    })
623}
624
625#[cfg(test)]
626mod tests {
627    use super::{
628        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
629        spawn_supervision_task,
630    };
631    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
632    use crate::lifecycle::application::route_definition::RouteDefinition;
633    use crate::shared::components::domain::Registry;
634    use crate::shared::observability::domain::TracerConfig;
635    use camel_api::{
636        CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
637        RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
638    };
639    use std::sync::Arc;
640    use std::time::Duration;
641    use tokio::sync::mpsc;
642    use tokio::time::sleep;
643
644    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
645        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
646        {
647            let mut guard = registry.lock().expect("lock");
648            guard.register(std::sync::Arc::new(
649                camel_component_timer::TimerComponent::new(),
650            ));
651            guard.register(std::sync::Arc::new(
652                camel_component_mock::MockComponent::new(),
653            ));
654        }
655        let controller = DefaultRouteController::new(
656            Arc::clone(&registry),
657            Arc::new(camel_api::NoopLeaderElector),
658        );
659        spawn_controller_actor(controller)
660    }
661
662    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
663        let controller = DefaultRouteController::new(
664            Arc::new(std::sync::Mutex::new(Registry::new())),
665            Arc::new(camel_api::NoopLeaderElector),
666        );
667        spawn_controller_actor(controller)
668    }
669
670    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
671        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
672    }
673
674    struct NoopRuntime;
675
676    #[async_trait::async_trait]
677    impl RuntimeCommandBus for NoopRuntime {
678        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
679            Ok(RuntimeCommandResult::Accepted)
680        }
681    }
682
683    #[async_trait::async_trait]
684    impl RuntimeQueryBus for NoopRuntime {
685        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
686            Ok(match query {
687                RuntimeQuery::GetRouteStatus { route_id }
688                | RuntimeQuery::InFlightCount { route_id } => {
689                    RuntimeQueryResult::RouteNotFound { route_id }
690                }
691                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
692                    route_ids: Vec::new(),
693                },
694            })
695        }
696    }
697
698    #[tokio::test]
699    async fn start_route_sends_command_and_returns_reply() {
700        let (tx, mut rx) = mpsc::channel(1);
701        let handle = RouteControllerHandle { tx };
702
703        let task = tokio::spawn(async move { handle.start_route("route-a").await });
704
705        let command = rx.recv().await.expect("command should be received");
706        match command {
707            RouteControllerCommand::StartRoute { route_id, reply } => {
708                assert_eq!(route_id, "route-a");
709                let _ = reply.send(Ok(()));
710            }
711            _ => panic!("unexpected command variant"),
712        }
713
714        let result = task.await.expect("join should succeed");
715        assert!(result.is_ok());
716    }
717
718    #[tokio::test]
719    async fn start_route_returns_error_when_actor_stops() {
720        let (tx, rx) = mpsc::channel(1);
721        drop(rx);
722
723        let handle = RouteControllerHandle { tx };
724        let result = handle.start_route("route-a").await;
725
726        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
727    }
728
729    #[tokio::test]
730    async fn spawn_controller_actor_processes_commands_and_shutdown() {
731        let controller = DefaultRouteController::new(
732            Arc::new(std::sync::Mutex::new(Registry::new())),
733            Arc::new(camel_api::NoopLeaderElector),
734        );
735        let (handle, join_handle) = spawn_controller_actor(controller);
736
737        assert_eq!(handle.route_count().await.expect("route_count"), 0);
738        assert_eq!(
739            handle.route_ids().await.expect("route_ids"),
740            Vec::<String>::new()
741        );
742
743        handle.shutdown().await.expect("shutdown send");
744        join_handle.await.expect("actor join");
745    }
746
747    #[tokio::test]
748    async fn actor_handle_introspection_and_mutation_commands() {
749        let (handle, join_handle) = build_actor_with_components();
750        let definition = route_def("h-1", "timer:tick?period=100");
751
752        handle.add_route(definition).await.expect("add route");
753        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
754        assert!(
755            !handle
756                .route_exists("no-such")
757                .await
758                .expect("route exists no-such")
759        );
760
761        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
762        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
763        assert_eq!(handle.route_count().await.expect("route_count"), 1);
764
765        let auto_ids = handle
766            .auto_startup_route_ids()
767            .await
768            .expect("auto_startup_route_ids");
769        assert!(auto_ids.iter().any(|id| id == "h-1"));
770
771        let shutdown_ids = handle
772            .shutdown_route_ids()
773            .await
774            .expect("shutdown_route_ids");
775        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
776
777        let compiled = handle
778            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
779            .await
780            .expect("compile_route_definition");
781
782        assert!(
783            handle
784                .get_pipeline("h-1")
785                .await
786                .expect("get_pipeline")
787                .is_some()
788        );
789        handle
790            .swap_pipeline("h-1", compiled)
791            .await
792            .expect("swap_pipeline");
793
794        let _ = handle
795            .in_flight_count("h-1")
796            .await
797            .expect("in_flight_count");
798        let _ = handle.route_source_hash("h-1").await;
799
800        handle
801            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
802            .await
803            .expect("set_error_handler");
804        handle
805            .set_tracer_config(TracerConfig::default())
806            .await
807            .expect("set_tracer_config");
808        handle
809            .set_runtime_handle(Arc::new(NoopRuntime))
810            .await
811            .expect("set_runtime_handle");
812
813        handle.remove_route("h-1").await.expect("remove_route");
814        assert_eq!(
815            handle
816                .route_count()
817                .await
818                .expect("route_count after remove"),
819            0
820        );
821        handle
822            .stop_all_routes()
823            .await
824            .expect("stop_all_routes on empty");
825
826        handle.shutdown().await.expect("shutdown send");
827        join_handle.await.expect("actor join");
828    }
829
830    #[tokio::test]
831    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
832        let (handle, join_handle) = build_actor_with_components();
833        handle
834            .add_route(route_def("lc-1", "timer:tick?period=50"))
835            .await
836            .expect("add route lc-1");
837
838        handle.start_route("lc-1").await.expect("start_route");
839        sleep(Duration::from_millis(20)).await;
840
841        handle.restart_route("lc-1").await.expect("restart_route");
842        sleep(Duration::from_millis(20)).await;
843
844        handle.suspend_route("lc-1").await.expect("suspend_route");
845        handle.resume_route("lc-1").await.expect("resume_route");
846        sleep(Duration::from_millis(20)).await;
847
848        handle.stop_route("lc-1").await.expect("stop_route");
849        handle.start_all_routes().await.expect("start_all_routes");
850        sleep(Duration::from_millis(20)).await;
851        handle.stop_all_routes().await.expect("stop_all_routes");
852
853        handle
854            .start_route_reload("lc-1")
855            .await
856            .expect("start_route_reload");
857        handle
858            .stop_route_reload("lc-1")
859            .await
860            .expect("stop_route_reload");
861
862        handle.shutdown().await.expect("shutdown send");
863        join_handle.await.expect("actor join");
864    }
865
866    #[tokio::test]
867    async fn spawn_supervision_restarts_route_on_crash() {
868        let (handle, join_handle) = build_actor_with_components();
869        handle
870            .add_route(route_def("sup-1", "timer:tick?period=100"))
871            .await
872            .expect("add route sup-1");
873        handle
874            .start_route("sup-1")
875            .await
876            .expect("start_route sup-1");
877
878        let (crash_tx, crash_rx) = mpsc::channel(8);
879        let supervision = spawn_supervision_task(
880            handle.clone(),
881            SupervisionConfig {
882                initial_delay: Duration::from_millis(10),
883                max_attempts: Some(2),
884                ..SupervisionConfig::default()
885            },
886            None,
887            crash_rx,
888        );
889
890        crash_tx
891            .send(CrashNotification {
892                route_id: "sup-1".to_string(),
893                error: "simulated".to_string(),
894            })
895            .await
896            .expect("send crash notification");
897
898        sleep(Duration::from_millis(150)).await;
899        drop(crash_tx);
900        supervision.await.expect("supervision join");
901
902        handle.shutdown().await.expect("shutdown send");
903        join_handle.await.expect("actor join");
904    }
905
906    #[tokio::test]
907    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
908        let (handle, join_handle) = build_actor_with_components();
909        handle
910            .add_route(route_def("sup-2", "timer:tick?period=100"))
911            .await
912            .expect("add route sup-2");
913        handle
914            .start_route("sup-2")
915            .await
916            .expect("start_route sup-2");
917
918        let (crash_tx, crash_rx) = mpsc::channel(8);
919        let supervision = spawn_supervision_task(
920            handle.clone(),
921            SupervisionConfig {
922                initial_delay: Duration::from_millis(10),
923                max_attempts: Some(1),
924                ..SupervisionConfig::default()
925            },
926            None,
927            crash_rx,
928        );
929
930        crash_tx
931            .send(CrashNotification {
932                route_id: "sup-2".to_string(),
933                error: "attempt-1".to_string(),
934            })
935            .await
936            .expect("send crash attempt-1");
937        crash_tx
938            .send(CrashNotification {
939                route_id: "sup-2".to_string(),
940                error: "attempt-2".to_string(),
941            })
942            .await
943            .expect("send crash attempt-2");
944
945        sleep(Duration::from_millis(200)).await;
946        drop(crash_tx);
947        supervision.await.expect("supervision join");
948
949        handle.shutdown().await.expect("shutdown send");
950        join_handle.await.expect("actor join");
951    }
952
953    #[tokio::test]
954    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
955        let (handle, join_handle) = build_empty_actor();
956
957        handle
958            .try_set_runtime_handle(Arc::new(NoopRuntime))
959            .expect("try_set_runtime_handle should succeed");
960
961        handle.shutdown().await.expect("shutdown send");
962        join_handle.await.expect("actor join");
963    }
964
965    #[tokio::test]
966    async fn shutdown_returns_error_when_actor_stopped() {
967        let (tx, rx) = mpsc::channel(1);
968        drop(rx);
969
970        let handle = RouteControllerHandle { tx };
971        let result = handle.shutdown().await;
972
973        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
974    }
975}