Skip to main content

camel_core/lifecycle/adapters/
controller_actor.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6    BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7    SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
17pub(crate) enum RouteControllerCommand {
18    StartRoute {
19        route_id: String,
20        reply: oneshot::Sender<Result<(), CamelError>>,
21    },
22    StopRoute {
23        route_id: String,
24        reply: oneshot::Sender<Result<(), CamelError>>,
25    },
26    RestartRoute {
27        route_id: String,
28        reply: oneshot::Sender<Result<(), CamelError>>,
29    },
30    SuspendRoute {
31        route_id: String,
32        reply: oneshot::Sender<Result<(), CamelError>>,
33    },
34    ResumeRoute {
35        route_id: String,
36        reply: oneshot::Sender<Result<(), CamelError>>,
37    },
38    StartAllRoutes {
39        reply: oneshot::Sender<Result<(), CamelError>>,
40    },
41    StopAllRoutes {
42        reply: oneshot::Sender<Result<(), CamelError>>,
43    },
44    AddRoute {
45        definition: RouteDefinition,
46        reply: oneshot::Sender<Result<(), CamelError>>,
47    },
48    RemoveRoute {
49        route_id: String,
50        reply: oneshot::Sender<Result<(), CamelError>>,
51    },
52    SwapPipeline {
53        route_id: String,
54        pipeline: BoxProcessor,
55        reply: oneshot::Sender<Result<(), CamelError>>,
56    },
57    CompileRouteDefinition {
58        definition: RouteDefinition,
59        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
60    },
61    CompileRouteDefinitionWithGeneration {
62        definition: RouteDefinition,
63        generation: u64,
64        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
65    },
66    PrepareRouteDefinitionWithGeneration {
67        definition: RouteDefinition,
68        generation: u64,
69        reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
70    },
71    InsertPreparedRoute {
72        prepared: PreparedRoute,
73        reply: oneshot::Sender<Result<(), CamelError>>,
74    },
75    RemoveRoutePreservingFunctions {
76        route_id: String,
77        reply: oneshot::Sender<Result<(), CamelError>>,
78    },
79    RouteFromUri {
80        route_id: String,
81        reply: oneshot::Sender<Option<String>>,
82    },
83    SetErrorHandler {
84        config: ErrorHandlerConfig,
85    },
86    SetTracerConfig {
87        config: TracerConfig,
88    },
89    RouteCount {
90        reply: oneshot::Sender<usize>,
91    },
92    InFlightCount {
93        route_id: String,
94        reply: oneshot::Sender<Option<u64>>,
95    },
96    RouteExists {
97        route_id: String,
98        reply: oneshot::Sender<bool>,
99    },
100    RouteIds {
101        reply: oneshot::Sender<Vec<String>>,
102    },
103    AutoStartupRouteIds {
104        reply: oneshot::Sender<Vec<String>>,
105    },
106    ShutdownRouteIds {
107        reply: oneshot::Sender<Vec<String>>,
108    },
109    GetPipeline {
110        route_id: String,
111        reply: oneshot::Sender<Option<BoxProcessor>>,
112    },
113    StartRouteReload {
114        route_id: String,
115        reply: oneshot::Sender<Result<(), CamelError>>,
116    },
117    StopRouteReload {
118        route_id: String,
119        reply: oneshot::Sender<Result<(), CamelError>>,
120    },
121    SetRuntimeHandle {
122        runtime: Arc<dyn RuntimeHandle>,
123    },
124    SetFunctionInvoker {
125        invoker: Arc<dyn FunctionInvoker>,
126    },
127    RouteSourceHash {
128        route_id: String,
129        reply: oneshot::Sender<Option<u64>>,
130    },
131    Shutdown,
132}
133
134#[derive(Clone)]
135pub struct RouteControllerHandle {
136    tx: mpsc::Sender<RouteControllerCommand>,
137}
138
139impl RouteControllerHandle {
140    pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141        let (reply_tx, reply_rx) = oneshot::channel();
142        self.tx
143            .send(RouteControllerCommand::StartRoute {
144                route_id: route_id.into(),
145                reply: reply_tx,
146            })
147            .await
148            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149        reply_rx
150            .await
151            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152    }
153
154    pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155        let (reply_tx, reply_rx) = oneshot::channel();
156        self.tx
157            .send(RouteControllerCommand::StopRoute {
158                route_id: route_id.into(),
159                reply: reply_tx,
160            })
161            .await
162            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163        reply_rx
164            .await
165            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166    }
167
168    pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169        let (reply_tx, reply_rx) = oneshot::channel();
170        self.tx
171            .send(RouteControllerCommand::RestartRoute {
172                route_id: route_id.into(),
173                reply: reply_tx,
174            })
175            .await
176            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177        reply_rx
178            .await
179            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180    }
181
182    pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183        let (reply_tx, reply_rx) = oneshot::channel();
184        self.tx
185            .send(RouteControllerCommand::SuspendRoute {
186                route_id: route_id.into(),
187                reply: reply_tx,
188            })
189            .await
190            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191        reply_rx
192            .await
193            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194    }
195
196    pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197        let (reply_tx, reply_rx) = oneshot::channel();
198        self.tx
199            .send(RouteControllerCommand::ResumeRoute {
200                route_id: route_id.into(),
201                reply: reply_tx,
202            })
203            .await
204            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205        reply_rx
206            .await
207            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208    }
209
210    pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211        let (reply_tx, reply_rx) = oneshot::channel();
212        self.tx
213            .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214            .await
215            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216        reply_rx
217            .await
218            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219    }
220
221    pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222        let (reply_tx, reply_rx) = oneshot::channel();
223        self.tx
224            .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225            .await
226            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227        reply_rx
228            .await
229            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230    }
231
232    pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233        let (reply_tx, reply_rx) = oneshot::channel();
234        self.tx
235            .send(RouteControllerCommand::AddRoute {
236                definition,
237                reply: reply_tx,
238            })
239            .await
240            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241        reply_rx
242            .await
243            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244    }
245
246    pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247        let (reply_tx, reply_rx) = oneshot::channel();
248        self.tx
249            .send(RouteControllerCommand::RemoveRoute {
250                route_id: route_id.into(),
251                reply: reply_tx,
252            })
253            .await
254            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255        reply_rx
256            .await
257            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258    }
259
260    pub async fn swap_pipeline(
261        &self,
262        route_id: impl Into<String>,
263        pipeline: BoxProcessor,
264    ) -> Result<(), CamelError> {
265        let (reply_tx, reply_rx) = oneshot::channel();
266        self.tx
267            .send(RouteControllerCommand::SwapPipeline {
268                route_id: route_id.into(),
269                pipeline,
270                reply: reply_tx,
271            })
272            .await
273            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274        reply_rx
275            .await
276            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277    }
278
279    pub async fn compile_route_definition(
280        &self,
281        definition: RouteDefinition,
282    ) -> Result<BoxProcessor, CamelError> {
283        let (reply_tx, reply_rx) = oneshot::channel();
284        self.tx
285            .send(RouteControllerCommand::CompileRouteDefinition {
286                definition,
287                reply: reply_tx,
288            })
289            .await
290            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291        reply_rx
292            .await
293            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294    }
295
296    pub async fn compile_route_definition_with_generation(
297        &self,
298        definition: RouteDefinition,
299        generation: u64,
300    ) -> Result<BoxProcessor, CamelError> {
301        let (reply_tx, reply_rx) = oneshot::channel();
302        self.tx
303            .send(
304                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305                    definition,
306                    generation,
307                    reply: reply_tx,
308                },
309            )
310            .await
311            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312        reply_rx
313            .await
314            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315    }
316
317    pub(crate) async fn prepare_route_definition_with_generation(
318        &self,
319        definition: RouteDefinition,
320        generation: u64,
321    ) -> Result<PreparedRoute, CamelError> {
322        let (reply_tx, reply_rx) = oneshot::channel();
323        self.tx
324            .send(
325                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326                    definition,
327                    generation,
328                    reply: reply_tx,
329                },
330            )
331            .await
332            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333        reply_rx
334            .await
335            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336    }
337
338    pub(crate) async fn insert_prepared_route(
339        &self,
340        prepared: PreparedRoute,
341    ) -> Result<(), CamelError> {
342        let (reply_tx, reply_rx) = oneshot::channel();
343        self.tx
344            .send(RouteControllerCommand::InsertPreparedRoute {
345                prepared,
346                reply: reply_tx,
347            })
348            .await
349            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350        reply_rx
351            .await
352            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353    }
354
355    pub async fn remove_route_preserving_functions(
356        &self,
357        route_id: String,
358    ) -> Result<(), CamelError> {
359        let (reply_tx, reply_rx) = oneshot::channel();
360        self.tx
361            .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362                route_id,
363                reply: reply_tx,
364            })
365            .await
366            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367        reply_rx
368            .await
369            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370    }
371
372    pub async fn route_from_uri(
373        &self,
374        route_id: impl Into<String>,
375    ) -> Result<Option<String>, CamelError> {
376        let (reply_tx, reply_rx) = oneshot::channel();
377        self.tx
378            .send(RouteControllerCommand::RouteFromUri {
379                route_id: route_id.into(),
380                reply: reply_tx,
381            })
382            .await
383            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384        reply_rx
385            .await
386            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387    }
388
389    pub async fn route_count(&self) -> Result<usize, CamelError> {
390        let (reply_tx, reply_rx) = oneshot::channel();
391        self.tx
392            .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393            .await
394            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395        reply_rx
396            .await
397            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398    }
399
400    pub async fn in_flight_count(
401        &self,
402        route_id: impl Into<String>,
403    ) -> Result<Option<u64>, CamelError> {
404        let (reply_tx, reply_rx) = oneshot::channel();
405        self.tx
406            .send(RouteControllerCommand::InFlightCount {
407                route_id: route_id.into(),
408                reply: reply_tx,
409            })
410            .await
411            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412        reply_rx
413            .await
414            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415    }
416
417    pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418        let (reply_tx, reply_rx) = oneshot::channel();
419        self.tx
420            .send(RouteControllerCommand::RouteExists {
421                route_id: route_id.into(),
422                reply: reply_tx,
423            })
424            .await
425            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426        reply_rx
427            .await
428            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429    }
430
431    pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432        let (reply_tx, reply_rx) = oneshot::channel();
433        self.tx
434            .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435            .await
436            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437        reply_rx
438            .await
439            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440    }
441
442    pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443        let (reply_tx, reply_rx) = oneshot::channel();
444        self.tx
445            .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446            .await
447            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448        reply_rx
449            .await
450            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451    }
452
453    pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454        let (reply_tx, reply_rx) = oneshot::channel();
455        self.tx
456            .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457            .await
458            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459        reply_rx
460            .await
461            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462    }
463
464    pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465        let (reply_tx, reply_rx) = oneshot::channel();
466        self.tx
467            .send(RouteControllerCommand::StartRouteReload {
468                route_id: route_id.into(),
469                reply: reply_tx,
470            })
471            .await
472            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473        reply_rx
474            .await
475            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476    }
477
478    pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479        let (reply_tx, reply_rx) = oneshot::channel();
480        self.tx
481            .send(RouteControllerCommand::StopRouteReload {
482                route_id: route_id.into(),
483                reply: reply_tx,
484            })
485            .await
486            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487        reply_rx
488            .await
489            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490    }
491
492    pub async fn get_pipeline(
493        &self,
494        route_id: impl Into<String>,
495    ) -> Result<Option<BoxProcessor>, CamelError> {
496        let (reply_tx, reply_rx) = oneshot::channel();
497        self.tx
498            .send(RouteControllerCommand::GetPipeline {
499                route_id: route_id.into(),
500                reply: reply_tx,
501            })
502            .await
503            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504        reply_rx
505            .await
506            .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507    }
508
509    pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510        self.tx
511            .send(RouteControllerCommand::SetErrorHandler { config })
512            .await
513            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514    }
515
516    pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517        self.tx
518            .send(RouteControllerCommand::SetTracerConfig { config })
519            .await
520            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521    }
522
523    pub async fn set_runtime_handle(
524        &self,
525        runtime: Arc<dyn RuntimeHandle>,
526    ) -> Result<(), CamelError> {
527        self.tx
528            .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529            .await
530            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531    }
532
533    pub fn try_set_runtime_handle(
534        &self,
535        runtime: Arc<dyn RuntimeHandle>,
536    ) -> Result<(), CamelError> {
537        self.tx
538            .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539            .map_err(|err| {
540                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541            })
542    }
543
544    pub async fn set_function_invoker(
545        &self,
546        invoker: Arc<dyn FunctionInvoker>,
547    ) -> Result<(), CamelError> {
548        self.tx
549            .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550            .await
551            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552    }
553
554    pub fn try_set_function_invoker(
555        &self,
556        invoker: Arc<dyn FunctionInvoker>,
557    ) -> Result<(), CamelError> {
558        self.tx
559            .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560            .map_err(|err| {
561                CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562            })
563    }
564
565    pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566        let (reply_tx, reply_rx) = oneshot::channel();
567        self.tx
568            .send(RouteControllerCommand::RouteSourceHash {
569                route_id: route_id.into(),
570                reply: reply_tx,
571            })
572            .await
573            .ok()?;
574        reply_rx.await.ok()?
575    }
576
577    pub async fn shutdown(&self) -> Result<(), CamelError> {
578        self.tx
579            .send(RouteControllerCommand::Shutdown)
580            .await
581            .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582    }
583}
584
585pub fn spawn_controller_actor(
586    controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588    let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589    let handle = tokio::spawn(async move {
590        let mut controller = controller;
591        while let Some(cmd) = rx.recv().await {
592            match cmd {
593                RouteControllerCommand::StartRoute { route_id, reply } => {
594                    let _ = reply.send(controller.start_route(&route_id).await);
595                }
596                RouteControllerCommand::StopRoute { route_id, reply } => {
597                    let _ = reply.send(controller.stop_route(&route_id).await);
598                }
599                RouteControllerCommand::RestartRoute { route_id, reply } => {
600                    let _ = reply.send(controller.restart_route(&route_id).await);
601                }
602                RouteControllerCommand::SuspendRoute { route_id, reply } => {
603                    let _ = reply.send(controller.suspend_route(&route_id).await);
604                }
605                RouteControllerCommand::ResumeRoute { route_id, reply } => {
606                    let _ = reply.send(controller.resume_route(&route_id).await);
607                }
608                RouteControllerCommand::StartAllRoutes { reply } => {
609                    let _ = reply.send(controller.start_all_routes().await);
610                }
611                RouteControllerCommand::StopAllRoutes { reply } => {
612                    let _ = reply.send(controller.stop_all_routes().await);
613                }
614                RouteControllerCommand::AddRoute { definition, reply } => {
615                    let _ = reply.send(controller.add_route(definition).await);
616                }
617                RouteControllerCommand::RemoveRoute { route_id, reply } => {
618                    let _ = reply.send(controller.remove_route(&route_id).await);
619                }
620                RouteControllerCommand::SwapPipeline {
621                    route_id,
622                    pipeline,
623                    reply,
624                } => {
625                    let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626                }
627                RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628                    let _ = reply.send(controller.compile_route_definition(definition));
629                }
630                RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631                    definition,
632                    generation,
633                    reply,
634                } => {
635                    let _ = reply.send(
636                        controller.compile_route_definition_with_generation(definition, generation),
637                    );
638                }
639                RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640                    definition,
641                    generation,
642                    reply,
643                } => {
644                    let _ = reply.send(
645                        controller.prepare_route_definition_with_generation(definition, generation),
646                    );
647                }
648                RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649                    let _ = reply.send(controller.insert_prepared_route(prepared));
650                }
651                RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652                    let _ = reply.send(
653                        controller
654                            .remove_route_preserving_functions(&route_id)
655                            .await,
656                    );
657                }
658                RouteControllerCommand::RouteFromUri { route_id, reply } => {
659                    let _ = reply.send(controller.route_from_uri(&route_id));
660                }
661                RouteControllerCommand::SetErrorHandler { config } => {
662                    controller.set_error_handler(config);
663                }
664                RouteControllerCommand::SetTracerConfig { config } => {
665                    controller.set_tracer_config(&config);
666                }
667                RouteControllerCommand::RouteCount { reply } => {
668                    let _ = reply.send(controller.route_count());
669                }
670                RouteControllerCommand::InFlightCount { route_id, reply } => {
671                    let _ = reply.send(controller.in_flight_count(&route_id));
672                }
673                RouteControllerCommand::RouteExists { route_id, reply } => {
674                    let _ = reply.send(controller.route_exists(&route_id));
675                }
676                RouteControllerCommand::RouteIds { reply } => {
677                    let _ = reply.send(controller.route_ids());
678                }
679                RouteControllerCommand::AutoStartupRouteIds { reply } => {
680                    let _ = reply.send(controller.auto_startup_route_ids());
681                }
682                RouteControllerCommand::ShutdownRouteIds { reply } => {
683                    let _ = reply.send(controller.shutdown_route_ids());
684                }
685                RouteControllerCommand::GetPipeline { route_id, reply } => {
686                    let _ = reply.send(controller.get_pipeline(&route_id));
687                }
688                RouteControllerCommand::StartRouteReload { route_id, reply } => {
689                    let _ = reply.send(controller.start_route_reload(&route_id).await);
690                }
691                RouteControllerCommand::StopRouteReload { route_id, reply } => {
692                    let _ = reply.send(controller.stop_route_reload(&route_id).await);
693                }
694                RouteControllerCommand::SetRuntimeHandle { runtime } => {
695                    controller.set_runtime_handle(runtime);
696                }
697                RouteControllerCommand::SetFunctionInvoker { invoker } => {
698                    controller.set_function_invoker(invoker);
699                }
700                RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701                    let _ = reply.send(controller.route_source_hash(&route_id));
702                }
703                RouteControllerCommand::Shutdown => {
704                    break;
705                }
706            }
707        }
708    });
709    (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713    controller: RouteControllerHandle,
714    config: SupervisionConfig,
715    _metrics: Option<Arc<dyn MetricsCollector>>,
716    mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718    tokio::spawn(async move {
719        let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720        let mut last_restart_time: std::collections::HashMap<String, Instant> =
721            std::collections::HashMap::new();
722        let mut currently_restarting: std::collections::HashSet<String> =
723            std::collections::HashSet::new();
724
725        info!("Supervision loop started");
726
727        while let Some(notification) = crash_rx.recv().await {
728            let route_id = notification.route_id;
729            if currently_restarting.contains(&route_id) {
730                continue;
731            }
732
733            if let Some(last_time) = last_restart_time.get(&route_id)
734                && last_time.elapsed() >= config.initial_delay
735            {
736                attempts.insert(route_id.clone(), 0);
737            }
738
739            let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740            *current_attempt += 1;
741
742            if config
743                .max_attempts
744                .is_some_and(|max| *current_attempt > max)
745            {
746                // log-policy: system-broken
747                error!(
748                    route_id = %route_id,
749                    attempts = *current_attempt,
750                    "Route exceeded max restart attempts, giving up"
751                );
752                continue;
753            }
754
755            let delay = config.next_delay(*current_attempt);
756            currently_restarting.insert(route_id.clone());
757            tokio::time::sleep(delay).await;
758
759            match controller.restart_route(route_id.clone()).await {
760                Ok(()) => {
761                    info!(route_id = %route_id, "Route restarted successfully");
762                    last_restart_time.insert(route_id.clone(), Instant::now());
763                }
764                Err(err) => {
765                    // log-policy: system-broken
766                    error!(route_id = %route_id, error = %err, "Failed to restart route");
767                }
768            }
769
770            currently_restarting.remove(&route_id);
771        }
772
773        info!("Supervision loop ended");
774    })
775}
776
777#[cfg(test)]
778mod tests {
779    use super::{
780        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
781        spawn_supervision_task,
782    };
783    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
784    use crate::lifecycle::application::route_definition::RouteDefinition;
785    use crate::shared::components::domain::Registry;
786    use crate::shared::observability::domain::TracerConfig;
787    use camel_api::function::PrepareToken;
788    use camel_api::{
789        CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
790        FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
791        RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
792        SupervisionConfig,
793    };
794    use std::sync::Arc;
795    use std::time::Duration;
796    use tokio::sync::mpsc;
797    use tokio::time::sleep;
798
799    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
800        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
801        {
802            let mut guard = registry.lock().expect("lock");
803            guard.register(std::sync::Arc::new(
804                camel_component_timer::TimerComponent::new(),
805            ));
806            guard.register(std::sync::Arc::new(
807                camel_component_mock::MockComponent::new(),
808            ));
809        }
810        let controller = DefaultRouteController::new(
811            Arc::clone(&registry),
812            Arc::new(camel_api::NoopPlatformService::default()),
813        );
814        spawn_controller_actor(controller)
815    }
816
817    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
818        let controller = DefaultRouteController::new(
819            Arc::new(std::sync::Mutex::new(Registry::new())),
820            Arc::new(camel_api::NoopPlatformService::default()),
821        );
822        spawn_controller_actor(controller)
823    }
824
825    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
826        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
827    }
828
829    struct NoopRuntime;
830    struct NoopInvoker;
831
832    #[async_trait::async_trait]
833    impl RuntimeCommandBus for NoopRuntime {
834        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
835            Ok(RuntimeCommandResult::Accepted)
836        }
837    }
838
839    #[async_trait::async_trait]
840    impl RuntimeQueryBus for NoopRuntime {
841        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
842            Ok(match query {
843                RuntimeQuery::GetRouteStatus { route_id }
844                | RuntimeQuery::InFlightCount { route_id } => {
845                    RuntimeQueryResult::RouteNotFound { route_id }
846                }
847                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
848                    route_ids: Vec::new(),
849                },
850            })
851        }
852    }
853
854    impl FunctionInvokerSync for NoopInvoker {
855        fn stage_pending(
856            &self,
857            _def: FunctionDefinition,
858            _route_id: Option<&str>,
859            _generation: u64,
860        ) {
861        }
862        fn discard_staging(&self, _generation: u64) {}
863        fn begin_reload(&self) -> u64 {
864            1
865        }
866        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
867            vec![]
868        }
869        fn staged_refs_for_route(
870            &self,
871            _route_id: &str,
872            _generation: u64,
873        ) -> Vec<(FunctionId, Option<String>)> {
874            vec![]
875        }
876        fn staged_defs_for_route(
877            &self,
878            _route_id: &str,
879            _generation: u64,
880        ) -> Vec<(FunctionDefinition, Option<String>)> {
881            vec![]
882        }
883    }
884
885    #[async_trait::async_trait]
886    impl FunctionInvoker for NoopInvoker {
887        async fn register(
888            &self,
889            _def: FunctionDefinition,
890            _route_id: Option<&str>,
891        ) -> Result<(), FunctionInvocationError> {
892            Ok(())
893        }
894        async fn unregister(
895            &self,
896            _id: &FunctionId,
897            _route_id: Option<&str>,
898        ) -> Result<(), FunctionInvocationError> {
899            Ok(())
900        }
901        async fn invoke(
902            &self,
903            _id: &FunctionId,
904            _exchange: &Exchange,
905        ) -> Result<ExchangePatch, FunctionInvocationError> {
906            Ok(ExchangePatch::default())
907        }
908        async fn prepare_reload(
909            &self,
910            _diff: FunctionDiff,
911            _generation: u64,
912        ) -> Result<PrepareToken, FunctionInvocationError> {
913            Ok(PrepareToken::default())
914        }
915        async fn finalize_reload(
916            &self,
917            _diff: &FunctionDiff,
918            _generation: u64,
919        ) -> Result<(), FunctionInvocationError> {
920            Ok(())
921        }
922        async fn rollback_reload(
923            &self,
924            _token: PrepareToken,
925            _generation: u64,
926        ) -> Result<(), FunctionInvocationError> {
927            Ok(())
928        }
929        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
930            Ok(())
931        }
932    }
933
934    #[tokio::test]
935    async fn start_route_sends_command_and_returns_reply() {
936        let (tx, mut rx) = mpsc::channel(1);
937        let handle = RouteControllerHandle { tx };
938
939        let task = tokio::spawn(async move { handle.start_route("route-a").await });
940
941        let command = rx.recv().await.expect("command should be received");
942        match command {
943            RouteControllerCommand::StartRoute { route_id, reply } => {
944                assert_eq!(route_id, "route-a");
945                let _ = reply.send(Ok(()));
946            }
947            _ => panic!("unexpected command variant"),
948        }
949
950        let result = task.await.expect("join should succeed");
951        assert!(result.is_ok());
952    }
953
954    #[tokio::test]
955    async fn start_route_returns_error_when_actor_stops() {
956        let (tx, rx) = mpsc::channel(1);
957        drop(rx);
958
959        let handle = RouteControllerHandle { tx };
960        let result = handle.start_route("route-a").await;
961
962        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
963    }
964
965    #[tokio::test]
966    async fn spawn_controller_actor_processes_commands_and_shutdown() {
967        let controller = DefaultRouteController::new(
968            Arc::new(std::sync::Mutex::new(Registry::new())),
969            Arc::new(camel_api::NoopPlatformService::default()),
970        );
971        let (handle, join_handle) = spawn_controller_actor(controller);
972
973        assert_eq!(handle.route_count().await.expect("route_count"), 0);
974        assert_eq!(
975            handle.route_ids().await.expect("route_ids"),
976            Vec::<String>::new()
977        );
978
979        handle.shutdown().await.expect("shutdown send");
980        join_handle.await.expect("actor join");
981    }
982
983    #[tokio::test]
984    async fn actor_handle_introspection_and_mutation_commands() {
985        let (handle, join_handle) = build_actor_with_components();
986        let definition = route_def("h-1", "timer:tick?period=100");
987
988        handle.add_route(definition).await.expect("add route");
989        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
990        assert!(
991            !handle
992                .route_exists("no-such")
993                .await
994                .expect("route exists no-such")
995        );
996
997        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
998        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
999        assert_eq!(handle.route_count().await.expect("route_count"), 1);
1000
1001        let auto_ids = handle
1002            .auto_startup_route_ids()
1003            .await
1004            .expect("auto_startup_route_ids");
1005        assert!(auto_ids.iter().any(|id| id == "h-1"));
1006
1007        let shutdown_ids = handle
1008            .shutdown_route_ids()
1009            .await
1010            .expect("shutdown_route_ids");
1011        assert!(shutdown_ids.iter().any(|id| id == "h-1"));
1012
1013        let compiled = handle
1014            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
1015            .await
1016            .expect("compile_route_definition");
1017
1018        assert!(
1019            handle
1020                .get_pipeline("h-1")
1021                .await
1022                .expect("get_pipeline")
1023                .is_some()
1024        );
1025        handle
1026            .swap_pipeline("h-1", compiled)
1027            .await
1028            .expect("swap_pipeline");
1029
1030        let _ = handle
1031            .in_flight_count("h-1")
1032            .await
1033            .expect("in_flight_count");
1034        let _ = handle.route_source_hash("h-1").await;
1035
1036        handle
1037            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
1038            .await
1039            .expect("set_error_handler");
1040        handle
1041            .set_tracer_config(TracerConfig::default())
1042            .await
1043            .expect("set_tracer_config");
1044        handle
1045            .set_runtime_handle(Arc::new(NoopRuntime))
1046            .await
1047            .expect("set_runtime_handle");
1048
1049        handle.remove_route("h-1").await.expect("remove_route");
1050        assert_eq!(
1051            handle
1052                .route_count()
1053                .await
1054                .expect("route_count after remove"),
1055            0
1056        );
1057        handle
1058            .stop_all_routes()
1059            .await
1060            .expect("stop_all_routes on empty");
1061
1062        handle.shutdown().await.expect("shutdown send");
1063        join_handle.await.expect("actor join");
1064    }
1065
1066    #[tokio::test]
1067    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
1068        let (handle, join_handle) = build_actor_with_components();
1069        handle
1070            .add_route(route_def("lc-1", "timer:tick?period=50"))
1071            .await
1072            .expect("add route lc-1");
1073
1074        handle.start_route("lc-1").await.expect("start_route");
1075        sleep(Duration::from_millis(20)).await;
1076
1077        handle.restart_route("lc-1").await.expect("restart_route");
1078        sleep(Duration::from_millis(20)).await;
1079
1080        handle.suspend_route("lc-1").await.expect("suspend_route");
1081        handle.resume_route("lc-1").await.expect("resume_route");
1082        sleep(Duration::from_millis(20)).await;
1083
1084        handle.stop_route("lc-1").await.expect("stop_route");
1085        handle.start_all_routes().await.expect("start_all_routes");
1086        sleep(Duration::from_millis(20)).await;
1087        handle.stop_all_routes().await.expect("stop_all_routes");
1088
1089        handle
1090            .start_route_reload("lc-1")
1091            .await
1092            .expect("start_route_reload");
1093        handle
1094            .stop_route_reload("lc-1")
1095            .await
1096            .expect("stop_route_reload");
1097
1098        handle.shutdown().await.expect("shutdown send");
1099        join_handle.await.expect("actor join");
1100    }
1101
1102    #[tokio::test]
1103    async fn spawn_supervision_restarts_route_on_crash() {
1104        let (handle, join_handle) = build_actor_with_components();
1105        handle
1106            .add_route(route_def("sup-1", "timer:tick?period=100"))
1107            .await
1108            .expect("add route sup-1");
1109        handle
1110            .start_route("sup-1")
1111            .await
1112            .expect("start_route sup-1");
1113
1114        let (crash_tx, crash_rx) = mpsc::channel(8);
1115        let supervision = spawn_supervision_task(
1116            handle.clone(),
1117            SupervisionConfig {
1118                initial_delay: Duration::from_millis(10),
1119                max_attempts: Some(2),
1120                ..SupervisionConfig::default()
1121            },
1122            None,
1123            crash_rx,
1124        );
1125
1126        crash_tx
1127            .send(CrashNotification {
1128                route_id: "sup-1".to_string(),
1129                error: "simulated".to_string(),
1130            })
1131            .await
1132            .expect("send crash notification");
1133
1134        sleep(Duration::from_millis(150)).await;
1135        drop(crash_tx);
1136        supervision.await.expect("supervision join");
1137
1138        handle.shutdown().await.expect("shutdown send");
1139        join_handle.await.expect("actor join");
1140    }
1141
1142    #[tokio::test]
1143    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
1144        let (handle, join_handle) = build_actor_with_components();
1145        handle
1146            .add_route(route_def("sup-2", "timer:tick?period=100"))
1147            .await
1148            .expect("add route sup-2");
1149        handle
1150            .start_route("sup-2")
1151            .await
1152            .expect("start_route sup-2");
1153
1154        let (crash_tx, crash_rx) = mpsc::channel(8);
1155        let supervision = spawn_supervision_task(
1156            handle.clone(),
1157            SupervisionConfig {
1158                initial_delay: Duration::from_millis(10),
1159                max_attempts: Some(1),
1160                ..SupervisionConfig::default()
1161            },
1162            None,
1163            crash_rx,
1164        );
1165
1166        crash_tx
1167            .send(CrashNotification {
1168                route_id: "sup-2".to_string(),
1169                error: "attempt-1".to_string(),
1170            })
1171            .await
1172            .expect("send crash attempt-1");
1173        crash_tx
1174            .send(CrashNotification {
1175                route_id: "sup-2".to_string(),
1176                error: "attempt-2".to_string(),
1177            })
1178            .await
1179            .expect("send crash attempt-2");
1180
1181        sleep(Duration::from_millis(200)).await;
1182        drop(crash_tx);
1183        supervision.await.expect("supervision join");
1184
1185        handle.shutdown().await.expect("shutdown send");
1186        join_handle.await.expect("actor join");
1187    }
1188
1189    #[tokio::test]
1190    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
1191        let (handle, join_handle) = build_empty_actor();
1192
1193        handle
1194            .try_set_runtime_handle(Arc::new(NoopRuntime))
1195            .expect("try_set_runtime_handle should succeed");
1196
1197        handle.shutdown().await.expect("shutdown send");
1198        join_handle.await.expect("actor join");
1199    }
1200
1201    #[tokio::test]
1202    async fn shutdown_returns_error_when_actor_stopped() {
1203        let (tx, rx) = mpsc::channel(1);
1204        drop(rx);
1205
1206        let handle = RouteControllerHandle { tx };
1207        let result = handle.shutdown().await;
1208
1209        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1210    }
1211
1212    #[tokio::test]
1213    async fn handle_methods_send_expected_commands_and_receive_replies() {
1214        let (tx, mut rx) = mpsc::channel(16);
1215        let handle = RouteControllerHandle { tx };
1216
1217        let stop_task = tokio::spawn({
1218            let h = handle.clone();
1219            async move { h.stop_route("r-1").await }
1220        });
1221        let cmd = rx.recv().await.expect("stop command");
1222        match cmd {
1223            RouteControllerCommand::StopRoute { route_id, reply } => {
1224                assert_eq!(route_id, "r-1");
1225                let _ = reply.send(Ok(()));
1226            }
1227            _ => panic!("unexpected command"),
1228        }
1229        assert!(stop_task.await.expect("join").is_ok());
1230
1231        let exists_task = tokio::spawn({
1232            let h = handle.clone();
1233            async move { h.route_exists("r-2").await }
1234        });
1235        let cmd = rx.recv().await.expect("exists command");
1236        match cmd {
1237            RouteControllerCommand::RouteExists { route_id, reply } => {
1238                assert_eq!(route_id, "r-2");
1239                let _ = reply.send(true);
1240            }
1241            _ => panic!("unexpected command"),
1242        }
1243        assert!(exists_task.await.expect("join").expect("ok"));
1244
1245        let hash_task = tokio::spawn({
1246            let h = handle.clone();
1247            async move { h.route_source_hash("r-3").await }
1248        });
1249        let cmd = rx.recv().await.expect("hash command");
1250        match cmd {
1251            RouteControllerCommand::RouteSourceHash { route_id, reply } => {
1252                assert_eq!(route_id, "r-3");
1253                let _ = reply.send(Some(77));
1254            }
1255            _ => panic!("unexpected command"),
1256        }
1257        assert_eq!(hash_task.await.expect("join"), Some(77));
1258    }
1259
1260    #[tokio::test]
1261    async fn handle_methods_error_on_dropped_reply_channel() {
1262        let (tx, mut rx) = mpsc::channel(16);
1263        let handle = RouteControllerHandle { tx };
1264
1265        let count_task = tokio::spawn({
1266            let h = handle.clone();
1267            async move { h.route_count().await }
1268        });
1269        let cmd = rx.recv().await.expect("route_count command");
1270        match cmd {
1271            RouteControllerCommand::RouteCount { reply } => drop(reply),
1272            _ => panic!("unexpected command"),
1273        }
1274        assert!(matches!(
1275            count_task.await.expect("join"),
1276            Err(CamelError::ProcessorError(_))
1277        ));
1278
1279        let stop_task = tokio::spawn({
1280            let h = handle.clone();
1281            async move { h.stop_route("x").await }
1282        });
1283        let cmd = rx.recv().await.expect("stop command");
1284        match cmd {
1285            RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
1286            _ => panic!("unexpected command"),
1287        }
1288        assert!(matches!(
1289            stop_task.await.expect("join"),
1290            Err(CamelError::ProcessorError(_))
1291        ));
1292
1293        let maybe_hash = tokio::spawn({
1294            let h = handle.clone();
1295            async move { h.route_source_hash("x").await }
1296        });
1297        let cmd = rx.recv().await.expect("hash command");
1298        match cmd {
1299            RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
1300            _ => panic!("unexpected command"),
1301        }
1302        assert_eq!(maybe_hash.await.expect("join"), None);
1303    }
1304
1305    #[test]
1306    fn try_set_function_invoker_returns_mailbox_full() {
1307        let (tx, mut rx) = mpsc::channel(1);
1308        tx.try_send(RouteControllerCommand::Shutdown)
1309            .expect("fill mailbox");
1310        let handle = RouteControllerHandle { tx };
1311
1312        let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
1313        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1314
1315        rx.try_recv().expect("mailbox still has first message");
1316    }
1317}