Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6    BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7    SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
17pub(crate) enum RouteControllerCommand {
18    StartRoute {
19        route_id: String,
20        reply: oneshot::Sender<Result<(), CamelError>>,
21    },
22    StopRoute {
23        route_id: String,
24        reply: oneshot::Sender<Result<(), CamelError>>,
25    },
26    RestartRoute {
27        route_id: String,
28        reply: oneshot::Sender<Result<(), CamelError>>,
29    },
30    SuspendRoute {
31        route_id: String,
32        reply: oneshot::Sender<Result<(), CamelError>>,
33    },
34    ResumeRoute {
35        route_id: String,
36        reply: oneshot::Sender<Result<(), CamelError>>,
37    },
38    StartAllRoutes {
39        reply: oneshot::Sender<Result<(), CamelError>>,
40    },
41    StopAllRoutes {
42        reply: oneshot::Sender<Result<(), CamelError>>,
43    },
44    AddRoute {
45        definition: RouteDefinition,
46        reply: oneshot::Sender<Result<(), CamelError>>,
47    },
48    RemoveRoute {
49        route_id: String,
50        reply: oneshot::Sender<Result<(), CamelError>>,
51    },
52    SwapPipeline {
53        route_id: String,
54        pipeline: BoxProcessor,
55        reply: oneshot::Sender<Result<(), CamelError>>,
56    },
57    CompileRouteDefinition {
58        definition: RouteDefinition,
59        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
60    },
61    CompileRouteDefinitionWithGeneration {
62        definition: RouteDefinition,
63        generation: u64,
64        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
65    },
66    PrepareRouteDefinitionWithGeneration {
67        definition: RouteDefinition,
68        generation: u64,
69        reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
70    },
71    InsertPreparedRoute {
72        prepared: PreparedRoute,
73        reply: oneshot::Sender<Result<(), CamelError>>,
74    },
75    RemoveRoutePreservingFunctions {
76        route_id: String,
77        reply: oneshot::Sender<Result<(), CamelError>>,
78    },
79    RouteFromUri {
80        route_id: String,
81        reply: oneshot::Sender<Option<String>>,
82    },
83    SetErrorHandler {
84        config: ErrorHandlerConfig,
85    },
86    SetTracerConfig {
87        config: TracerConfig,
88    },
89    RouteCount {
90        reply: oneshot::Sender<usize>,
91    },
92    InFlightCount {
93        route_id: String,
94        reply: oneshot::Sender<Option<u64>>,
95    },
96    RouteExists {
97        route_id: String,
98        reply: oneshot::Sender<bool>,
99    },
100    RouteIds {
101        reply: oneshot::Sender<Vec<String>>,
102    },
103    AutoStartupRouteIds {
104        reply: oneshot::Sender<Vec<String>>,
105    },
106    ShutdownRouteIds {
107        reply: oneshot::Sender<Vec<String>>,
108    },
109    GetPipeline {
110        route_id: String,
111        reply: oneshot::Sender<Option<BoxProcessor>>,
112    },
113    StartRouteReload {
114        route_id: String,
115        reply: oneshot::Sender<Result<(), CamelError>>,
116    },
117    StopRouteReload {
118        route_id: String,
119        reply: oneshot::Sender<Result<(), CamelError>>,
120    },
121    SetRuntimeHandle {
122        runtime: Arc<dyn RuntimeHandle>,
123    },
124    SetFunctionInvoker {
125        invoker: Arc<dyn FunctionInvoker>,
126    },
127    RouteSourceHash {
128        route_id: String,
129        reply: oneshot::Sender<Option<u64>>,
130    },
131    Shutdown,
132}
133
134#[derive(Clone)]
135pub struct RouteControllerHandle {
136    tx: mpsc::Sender<RouteControllerCommand>,
137}
138
139impl RouteControllerHandle {
140    pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141        let (reply_tx, reply_rx) = oneshot::channel();
142        self.tx
143            .send(RouteControllerCommand::StartRoute {
144                route_id: route_id.into(),
145                reply: reply_tx,
146            })
147            .await
148            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149        reply_rx
150            .await
151            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152    }
153
154    pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155        let (reply_tx, reply_rx) = oneshot::channel();
156        self.tx
157            .send(RouteControllerCommand::StopRoute {
158                route_id: route_id.into(),
159                reply: reply_tx,
160            })
161            .await
162            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163        reply_rx
164            .await
165            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166    }
167
168    pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169        let (reply_tx, reply_rx) = oneshot::channel();
170        self.tx
171            .send(RouteControllerCommand::RestartRoute {
172                route_id: route_id.into(),
173                reply: reply_tx,
174            })
175            .await
176            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177        reply_rx
178            .await
179            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180    }
181
182    pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183        let (reply_tx, reply_rx) = oneshot::channel();
184        self.tx
185            .send(RouteControllerCommand::SuspendRoute {
186                route_id: route_id.into(),
187                reply: reply_tx,
188            })
189            .await
190            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191        reply_rx
192            .await
193            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194    }
195
196    pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197        let (reply_tx, reply_rx) = oneshot::channel();
198        self.tx
199            .send(RouteControllerCommand::ResumeRoute {
200                route_id: route_id.into(),
201                reply: reply_tx,
202            })
203            .await
204            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205        reply_rx
206            .await
207            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208    }
209
210    pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211        let (reply_tx, reply_rx) = oneshot::channel();
212        self.tx
213            .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214            .await
215            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216        reply_rx
217            .await
218            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219    }
220
221    pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222        let (reply_tx, reply_rx) = oneshot::channel();
223        self.tx
224            .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225            .await
226            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227        reply_rx
228            .await
229            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230    }
231
232    pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233        let (reply_tx, reply_rx) = oneshot::channel();
234        self.tx
235            .send(RouteControllerCommand::AddRoute {
236                definition,
237                reply: reply_tx,
238            })
239            .await
240            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241        reply_rx
242            .await
243            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244    }
245
246    pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247        let (reply_tx, reply_rx) = oneshot::channel();
248        self.tx
249            .send(RouteControllerCommand::RemoveRoute {
250                route_id: route_id.into(),
251                reply: reply_tx,
252            })
253            .await
254            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255        reply_rx
256            .await
257            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258    }
259
260    pub async fn swap_pipeline(
261        &self,
262        route_id: impl Into<String>,
263        pipeline: BoxProcessor,
264    ) -> Result<(), CamelError> {
265        let (reply_tx, reply_rx) = oneshot::channel();
266        self.tx
267            .send(RouteControllerCommand::SwapPipeline {
268                route_id: route_id.into(),
269                pipeline,
270                reply: reply_tx,
271            })
272            .await
273            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274        reply_rx
275            .await
276            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277    }
278
279    pub async fn compile_route_definition(
280        &self,
281        definition: RouteDefinition,
282    ) -> Result<BoxProcessor, CamelError> {
283        let (reply_tx, reply_rx) = oneshot::channel();
284        self.tx
285            .send(RouteControllerCommand::CompileRouteDefinition {
286                definition,
287                reply: reply_tx,
288            })
289            .await
290            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291        reply_rx
292            .await
293            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294    }
295
296    pub async fn compile_route_definition_with_generation(
297        &self,
298        definition: RouteDefinition,
299        generation: u64,
300    ) -> Result<BoxProcessor, CamelError> {
301        let (reply_tx, reply_rx) = oneshot::channel();
302        self.tx
303            .send(
304                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305                    definition,
306                    generation,
307                    reply: reply_tx,
308                },
309            )
310            .await
311            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312        reply_rx
313            .await
314            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315    }
316
317    pub(crate) async fn prepare_route_definition_with_generation(
318        &self,
319        definition: RouteDefinition,
320        generation: u64,
321    ) -> Result<PreparedRoute, CamelError> {
322        let (reply_tx, reply_rx) = oneshot::channel();
323        self.tx
324            .send(
325                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326                    definition,
327                    generation,
328                    reply: reply_tx,
329                },
330            )
331            .await
332            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333        reply_rx
334            .await
335            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336    }
337
338    pub(crate) async fn insert_prepared_route(
339        &self,
340        prepared: PreparedRoute,
341    ) -> Result<(), CamelError> {
342        let (reply_tx, reply_rx) = oneshot::channel();
343        self.tx
344            .send(RouteControllerCommand::InsertPreparedRoute {
345                prepared,
346                reply: reply_tx,
347            })
348            .await
349            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350        reply_rx
351            .await
352            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353    }
354
355    pub async fn remove_route_preserving_functions(
356        &self,
357        route_id: String,
358    ) -> Result<(), CamelError> {
359        let (reply_tx, reply_rx) = oneshot::channel();
360        self.tx
361            .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362                route_id,
363                reply: reply_tx,
364            })
365            .await
366            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367        reply_rx
368            .await
369            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370    }
371
372    pub async fn route_from_uri(
373        &self,
374        route_id: impl Into<String>,
375    ) -> Result<Option<String>, CamelError> {
376        let (reply_tx, reply_rx) = oneshot::channel();
377        self.tx
378            .send(RouteControllerCommand::RouteFromUri {
379                route_id: route_id.into(),
380                reply: reply_tx,
381            })
382            .await
383            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384        reply_rx
385            .await
386            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387    }
388
389    pub async fn route_count(&self) -> Result<usize, CamelError> {
390        let (reply_tx, reply_rx) = oneshot::channel();
391        self.tx
392            .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393            .await
394            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395        reply_rx
396            .await
397            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398    }
399
400    pub async fn in_flight_count(
401        &self,
402        route_id: impl Into<String>,
403    ) -> Result<Option<u64>, CamelError> {
404        let (reply_tx, reply_rx) = oneshot::channel();
405        self.tx
406            .send(RouteControllerCommand::InFlightCount {
407                route_id: route_id.into(),
408                reply: reply_tx,
409            })
410            .await
411            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412        reply_rx
413            .await
414            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415    }
416
417    pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418        let (reply_tx, reply_rx) = oneshot::channel();
419        self.tx
420            .send(RouteControllerCommand::RouteExists {
421                route_id: route_id.into(),
422                reply: reply_tx,
423            })
424            .await
425            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426        reply_rx
427            .await
428            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429    }
430
431    pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432        let (reply_tx, reply_rx) = oneshot::channel();
433        self.tx
434            .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435            .await
436            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437        reply_rx
438            .await
439            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440    }
441
442    pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443        let (reply_tx, reply_rx) = oneshot::channel();
444        self.tx
445            .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446            .await
447            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448        reply_rx
449            .await
450            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451    }
452
453    pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454        let (reply_tx, reply_rx) = oneshot::channel();
455        self.tx
456            .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457            .await
458            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459        reply_rx
460            .await
461            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462    }
463
464    pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465        let (reply_tx, reply_rx) = oneshot::channel();
466        self.tx
467            .send(RouteControllerCommand::StartRouteReload {
468                route_id: route_id.into(),
469                reply: reply_tx,
470            })
471            .await
472            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473        reply_rx
474            .await
475            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476    }
477
478    pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479        let (reply_tx, reply_rx) = oneshot::channel();
480        self.tx
481            .send(RouteControllerCommand::StopRouteReload {
482                route_id: route_id.into(),
483                reply: reply_tx,
484            })
485            .await
486            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487        reply_rx
488            .await
489            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490    }
491
492    pub async fn get_pipeline(
493        &self,
494        route_id: impl Into<String>,
495    ) -> Result<Option<BoxProcessor>, CamelError> {
496        let (reply_tx, reply_rx) = oneshot::channel();
497        self.tx
498            .send(RouteControllerCommand::GetPipeline {
499                route_id: route_id.into(),
500                reply: reply_tx,
501            })
502            .await
503            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504        reply_rx
505            .await
506            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507    }
508
509    pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510        self.tx
511            .send(RouteControllerCommand::SetErrorHandler { config })
512            .await
513            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514    }
515
516    pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517        self.tx
518            .send(RouteControllerCommand::SetTracerConfig { config })
519            .await
520            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521    }
522
523    pub async fn set_runtime_handle(
524        &self,
525        runtime: Arc<dyn RuntimeHandle>,
526    ) -> Result<(), CamelError> {
527        self.tx
528            .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529            .await
530            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531    }
532
533    pub fn try_set_runtime_handle(
534        &self,
535        runtime: Arc<dyn RuntimeHandle>,
536    ) -> Result<(), CamelError> {
537        self.tx
538            .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539            .map_err(|err| {
540                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541            })
542    }
543
544    pub async fn set_function_invoker(
545        &self,
546        invoker: Arc<dyn FunctionInvoker>,
547    ) -> Result<(), CamelError> {
548        self.tx
549            .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550            .await
551            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552    }
553
554    pub fn try_set_function_invoker(
555        &self,
556        invoker: Arc<dyn FunctionInvoker>,
557    ) -> Result<(), CamelError> {
558        self.tx
559            .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560            .map_err(|err| {
561                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562            })
563    }
564
565    pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566        let (reply_tx, reply_rx) = oneshot::channel();
567        self.tx
568            .send(RouteControllerCommand::RouteSourceHash {
569                route_id: route_id.into(),
570                reply: reply_tx,
571            })
572            .await
573            .ok()?;
574        reply_rx.await.ok()?
575    }
576
577    pub async fn shutdown(&self) -> Result<(), CamelError> {
578        self.tx
579            .send(RouteControllerCommand::Shutdown)
580            .await
581            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582    }
583}
584
585pub fn spawn_controller_actor(
586    controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589    let handle = tokio::spawn(async move {
590        let mut controller = controller;
591        while let Some(cmd) = rx.recv().await {
592            match cmd {
593                RouteControllerCommand::StartRoute { route_id, reply } => {
594                    let _ = reply.send(controller.start_route(&route_id).await);
595                }
596                RouteControllerCommand::StopRoute { route_id, reply } => {
597                    let _ = reply.send(controller.stop_route(&route_id).await);
598                }
599                RouteControllerCommand::RestartRoute { route_id, reply } => {
600                    let _ = reply.send(controller.restart_route(&route_id).await);
601                }
602                RouteControllerCommand::SuspendRoute { route_id, reply } => {
603                    let _ = reply.send(controller.suspend_route(&route_id).await);
604                }
605                RouteControllerCommand::ResumeRoute { route_id, reply } => {
606                    let _ = reply.send(controller.resume_route(&route_id).await);
607                }
608                RouteControllerCommand::StartAllRoutes { reply } => {
609                    let _ = reply.send(controller.start_all_routes().await);
610                }
611                RouteControllerCommand::StopAllRoutes { reply } => {
612                    let _ = reply.send(controller.stop_all_routes().await);
613                }
614                RouteControllerCommand::AddRoute { definition, reply } => {
615                    let _ = reply.send(controller.add_route(definition).await);
616                }
617                RouteControllerCommand::RemoveRoute { route_id, reply } => {
618                    let _ = reply.send(controller.remove_route(&route_id).await);
619                }
620                RouteControllerCommand::SwapPipeline {
621                    route_id,
622                    pipeline,
623                    reply,
624                } => {
625                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626                }
627                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628                    let _ = reply.send(controller.compile_route_definition(definition));
629                }
630                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631                    definition,
632                    generation,
633                    reply,
634                } => {
635                    let _ = reply.send(
636                        controller.compile_route_definition_with_generation(definition, generation),
637                    );
638                }
639                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640                    definition,
641                    generation,
642                    reply,
643                } => {
644                    let _ = reply.send(
645                        controller.prepare_route_definition_with_generation(definition, generation),
646                    );
647                }
648                RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649                    let _ = reply.send(controller.insert_prepared_route(prepared));
650                }
651                RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652                    let _ = reply.send(
653                        controller
654                            .remove_route_preserving_functions(&route_id)
655                            .await,
656                    );
657                }
658                RouteControllerCommand::RouteFromUri { route_id, reply } => {
659                    let _ = reply.send(controller.route_from_uri(&route_id));
660                }
661                RouteControllerCommand::SetErrorHandler { config } => {
662                    controller.set_error_handler(config);
663                }
664                RouteControllerCommand::SetTracerConfig { config } => {
665                    controller.set_tracer_config(&config);
666                }
667                RouteControllerCommand::RouteCount { reply } => {
668                    let _ = reply.send(controller.route_count());
669                }
670                RouteControllerCommand::InFlightCount { route_id, reply } => {
671                    let _ = reply.send(controller.in_flight_count(&route_id));
672                }
673                RouteControllerCommand::RouteExists { route_id, reply } => {
674                    let _ = reply.send(controller.route_exists(&route_id));
675                }
676                RouteControllerCommand::RouteIds { reply } => {
677                    let _ = reply.send(controller.route_ids());
678                }
679                RouteControllerCommand::AutoStartupRouteIds { reply } => {
680                    let _ = reply.send(controller.auto_startup_route_ids());
681                }
682                RouteControllerCommand::ShutdownRouteIds { reply } => {
683                    let _ = reply.send(controller.shutdown_route_ids());
684                }
685                RouteControllerCommand::GetPipeline { route_id, reply } => {
686                    let _ = reply.send(controller.get_pipeline(&route_id));
687                }
688                RouteControllerCommand::StartRouteReload { route_id, reply } => {
689                    let _ = reply.send(controller.start_route_reload(&route_id).await);
690                }
691                RouteControllerCommand::StopRouteReload { route_id, reply } => {
692                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
693                }
694                RouteControllerCommand::SetRuntimeHandle { runtime } => {
695                    controller.set_runtime_handle(runtime);
696                }
697                RouteControllerCommand::SetFunctionInvoker { invoker } => {
698                    controller.set_function_invoker(invoker);
699                }
700                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701                    let _ = reply.send(controller.route_source_hash(&route_id));
702                }
703                RouteControllerCommand::Shutdown => {
704                    break;
705                }
706            }
707        }
708    });
709    (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713    controller: RouteControllerHandle,
714    config: SupervisionConfig,
715    _metrics: Option<Arc<dyn MetricsCollector>>,
716    mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718    tokio::spawn(async move {
719        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720        let mut last_restart_time: std::collections::HashMap<String, Instant> =
721            std::collections::HashMap::new();
722        let mut currently_restarting: std::collections::HashSet<String> =
723            std::collections::HashSet::new();
724
725        info!("Supervision loop started");
726
727        while let Some(notification) = crash_rx.recv().await {
728            let route_id = notification.route_id;
729            if currently_restarting.contains(&route_id) {
730                continue;
731            }
732
733            if let Some(last_time) = last_restart_time.get(&route_id)
734                && last_time.elapsed() >= config.initial_delay
735            {
736                attempts.insert(route_id.clone(), 0);
737            }
738
739            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740            *current_attempt += 1;
741
742            if config
743                .max_attempts
744                .is_some_and(|max| *current_attempt > max)
745            {
746                error!(
747                    route_id = %route_id,
748                    attempts = *current_attempt,
749                    "Route exceeded max restart attempts, giving up"
750                );
751                continue;
752            }
753
754            let delay = config.next_delay(*current_attempt);
755            currently_restarting.insert(route_id.clone());
756            tokio::time::sleep(delay).await;
757
758            match controller.restart_route(route_id.clone()).await {
759                Ok(()) => {
760                    info!(route_id = %route_id, "Route restarted successfully");
761                    last_restart_time.insert(route_id.clone(), Instant::now());
762                }
763                Err(err) => {
764                    error!(route_id = %route_id, error = %err, "Failed to restart route");
765                }
766            }
767
768            currently_restarting.remove(&route_id);
769        }
770
771        info!("Supervision loop ended");
772    })
773}
774
775#[cfg(test)]
776mod tests {
777    use super::{
778        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
779        spawn_supervision_task,
780    };
781    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
782    use crate::lifecycle::application::route_definition::RouteDefinition;
783    use crate::shared::components::domain::Registry;
784    use crate::shared::observability::domain::TracerConfig;
785    use camel_api::function::PrepareToken;
786    use camel_api::{
787        CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
788        FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
789        RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
790        SupervisionConfig,
791    };
792    use std::sync::Arc;
793    use std::time::Duration;
794    use tokio::sync::mpsc;
795    use tokio::time::sleep;
796
797    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
798        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
799        {
800            let mut guard = registry.lock().expect("lock");
801            guard.register(std::sync::Arc::new(
802                camel_component_timer::TimerComponent::new(),
803            ));
804            guard.register(std::sync::Arc::new(
805                camel_component_mock::MockComponent::new(),
806            ));
807        }
808        let controller = DefaultRouteController::new(
809            Arc::clone(&registry),
810            Arc::new(camel_api::NoopPlatformService::default()),
811        );
812        spawn_controller_actor(controller)
813    }
814
815    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
816        let controller = DefaultRouteController::new(
817            Arc::new(std::sync::Mutex::new(Registry::new())),
818            Arc::new(camel_api::NoopPlatformService::default()),
819        );
820        spawn_controller_actor(controller)
821    }
822
823    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
824        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
825    }
826
827    struct NoopRuntime;
828    struct NoopInvoker;
829
830    #[async_trait::async_trait]
831    impl RuntimeCommandBus for NoopRuntime {
832        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
833            Ok(RuntimeCommandResult::Accepted)
834        }
835    }
836
837    #[async_trait::async_trait]
838    impl RuntimeQueryBus for NoopRuntime {
839        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
840            Ok(match query {
841                RuntimeQuery::GetRouteStatus { route_id }
842                | RuntimeQuery::InFlightCount { route_id } => {
843                    RuntimeQueryResult::RouteNotFound { route_id }
844                }
845                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
846                    route_ids: Vec::new(),
847                },
848            })
849        }
850    }
851
852    impl FunctionInvokerSync for NoopInvoker {
853        fn stage_pending(
854            &self,
855            _def: FunctionDefinition,
856            _route_id: Option<&str>,
857            _generation: u64,
858        ) {
859        }
860        fn discard_staging(&self, _generation: u64) {}
861        fn begin_reload(&self) -> u64 {
862            1
863        }
864        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
865            vec![]
866        }
867        fn staged_refs_for_route(
868            &self,
869            _route_id: &str,
870            _generation: u64,
871        ) -> Vec<(FunctionId, Option<String>)> {
872            vec![]
873        }
874        fn staged_defs_for_route(
875            &self,
876            _route_id: &str,
877            _generation: u64,
878        ) -> Vec<(FunctionDefinition, Option<String>)> {
879            vec![]
880        }
881    }
882
883    #[async_trait::async_trait]
884    impl FunctionInvoker for NoopInvoker {
885        async fn register(
886            &self,
887            _def: FunctionDefinition,
888            _route_id: Option<&str>,
889        ) -> Result<(), FunctionInvocationError> {
890            Ok(())
891        }
892        async fn unregister(
893            &self,
894            _id: &FunctionId,
895            _route_id: Option<&str>,
896        ) -> Result<(), FunctionInvocationError> {
897            Ok(())
898        }
899        async fn invoke(
900            &self,
901            _id: &FunctionId,
902            _exchange: &Exchange,
903        ) -> Result<ExchangePatch, FunctionInvocationError> {
904            Ok(ExchangePatch::default())
905        }
906        async fn prepare_reload(
907            &self,
908            _diff: FunctionDiff,
909            _generation: u64,
910        ) -> Result<PrepareToken, FunctionInvocationError> {
911            Ok(PrepareToken::default())
912        }
913        async fn finalize_reload(
914            &self,
915            _diff: &FunctionDiff,
916            _generation: u64,
917        ) -> Result<(), FunctionInvocationError> {
918            Ok(())
919        }
920        async fn rollback_reload(
921            &self,
922            _token: PrepareToken,
923            _generation: u64,
924        ) -> Result<(), FunctionInvocationError> {
925            Ok(())
926        }
927        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
928            Ok(())
929        }
930    }
931
932    #[tokio::test]
933    async fn start_route_sends_command_and_returns_reply() {
934        let (tx, mut rx) = mpsc::channel(1);
935        let handle = RouteControllerHandle { tx };
936
937        let task = tokio::spawn(async move { handle.start_route("route-a").await });
938
939        let command = rx.recv().await.expect("command should be received");
940        match command {
941            RouteControllerCommand::StartRoute { route_id, reply } => {
942                assert_eq!(route_id, "route-a");
943                let _ = reply.send(Ok(()));
944            }
945            _ => panic!("unexpected command variant"),
946        }
947
948        let result = task.await.expect("join should succeed");
949        assert!(result.is_ok());
950    }
951
952    #[tokio::test]
953    async fn start_route_returns_error_when_actor_stops() {
954        let (tx, rx) = mpsc::channel(1);
955        drop(rx);
956
957        let handle = RouteControllerHandle { tx };
958        let result = handle.start_route("route-a").await;
959
960        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
961    }
962
963    #[tokio::test]
964    async fn spawn_controller_actor_processes_commands_and_shutdown() {
965        let controller = DefaultRouteController::new(
966            Arc::new(std::sync::Mutex::new(Registry::new())),
967            Arc::new(camel_api::NoopPlatformService::default()),
968        );
969        let (handle, join_handle) = spawn_controller_actor(controller);
970
971        assert_eq!(handle.route_count().await.expect("route_count"), 0);
972        assert_eq!(
973            handle.route_ids().await.expect("route_ids"),
974            Vec::<String>::new()
975        );
976
977        handle.shutdown().await.expect("shutdown send");
978        join_handle.await.expect("actor join");
979    }
980
981    #[tokio::test]
982    async fn actor_handle_introspection_and_mutation_commands() {
983        let (handle, join_handle) = build_actor_with_components();
984        let definition = route_def("h-1", "timer:tick?period=100");
985
986        handle.add_route(definition).await.expect("add route");
987        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
988        assert!(
989            !handle
990                .route_exists("no-such")
991                .await
992                .expect("route exists no-such")
993        );
994
995        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
996        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
997        assert_eq!(handle.route_count().await.expect("route_count"), 1);
998
999        let auto_ids = handle
1000            .auto_startup_route_ids()
1001            .await
1002            .expect("auto_startup_route_ids");
1003        assert!(auto_ids.iter().any(|id| id == "h-1"));
1004
1005        let shutdown_ids = handle
1006            .shutdown_route_ids()
1007            .await
1008            .expect("shutdown_route_ids");
1009        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
1010
1011        let compiled = handle
1012            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
1013            .await
1014            .expect("compile_route_definition");
1015
1016        assert!(
1017            handle
1018                .get_pipeline("h-1")
1019                .await
1020                .expect("get_pipeline")
1021                .is_some()
1022        );
1023        handle
1024            .swap_pipeline("h-1", compiled)
1025            .await
1026            .expect("swap_pipeline");
1027
1028        let _ = handle
1029            .in_flight_count("h-1")
1030            .await
1031            .expect("in_flight_count");
1032        let _ = handle.route_source_hash("h-1").await;
1033
1034        handle
1035            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
1036            .await
1037            .expect("set_error_handler");
1038        handle
1039            .set_tracer_config(TracerConfig::default())
1040            .await
1041            .expect("set_tracer_config");
1042        handle
1043            .set_runtime_handle(Arc::new(NoopRuntime))
1044            .await
1045            .expect("set_runtime_handle");
1046
1047        handle.remove_route("h-1").await.expect("remove_route");
1048        assert_eq!(
1049            handle
1050                .route_count()
1051                .await
1052                .expect("route_count after remove"),
1053            0
1054        );
1055        handle
1056            .stop_all_routes()
1057            .await
1058            .expect("stop_all_routes on empty");
1059
1060        handle.shutdown().await.expect("shutdown send");
1061        join_handle.await.expect("actor join");
1062    }
1063
1064    #[tokio::test]
1065    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
1066        let (handle, join_handle) = build_actor_with_components();
1067        handle
1068            .add_route(route_def("lc-1", "timer:tick?period=50"))
1069            .await
1070            .expect("add route lc-1");
1071
1072        handle.start_route("lc-1").await.expect("start_route");
1073        sleep(Duration::from_millis(20)).await;
1074
1075        handle.restart_route("lc-1").await.expect("restart_route");
1076        sleep(Duration::from_millis(20)).await;
1077
1078        handle.suspend_route("lc-1").await.expect("suspend_route");
1079        handle.resume_route("lc-1").await.expect("resume_route");
1080        sleep(Duration::from_millis(20)).await;
1081
1082        handle.stop_route("lc-1").await.expect("stop_route");
1083        handle.start_all_routes().await.expect("start_all_routes");
1084        sleep(Duration::from_millis(20)).await;
1085        handle.stop_all_routes().await.expect("stop_all_routes");
1086
1087        handle
1088            .start_route_reload("lc-1")
1089            .await
1090            .expect("start_route_reload");
1091        handle
1092            .stop_route_reload("lc-1")
1093            .await
1094            .expect("stop_route_reload");
1095
1096        handle.shutdown().await.expect("shutdown send");
1097        join_handle.await.expect("actor join");
1098    }
1099
1100    #[tokio::test]
1101    async fn spawn_supervision_restarts_route_on_crash() {
1102        let (handle, join_handle) = build_actor_with_components();
1103        handle
1104            .add_route(route_def("sup-1", "timer:tick?period=100"))
1105            .await
1106            .expect("add route sup-1");
1107        handle
1108            .start_route("sup-1")
1109            .await
1110            .expect("start_route sup-1");
1111
1112        let (crash_tx, crash_rx) = mpsc::channel(8);
1113        let supervision = spawn_supervision_task(
1114            handle.clone(),
1115            SupervisionConfig {
1116                initial_delay: Duration::from_millis(10),
1117                max_attempts: Some(2),
1118                ..SupervisionConfig::default()
1119            },
1120            None,
1121            crash_rx,
1122        );
1123
1124        crash_tx
1125            .send(CrashNotification {
1126                route_id: "sup-1".to_string(),
1127                error: "simulated".to_string(),
1128            })
1129            .await
1130            .expect("send crash notification");
1131
1132        sleep(Duration::from_millis(150)).await;
1133        drop(crash_tx);
1134        supervision.await.expect("supervision join");
1135
1136        handle.shutdown().await.expect("shutdown send");
1137        join_handle.await.expect("actor join");
1138    }
1139
1140    #[tokio::test]
1141    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
1142        let (handle, join_handle) = build_actor_with_components();
1143        handle
1144            .add_route(route_def("sup-2", "timer:tick?period=100"))
1145            .await
1146            .expect("add route sup-2");
1147        handle
1148            .start_route("sup-2")
1149            .await
1150            .expect("start_route sup-2");
1151
1152        let (crash_tx, crash_rx) = mpsc::channel(8);
1153        let supervision = spawn_supervision_task(
1154            handle.clone(),
1155            SupervisionConfig {
1156                initial_delay: Duration::from_millis(10),
1157                max_attempts: Some(1),
1158                ..SupervisionConfig::default()
1159            },
1160            None,
1161            crash_rx,
1162        );
1163
1164        crash_tx
1165            .send(CrashNotification {
1166                route_id: "sup-2".to_string(),
1167                error: "attempt-1".to_string(),
1168            })
1169            .await
1170            .expect("send crash attempt-1");
1171        crash_tx
1172            .send(CrashNotification {
1173                route_id: "sup-2".to_string(),
1174                error: "attempt-2".to_string(),
1175            })
1176            .await
1177            .expect("send crash attempt-2");
1178
1179        sleep(Duration::from_millis(200)).await;
1180        drop(crash_tx);
1181        supervision.await.expect("supervision join");
1182
1183        handle.shutdown().await.expect("shutdown send");
1184        join_handle.await.expect("actor join");
1185    }
1186
1187    #[tokio::test]
1188    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
1189        let (handle, join_handle) = build_empty_actor();
1190
1191        handle
1192            .try_set_runtime_handle(Arc::new(NoopRuntime))
1193            .expect("try_set_runtime_handle should succeed");
1194
1195        handle.shutdown().await.expect("shutdown send");
1196        join_handle.await.expect("actor join");
1197    }
1198
1199    #[tokio::test]
1200    async fn shutdown_returns_error_when_actor_stopped() {
1201        let (tx, rx) = mpsc::channel(1);
1202        drop(rx);
1203
1204        let handle = RouteControllerHandle { tx };
1205        let result = handle.shutdown().await;
1206
1207        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1208    }
1209
1210    #[tokio::test]
1211    async fn handle_methods_send_expected_commands_and_receive_replies() {
1212        let (tx, mut rx) = mpsc::channel(16);
1213        let handle = RouteControllerHandle { tx };
1214
1215        let stop_task = tokio::spawn({
1216            let h = handle.clone();
1217            async move { h.stop_route("r-1").await }
1218        });
1219        let cmd = rx.recv().await.expect("stop command");
1220        match cmd {
1221            RouteControllerCommand::StopRoute { route_id, reply } => {
1222                assert_eq!(route_id, "r-1");
1223                let _ = reply.send(Ok(()));
1224            }
1225            _ => panic!("unexpected command"),
1226        }
1227        assert!(stop_task.await.expect("join").is_ok());
1228
1229        let exists_task = tokio::spawn({
1230            let h = handle.clone();
1231            async move { h.route_exists("r-2").await }
1232        });
1233        let cmd = rx.recv().await.expect("exists command");
1234        match cmd {
1235            RouteControllerCommand::RouteExists { route_id, reply } => {
1236                assert_eq!(route_id, "r-2");
1237                let _ = reply.send(true);
1238            }
1239            _ => panic!("unexpected command"),
1240        }
1241        assert_eq!(exists_task.await.expect("join").expect("ok"), true);
1242
1243        let hash_task = tokio::spawn({
1244            let h = handle.clone();
1245            async move { h.route_source_hash("r-3").await }
1246        });
1247        let cmd = rx.recv().await.expect("hash command");
1248        match cmd {
1249            RouteControllerCommand::RouteSourceHash { route_id, reply } => {
1250                assert_eq!(route_id, "r-3");
1251                let _ = reply.send(Some(77));
1252            }
1253            _ => panic!("unexpected command"),
1254        }
1255        assert_eq!(hash_task.await.expect("join"), Some(77));
1256    }
1257
1258    #[tokio::test]
1259    async fn handle_methods_error_on_dropped_reply_channel() {
1260        let (tx, mut rx) = mpsc::channel(16);
1261        let handle = RouteControllerHandle { tx };
1262
1263        let count_task = tokio::spawn({
1264            let h = handle.clone();
1265            async move { h.route_count().await }
1266        });
1267        let cmd = rx.recv().await.expect("route_count command");
1268        match cmd {
1269            RouteControllerCommand::RouteCount { reply } => drop(reply),
1270            _ => panic!("unexpected command"),
1271        }
1272        assert!(matches!(
1273            count_task.await.expect("join"),
1274            Err(CamelError::ProcessorError(_))
1275        ));
1276
1277        let stop_task = tokio::spawn({
1278            let h = handle.clone();
1279            async move { h.stop_route("x").await }
1280        });
1281        let cmd = rx.recv().await.expect("stop command");
1282        match cmd {
1283            RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
1284            _ => panic!("unexpected command"),
1285        }
1286        assert!(matches!(
1287            stop_task.await.expect("join"),
1288            Err(CamelError::ProcessorError(_))
1289        ));
1290
1291        let maybe_hash = tokio::spawn({
1292            let h = handle.clone();
1293            async move { h.route_source_hash("x").await }
1294        });
1295        let cmd = rx.recv().await.expect("hash command");
1296        match cmd {
1297            RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
1298            _ => panic!("unexpected command"),
1299        }
1300        assert_eq!(maybe_hash.await.expect("join"), None);
1301    }
1302
1303    #[test]
1304    fn try_set_function_invoker_returns_mailbox_full() {
1305        let (tx, mut rx) = mpsc::channel(1);
1306        tx.try_send(RouteControllerCommand::Shutdown)
1307            .expect("fill mailbox");
1308        let handle = RouteControllerHandle { tx };
1309
1310        let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
1311        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1312
1313        rx.try_recv().expect("mailbox still has first message");
1314    }
1315}