Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6    BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7    SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
17pub(crate) enum RouteControllerCommand {
18    StartRoute {
19        route_id: String,
20        reply: oneshot::Sender<Result<(), CamelError>>,
21    },
22    StopRoute {
23        route_id: String,
24        reply: oneshot::Sender<Result<(), CamelError>>,
25    },
26    RestartRoute {
27        route_id: String,
28        reply: oneshot::Sender<Result<(), CamelError>>,
29    },
30    SuspendRoute {
31        route_id: String,
32        reply: oneshot::Sender<Result<(), CamelError>>,
33    },
34    ResumeRoute {
35        route_id: String,
36        reply: oneshot::Sender<Result<(), CamelError>>,
37    },
38    StartAllRoutes {
39        reply: oneshot::Sender<Result<(), CamelError>>,
40    },
41    StopAllRoutes {
42        reply: oneshot::Sender<Result<(), CamelError>>,
43    },
44    AddRoute {
45        definition: RouteDefinition,
46        reply: oneshot::Sender<Result<(), CamelError>>,
47    },
48    RemoveRoute {
49        route_id: String,
50        reply: oneshot::Sender<Result<(), CamelError>>,
51    },
52    SwapPipeline {
53        route_id: String,
54        pipeline: BoxProcessor,
55        reply: oneshot::Sender<Result<(), CamelError>>,
56    },
57    CompileRouteDefinition {
58        definition: RouteDefinition,
59        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
60    },
61    CompileRouteDefinitionWithGeneration {
62        definition: RouteDefinition,
63        generation: u64,
64        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
65    },
66    PrepareRouteDefinitionWithGeneration {
67        definition: RouteDefinition,
68        generation: u64,
69        reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
70    },
71    InsertPreparedRoute {
72        prepared: PreparedRoute,
73        reply: oneshot::Sender<Result<(), CamelError>>,
74    },
75    RemoveRoutePreservingFunctions {
76        route_id: String,
77        reply: oneshot::Sender<Result<(), CamelError>>,
78    },
79    RouteFromUri {
80        route_id: String,
81        reply: oneshot::Sender<Option<String>>,
82    },
83    SetErrorHandler {
84        config: ErrorHandlerConfig,
85    },
86    SetTracerConfig {
87        config: TracerConfig,
88    },
89    RouteCount {
90        reply: oneshot::Sender<usize>,
91    },
92    InFlightCount {
93        route_id: String,
94        reply: oneshot::Sender<Option<u64>>,
95    },
96    RouteExists {
97        route_id: String,
98        reply: oneshot::Sender<bool>,
99    },
100    RouteIds {
101        reply: oneshot::Sender<Vec<String>>,
102    },
103    AutoStartupRouteIds {
104        reply: oneshot::Sender<Vec<String>>,
105    },
106    ShutdownRouteIds {
107        reply: oneshot::Sender<Vec<String>>,
108    },
109    GetPipeline {
110        route_id: String,
111        reply: oneshot::Sender<Option<BoxProcessor>>,
112    },
113    StartRouteReload {
114        route_id: String,
115        reply: oneshot::Sender<Result<(), CamelError>>,
116    },
117    StopRouteReload {
118        route_id: String,
119        reply: oneshot::Sender<Result<(), CamelError>>,
120    },
121    SetRuntimeHandle {
122        runtime: Arc<dyn RuntimeHandle>,
123    },
124    SetFunctionInvoker {
125        invoker: Arc<dyn FunctionInvoker>,
126    },
127    RouteSourceHash {
128        route_id: String,
129        reply: oneshot::Sender<Option<u64>>,
130    },
131    Shutdown,
132}
133
134#[derive(Clone)]
135pub struct RouteControllerHandle {
136    tx: mpsc::Sender<RouteControllerCommand>,
137}
138
139impl RouteControllerHandle {
140    pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141        let (reply_tx, reply_rx) = oneshot::channel();
142        self.tx
143            .send(RouteControllerCommand::StartRoute {
144                route_id: route_id.into(),
145                reply: reply_tx,
146            })
147            .await
148            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149        reply_rx
150            .await
151            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152    }
153
154    pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155        let (reply_tx, reply_rx) = oneshot::channel();
156        self.tx
157            .send(RouteControllerCommand::StopRoute {
158                route_id: route_id.into(),
159                reply: reply_tx,
160            })
161            .await
162            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163        reply_rx
164            .await
165            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166    }
167
168    pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169        let (reply_tx, reply_rx) = oneshot::channel();
170        self.tx
171            .send(RouteControllerCommand::RestartRoute {
172                route_id: route_id.into(),
173                reply: reply_tx,
174            })
175            .await
176            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177        reply_rx
178            .await
179            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180    }
181
182    pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183        let (reply_tx, reply_rx) = oneshot::channel();
184        self.tx
185            .send(RouteControllerCommand::SuspendRoute {
186                route_id: route_id.into(),
187                reply: reply_tx,
188            })
189            .await
190            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191        reply_rx
192            .await
193            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194    }
195
196    pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197        let (reply_tx, reply_rx) = oneshot::channel();
198        self.tx
199            .send(RouteControllerCommand::ResumeRoute {
200                route_id: route_id.into(),
201                reply: reply_tx,
202            })
203            .await
204            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205        reply_rx
206            .await
207            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208    }
209
210    pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211        let (reply_tx, reply_rx) = oneshot::channel();
212        self.tx
213            .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214            .await
215            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216        reply_rx
217            .await
218            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219    }
220
221    pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222        let (reply_tx, reply_rx) = oneshot::channel();
223        self.tx
224            .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225            .await
226            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227        reply_rx
228            .await
229            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230    }
231
232    pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233        let (reply_tx, reply_rx) = oneshot::channel();
234        self.tx
235            .send(RouteControllerCommand::AddRoute {
236                definition,
237                reply: reply_tx,
238            })
239            .await
240            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241        reply_rx
242            .await
243            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244    }
245
246    pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247        let (reply_tx, reply_rx) = oneshot::channel();
248        self.tx
249            .send(RouteControllerCommand::RemoveRoute {
250                route_id: route_id.into(),
251                reply: reply_tx,
252            })
253            .await
254            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255        reply_rx
256            .await
257            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258    }
259
260    pub async fn swap_pipeline(
261        &self,
262        route_id: impl Into<String>,
263        pipeline: BoxProcessor,
264    ) -> Result<(), CamelError> {
265        let (reply_tx, reply_rx) = oneshot::channel();
266        self.tx
267            .send(RouteControllerCommand::SwapPipeline {
268                route_id: route_id.into(),
269                pipeline,
270                reply: reply_tx,
271            })
272            .await
273            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274        reply_rx
275            .await
276            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277    }
278
279    pub async fn compile_route_definition(
280        &self,
281        definition: RouteDefinition,
282    ) -> Result<BoxProcessor, CamelError> {
283        let (reply_tx, reply_rx) = oneshot::channel();
284        self.tx
285            .send(RouteControllerCommand::CompileRouteDefinition {
286                definition,
287                reply: reply_tx,
288            })
289            .await
290            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291        reply_rx
292            .await
293            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294    }
295
296    pub async fn compile_route_definition_with_generation(
297        &self,
298        definition: RouteDefinition,
299        generation: u64,
300    ) -> Result<BoxProcessor, CamelError> {
301        let (reply_tx, reply_rx) = oneshot::channel();
302        self.tx
303            .send(
304                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305                    definition,
306                    generation,
307                    reply: reply_tx,
308                },
309            )
310            .await
311            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312        reply_rx
313            .await
314            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315    }
316
317    pub(crate) async fn prepare_route_definition_with_generation(
318        &self,
319        definition: RouteDefinition,
320        generation: u64,
321    ) -> Result<PreparedRoute, CamelError> {
322        let (reply_tx, reply_rx) = oneshot::channel();
323        self.tx
324            .send(
325                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326                    definition,
327                    generation,
328                    reply: reply_tx,
329                },
330            )
331            .await
332            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333        reply_rx
334            .await
335            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336    }
337
338    pub(crate) async fn insert_prepared_route(
339        &self,
340        prepared: PreparedRoute,
341    ) -> Result<(), CamelError> {
342        let (reply_tx, reply_rx) = oneshot::channel();
343        self.tx
344            .send(RouteControllerCommand::InsertPreparedRoute {
345                prepared,
346                reply: reply_tx,
347            })
348            .await
349            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350        reply_rx
351            .await
352            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353    }
354
355    pub async fn remove_route_preserving_functions(
356        &self,
357        route_id: String,
358    ) -> Result<(), CamelError> {
359        let (reply_tx, reply_rx) = oneshot::channel();
360        self.tx
361            .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362                route_id,
363                reply: reply_tx,
364            })
365            .await
366            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367        reply_rx
368            .await
369            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370    }
371
372    pub async fn route_from_uri(
373        &self,
374        route_id: impl Into<String>,
375    ) -> Result<Option<String>, CamelError> {
376        let (reply_tx, reply_rx) = oneshot::channel();
377        self.tx
378            .send(RouteControllerCommand::RouteFromUri {
379                route_id: route_id.into(),
380                reply: reply_tx,
381            })
382            .await
383            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384        reply_rx
385            .await
386            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387    }
388
389    pub async fn route_count(&self) -> Result<usize, CamelError> {
390        let (reply_tx, reply_rx) = oneshot::channel();
391        self.tx
392            .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393            .await
394            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395        reply_rx
396            .await
397            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398    }
399
400    pub async fn in_flight_count(
401        &self,
402        route_id: impl Into<String>,
403    ) -> Result<Option<u64>, CamelError> {
404        let (reply_tx, reply_rx) = oneshot::channel();
405        self.tx
406            .send(RouteControllerCommand::InFlightCount {
407                route_id: route_id.into(),
408                reply: reply_tx,
409            })
410            .await
411            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412        reply_rx
413            .await
414            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415    }
416
417    pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418        let (reply_tx, reply_rx) = oneshot::channel();
419        self.tx
420            .send(RouteControllerCommand::RouteExists {
421                route_id: route_id.into(),
422                reply: reply_tx,
423            })
424            .await
425            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426        reply_rx
427            .await
428            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429    }
430
431    pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432        let (reply_tx, reply_rx) = oneshot::channel();
433        self.tx
434            .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435            .await
436            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437        reply_rx
438            .await
439            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440    }
441
442    pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443        let (reply_tx, reply_rx) = oneshot::channel();
444        self.tx
445            .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446            .await
447            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448        reply_rx
449            .await
450            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451    }
452
453    pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454        let (reply_tx, reply_rx) = oneshot::channel();
455        self.tx
456            .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457            .await
458            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459        reply_rx
460            .await
461            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462    }
463
464    pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465        let (reply_tx, reply_rx) = oneshot::channel();
466        self.tx
467            .send(RouteControllerCommand::StartRouteReload {
468                route_id: route_id.into(),
469                reply: reply_tx,
470            })
471            .await
472            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473        reply_rx
474            .await
475            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476    }
477
478    pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479        let (reply_tx, reply_rx) = oneshot::channel();
480        self.tx
481            .send(RouteControllerCommand::StopRouteReload {
482                route_id: route_id.into(),
483                reply: reply_tx,
484            })
485            .await
486            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487        reply_rx
488            .await
489            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490    }
491
492    pub async fn get_pipeline(
493        &self,
494        route_id: impl Into<String>,
495    ) -> Result<Option<BoxProcessor>, CamelError> {
496        let (reply_tx, reply_rx) = oneshot::channel();
497        self.tx
498            .send(RouteControllerCommand::GetPipeline {
499                route_id: route_id.into(),
500                reply: reply_tx,
501            })
502            .await
503            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504        reply_rx
505            .await
506            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507    }
508
509    pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510        self.tx
511            .send(RouteControllerCommand::SetErrorHandler { config })
512            .await
513            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514    }
515
516    pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517        self.tx
518            .send(RouteControllerCommand::SetTracerConfig { config })
519            .await
520            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521    }
522
523    pub async fn set_runtime_handle(
524        &self,
525        runtime: Arc<dyn RuntimeHandle>,
526    ) -> Result<(), CamelError> {
527        self.tx
528            .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529            .await
530            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531    }
532
533    pub fn try_set_runtime_handle(
534        &self,
535        runtime: Arc<dyn RuntimeHandle>,
536    ) -> Result<(), CamelError> {
537        self.tx
538            .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539            .map_err(|err| {
540                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541            })
542    }
543
544    pub async fn set_function_invoker(
545        &self,
546        invoker: Arc<dyn FunctionInvoker>,
547    ) -> Result<(), CamelError> {
548        self.tx
549            .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550            .await
551            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552    }
553
554    pub fn try_set_function_invoker(
555        &self,
556        invoker: Arc<dyn FunctionInvoker>,
557    ) -> Result<(), CamelError> {
558        self.tx
559            .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560            .map_err(|err| {
561                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562            })
563    }
564
565    pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566        let (reply_tx, reply_rx) = oneshot::channel();
567        self.tx
568            .send(RouteControllerCommand::RouteSourceHash {
569                route_id: route_id.into(),
570                reply: reply_tx,
571            })
572            .await
573            .ok()?;
574        reply_rx.await.ok()?
575    }
576
577    pub async fn shutdown(&self) -> Result<(), CamelError> {
578        self.tx
579            .send(RouteControllerCommand::Shutdown)
580            .await
581            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582    }
583}
584
585pub fn spawn_controller_actor(
586    controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589    let handle = tokio::spawn(async move {
590        let mut controller = controller;
591        while let Some(cmd) = rx.recv().await {
592            match cmd {
593                RouteControllerCommand::StartRoute { route_id, reply } => {
594                    let _ = reply.send(controller.start_route(&route_id).await);
595                }
596                RouteControllerCommand::StopRoute { route_id, reply } => {
597                    let _ = reply.send(controller.stop_route(&route_id).await);
598                }
599                RouteControllerCommand::RestartRoute { route_id, reply } => {
600                    let _ = reply.send(controller.restart_route(&route_id).await);
601                }
602                RouteControllerCommand::SuspendRoute { route_id, reply } => {
603                    let _ = reply.send(controller.suspend_route(&route_id).await);
604                }
605                RouteControllerCommand::ResumeRoute { route_id, reply } => {
606                    let _ = reply.send(controller.resume_route(&route_id).await);
607                }
608                RouteControllerCommand::StartAllRoutes { reply } => {
609                    let _ = reply.send(controller.start_all_routes().await);
610                }
611                RouteControllerCommand::StopAllRoutes { reply } => {
612                    let _ = reply.send(controller.stop_all_routes().await);
613                }
614                RouteControllerCommand::AddRoute { definition, reply } => {
615                    let _ = reply.send(controller.add_route(definition).await);
616                }
617                RouteControllerCommand::RemoveRoute { route_id, reply } => {
618                    let _ = reply.send(controller.remove_route(&route_id).await);
619                }
620                RouteControllerCommand::SwapPipeline {
621                    route_id,
622                    pipeline,
623                    reply,
624                } => {
625                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626                }
627                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628                    let _ = reply.send(controller.compile_route_definition(definition));
629                }
630                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631                    definition,
632                    generation,
633                    reply,
634                } => {
635                    let _ = reply.send(
636                        controller.compile_route_definition_with_generation(definition, generation),
637                    );
638                }
639                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640                    definition,
641                    generation,
642                    reply,
643                } => {
644                    let _ = reply.send(
645                        controller.prepare_route_definition_with_generation(definition, generation),
646                    );
647                }
648                RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649                    let _ = reply.send(controller.insert_prepared_route(prepared));
650                }
651                RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652                    let _ = reply.send(
653                        controller
654                            .remove_route_preserving_functions(&route_id)
655                            .await,
656                    );
657                }
658                RouteControllerCommand::RouteFromUri { route_id, reply } => {
659                    let _ = reply.send(controller.route_from_uri(&route_id));
660                }
661                RouteControllerCommand::SetErrorHandler { config } => {
662                    controller.set_error_handler(config);
663                }
664                RouteControllerCommand::SetTracerConfig { config } => {
665                    controller.set_tracer_config(&config);
666                }
667                RouteControllerCommand::RouteCount { reply } => {
668                    let _ = reply.send(controller.route_count());
669                }
670                RouteControllerCommand::InFlightCount { route_id, reply } => {
671                    let _ = reply.send(controller.in_flight_count(&route_id));
672                }
673                RouteControllerCommand::RouteExists { route_id, reply } => {
674                    let _ = reply.send(controller.route_exists(&route_id));
675                }
676                RouteControllerCommand::RouteIds { reply } => {
677                    let _ = reply.send(controller.route_ids());
678                }
679                RouteControllerCommand::AutoStartupRouteIds { reply } => {
680                    let _ = reply.send(controller.auto_startup_route_ids());
681                }
682                RouteControllerCommand::ShutdownRouteIds { reply } => {
683                    let _ = reply.send(controller.shutdown_route_ids());
684                }
685                RouteControllerCommand::GetPipeline { route_id, reply } => {
686                    let _ = reply.send(controller.get_pipeline(&route_id));
687                }
688                RouteControllerCommand::StartRouteReload { route_id, reply } => {
689                    let _ = reply.send(controller.start_route_reload(&route_id).await);
690                }
691                RouteControllerCommand::StopRouteReload { route_id, reply } => {
692                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
693                }
694                RouteControllerCommand::SetRuntimeHandle { runtime } => {
695                    controller.set_runtime_handle(runtime);
696                }
697                RouteControllerCommand::SetFunctionInvoker { invoker } => {
698                    controller.set_function_invoker(invoker);
699                }
700                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701                    let _ = reply.send(controller.route_source_hash(&route_id));
702                }
703                RouteControllerCommand::Shutdown => {
704                    break;
705                }
706            }
707        }
708    });
709    (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713    controller: RouteControllerHandle,
714    config: SupervisionConfig,
715    _metrics: Option<Arc<dyn MetricsCollector>>,
716    mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718    tokio::spawn(async move {
719        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720        let mut last_restart_time: std::collections::HashMap<String, Instant> =
721            std::collections::HashMap::new();
722        let mut currently_restarting: std::collections::HashSet<String> =
723            std::collections::HashSet::new();
724
725        info!("Supervision loop started");
726
727        while let Some(notification) = crash_rx.recv().await {
728            let route_id = notification.route_id;
729            if currently_restarting.contains(&route_id) {
730                continue;
731            }
732
733            if let Some(last_time) = last_restart_time.get(&route_id)
734                && last_time.elapsed() >= config.initial_delay
735            {
736                attempts.insert(route_id.clone(), 0);
737            }
738
739            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740            *current_attempt += 1;
741
742            if config
743                .max_attempts
744                .is_some_and(|max| *current_attempt > max)
745            {
746                error!(
747                    route_id = %route_id,
748                    attempts = *current_attempt,
749                    "Route exceeded max restart attempts, giving up"
750                );
751                continue;
752            }
753
754            let delay = config.next_delay(*current_attempt);
755            currently_restarting.insert(route_id.clone());
756            tokio::time::sleep(delay).await;
757
758            match controller.restart_route(route_id.clone()).await {
759                Ok(()) => {
760                    info!(route_id = %route_id, "Route restarted successfully");
761                    last_restart_time.insert(route_id.clone(), Instant::now());
762                }
763                Err(err) => {
764                    error!(route_id = %route_id, error = %err, "Failed to restart route");
765                }
766            }
767
768            currently_restarting.remove(&route_id);
769        }
770
771        info!("Supervision loop ended");
772    })
773}
774
775#[cfg(test)]
776mod tests {
777    use super::{
778        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
779        spawn_supervision_task,
780    };
781    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
782    use crate::lifecycle::application::route_definition::RouteDefinition;
783    use crate::shared::components::domain::Registry;
784    use crate::shared::observability::domain::TracerConfig;
785    use camel_api::{
786        CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
787        RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
788    };
789    use std::sync::Arc;
790    use std::time::Duration;
791    use tokio::sync::mpsc;
792    use tokio::time::sleep;
793
794    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
795        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
796        {
797            let mut guard = registry.lock().expect("lock");
798            guard.register(std::sync::Arc::new(
799                camel_component_timer::TimerComponent::new(),
800            ));
801            guard.register(std::sync::Arc::new(
802                camel_component_mock::MockComponent::new(),
803            ));
804        }
805        let controller = DefaultRouteController::new(
806            Arc::clone(&registry),
807            Arc::new(camel_api::NoopPlatformService::default()),
808        );
809        spawn_controller_actor(controller)
810    }
811
812    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
813        let controller = DefaultRouteController::new(
814            Arc::new(std::sync::Mutex::new(Registry::new())),
815            Arc::new(camel_api::NoopPlatformService::default()),
816        );
817        spawn_controller_actor(controller)
818    }
819
820    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
821        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
822    }
823
824    struct NoopRuntime;
825
826    #[async_trait::async_trait]
827    impl RuntimeCommandBus for NoopRuntime {
828        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
829            Ok(RuntimeCommandResult::Accepted)
830        }
831    }
832
833    #[async_trait::async_trait]
834    impl RuntimeQueryBus for NoopRuntime {
835        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
836            Ok(match query {
837                RuntimeQuery::GetRouteStatus { route_id }
838                | RuntimeQuery::InFlightCount { route_id } => {
839                    RuntimeQueryResult::RouteNotFound { route_id }
840                }
841                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
842                    route_ids: Vec::new(),
843                },
844            })
845        }
846    }
847
848    #[tokio::test]
849    async fn start_route_sends_command_and_returns_reply() {
850        let (tx, mut rx) = mpsc::channel(1);
851        let handle = RouteControllerHandle { tx };
852
853        let task = tokio::spawn(async move { handle.start_route("route-a").await });
854
855        let command = rx.recv().await.expect("command should be received");
856        match command {
857            RouteControllerCommand::StartRoute { route_id, reply } => {
858                assert_eq!(route_id, "route-a");
859                let _ = reply.send(Ok(()));
860            }
861            _ => panic!("unexpected command variant"),
862        }
863
864        let result = task.await.expect("join should succeed");
865        assert!(result.is_ok());
866    }
867
868    #[tokio::test]
869    async fn start_route_returns_error_when_actor_stops() {
870        let (tx, rx) = mpsc::channel(1);
871        drop(rx);
872
873        let handle = RouteControllerHandle { tx };
874        let result = handle.start_route("route-a").await;
875
876        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
877    }
878
879    #[tokio::test]
880    async fn spawn_controller_actor_processes_commands_and_shutdown() {
881        let controller = DefaultRouteController::new(
882            Arc::new(std::sync::Mutex::new(Registry::new())),
883            Arc::new(camel_api::NoopPlatformService::default()),
884        );
885        let (handle, join_handle) = spawn_controller_actor(controller);
886
887        assert_eq!(handle.route_count().await.expect("route_count"), 0);
888        assert_eq!(
889            handle.route_ids().await.expect("route_ids"),
890            Vec::<String>::new()
891        );
892
893        handle.shutdown().await.expect("shutdown send");
894        join_handle.await.expect("actor join");
895    }
896
897    #[tokio::test]
898    async fn actor_handle_introspection_and_mutation_commands() {
899        let (handle, join_handle) = build_actor_with_components();
900        let definition = route_def("h-1", "timer:tick?period=100");
901
902        handle.add_route(definition).await.expect("add route");
903        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
904        assert!(
905            !handle
906                .route_exists("no-such")
907                .await
908                .expect("route exists no-such")
909        );
910
911        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
912        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
913        assert_eq!(handle.route_count().await.expect("route_count"), 1);
914
915        let auto_ids = handle
916            .auto_startup_route_ids()
917            .await
918            .expect("auto_startup_route_ids");
919        assert!(auto_ids.iter().any(|id| id == "h-1"));
920
921        let shutdown_ids = handle
922            .shutdown_route_ids()
923            .await
924            .expect("shutdown_route_ids");
925        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
926
927        let compiled = handle
928            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
929            .await
930            .expect("compile_route_definition");
931
932        assert!(
933            handle
934                .get_pipeline("h-1")
935                .await
936                .expect("get_pipeline")
937                .is_some()
938        );
939        handle
940            .swap_pipeline("h-1", compiled)
941            .await
942            .expect("swap_pipeline");
943
944        let _ = handle
945            .in_flight_count("h-1")
946            .await
947            .expect("in_flight_count");
948        let _ = handle.route_source_hash("h-1").await;
949
950        handle
951            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
952            .await
953            .expect("set_error_handler");
954        handle
955            .set_tracer_config(TracerConfig::default())
956            .await
957            .expect("set_tracer_config");
958        handle
959            .set_runtime_handle(Arc::new(NoopRuntime))
960            .await
961            .expect("set_runtime_handle");
962
963        handle.remove_route("h-1").await.expect("remove_route");
964        assert_eq!(
965            handle
966                .route_count()
967                .await
968                .expect("route_count after remove"),
969            0
970        );
971        handle
972            .stop_all_routes()
973            .await
974            .expect("stop_all_routes on empty");
975
976        handle.shutdown().await.expect("shutdown send");
977        join_handle.await.expect("actor join");
978    }
979
980    #[tokio::test]
981    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
982        let (handle, join_handle) = build_actor_with_components();
983        handle
984            .add_route(route_def("lc-1", "timer:tick?period=50"))
985            .await
986            .expect("add route lc-1");
987
988        handle.start_route("lc-1").await.expect("start_route");
989        sleep(Duration::from_millis(20)).await;
990
991        handle.restart_route("lc-1").await.expect("restart_route");
992        sleep(Duration::from_millis(20)).await;
993
994        handle.suspend_route("lc-1").await.expect("suspend_route");
995        handle.resume_route("lc-1").await.expect("resume_route");
996        sleep(Duration::from_millis(20)).await;
997
998        handle.stop_route("lc-1").await.expect("stop_route");
999        handle.start_all_routes().await.expect("start_all_routes");
1000        sleep(Duration::from_millis(20)).await;
1001        handle.stop_all_routes().await.expect("stop_all_routes");
1002
1003        handle
1004            .start_route_reload("lc-1")
1005            .await
1006            .expect("start_route_reload");
1007        handle
1008            .stop_route_reload("lc-1")
1009            .await
1010            .expect("stop_route_reload");
1011
1012        handle.shutdown().await.expect("shutdown send");
1013        join_handle.await.expect("actor join");
1014    }
1015
1016    #[tokio::test]
1017    async fn spawn_supervision_restarts_route_on_crash() {
1018        let (handle, join_handle) = build_actor_with_components();
1019        handle
1020            .add_route(route_def("sup-1", "timer:tick?period=100"))
1021            .await
1022            .expect("add route sup-1");
1023        handle
1024            .start_route("sup-1")
1025            .await
1026            .expect("start_route sup-1");
1027
1028        let (crash_tx, crash_rx) = mpsc::channel(8);
1029        let supervision = spawn_supervision_task(
1030            handle.clone(),
1031            SupervisionConfig {
1032                initial_delay: Duration::from_millis(10),
1033                max_attempts: Some(2),
1034                ..SupervisionConfig::default()
1035            },
1036            None,
1037            crash_rx,
1038        );
1039
1040        crash_tx
1041            .send(CrashNotification {
1042                route_id: "sup-1".to_string(),
1043                error: "simulated".to_string(),
1044            })
1045            .await
1046            .expect("send crash notification");
1047
1048        sleep(Duration::from_millis(150)).await;
1049        drop(crash_tx);
1050        supervision.await.expect("supervision join");
1051
1052        handle.shutdown().await.expect("shutdown send");
1053        join_handle.await.expect("actor join");
1054    }
1055
1056    #[tokio::test]
1057    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
1058        let (handle, join_handle) = build_actor_with_components();
1059        handle
1060            .add_route(route_def("sup-2", "timer:tick?period=100"))
1061            .await
1062            .expect("add route sup-2");
1063        handle
1064            .start_route("sup-2")
1065            .await
1066            .expect("start_route sup-2");
1067
1068        let (crash_tx, crash_rx) = mpsc::channel(8);
1069        let supervision = spawn_supervision_task(
1070            handle.clone(),
1071            SupervisionConfig {
1072                initial_delay: Duration::from_millis(10),
1073                max_attempts: Some(1),
1074                ..SupervisionConfig::default()
1075            },
1076            None,
1077            crash_rx,
1078        );
1079
1080        crash_tx
1081            .send(CrashNotification {
1082                route_id: "sup-2".to_string(),
1083                error: "attempt-1".to_string(),
1084            })
1085            .await
1086            .expect("send crash attempt-1");
1087        crash_tx
1088            .send(CrashNotification {
1089                route_id: "sup-2".to_string(),
1090                error: "attempt-2".to_string(),
1091            })
1092            .await
1093            .expect("send crash attempt-2");
1094
1095        sleep(Duration::from_millis(200)).await;
1096        drop(crash_tx);
1097        supervision.await.expect("supervision join");
1098
1099        handle.shutdown().await.expect("shutdown send");
1100        join_handle.await.expect("actor join");
1101    }
1102
1103    #[tokio::test]
1104    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
1105        let (handle, join_handle) = build_empty_actor();
1106
1107        handle
1108            .try_set_runtime_handle(Arc::new(NoopRuntime))
1109            .expect("try_set_runtime_handle should succeed");
1110
1111        handle.shutdown().await.expect("shutdown send");
1112        join_handle.await.expect("actor join");
1113    }
1114
1115    #[tokio::test]
1116    async fn shutdown_returns_error_when_actor_stopped() {
1117        let (tx, rx) = mpsc::channel(1);
1118        drop(rx);
1119
1120        let handle = RouteControllerHandle { tx };
1121        let result = handle.shutdown().await;
1122
1123        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1124    }
1125}