1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6 BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7 SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
/// Messages accepted by the route-controller actor started by `spawn_controller_actor`.
///
/// Each variant maps to one operation on `DefaultRouteController`, executed by the actor
/// task in the order the messages were received. Variants that carry a `reply` sender get
/// the operation's result sent back on it; the actor ignores a failed reply send (for
/// example when the requester stopped waiting). Variants without `reply` are
/// fire-and-forget.
pub(crate) enum RouteControllerCommand {
    /// Start the route `route_id`.
    StartRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Stop the route `route_id`.
    StopRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Restart the route `route_id` (also used by the supervision loop after a crash).
    RestartRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Suspend the route `route_id`.
    SuspendRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Resume the previously suspended route `route_id`.
    ResumeRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Start every registered route.
    StartAllRoutes {
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Stop every registered route.
    StopAllRoutes {
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Register a new route built from `definition`.
    AddRoute {
        definition: RouteDefinition,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Remove the route `route_id`.
    RemoveRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Replace the processing pipeline of `route_id` with `pipeline`.
    SwapPipeline {
        route_id: String,
        pipeline: BoxProcessor,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Compile `definition` into a processor without registering it.
    CompileRouteDefinition {
        definition: RouteDefinition,
        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
    },
    /// Like `CompileRouteDefinition`, additionally passing a caller-supplied `generation`.
    CompileRouteDefinitionWithGeneration {
        definition: RouteDefinition,
        generation: u64,
        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
    },
    /// Build a `PreparedRoute` from `definition` and `generation` without inserting it.
    PrepareRouteDefinitionWithGeneration {
        definition: RouteDefinition,
        generation: u64,
        reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
    },
    /// Insert a route previously produced by `PrepareRouteDefinitionWithGeneration`.
    InsertPreparedRoute {
        prepared: PreparedRoute,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Remove `route_id` via the controller's function-preserving removal path.
    RemoveRoutePreservingFunctions {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Query the `from` URI of `route_id` (`None` when the controller has none for it).
    RouteFromUri {
        route_id: String,
        reply: oneshot::Sender<Option<String>>,
    },
    /// Fire-and-forget: install an error-handler configuration.
    SetErrorHandler {
        config: ErrorHandlerConfig,
    },
    /// Fire-and-forget: install a tracer configuration.
    SetTracerConfig {
        config: TracerConfig,
    },
    /// Query the number of registered routes.
    RouteCount {
        reply: oneshot::Sender<usize>,
    },
    /// Query the in-flight exchange count of `route_id` (`None` when unavailable).
    InFlightCount {
        route_id: String,
        reply: oneshot::Sender<Option<u64>>,
    },
    /// Query whether `route_id` is registered.
    RouteExists {
        route_id: String,
        reply: oneshot::Sender<bool>,
    },
    /// Query the ids of all registered routes.
    RouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Query the ids the controller reports for automatic startup.
    AutoStartupRouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Query the ids the controller reports for shutdown.
    ShutdownRouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Fetch the current pipeline of `route_id`, if any.
    GetPipeline {
        route_id: String,
        reply: oneshot::Sender<Option<BoxProcessor>>,
    },
    /// Start `route_id` through the controller's reload-specific start path.
    StartRouteReload {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Stop `route_id` through the controller's reload-specific stop path.
    StopRouteReload {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Fire-and-forget: install the runtime handle.
    SetRuntimeHandle {
        runtime: Arc<dyn RuntimeHandle>,
    },
    /// Fire-and-forget: install the function invoker.
    SetFunctionInvoker {
        invoker: Arc<dyn FunctionInvoker>,
    },
    /// Query the stored source hash of `route_id` (`None` when absent).
    RouteSourceHash {
        route_id: String,
        reply: oneshot::Sender<Option<u64>>,
    },
    /// Terminate the actor loop. Commands still queued behind it are never processed;
    /// their reply senders are dropped with the mailbox.
    Shutdown,
}
133
134#[derive(Clone)]
135pub struct RouteControllerHandle {
136 tx: mpsc::Sender<RouteControllerCommand>,
137}
138
139impl RouteControllerHandle {
140 pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141 let (reply_tx, reply_rx) = oneshot::channel();
142 self.tx
143 .send(RouteControllerCommand::StartRoute {
144 route_id: route_id.into(),
145 reply: reply_tx,
146 })
147 .await
148 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149 reply_rx
150 .await
151 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152 }
153
154 pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155 let (reply_tx, reply_rx) = oneshot::channel();
156 self.tx
157 .send(RouteControllerCommand::StopRoute {
158 route_id: route_id.into(),
159 reply: reply_tx,
160 })
161 .await
162 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163 reply_rx
164 .await
165 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166 }
167
168 pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169 let (reply_tx, reply_rx) = oneshot::channel();
170 self.tx
171 .send(RouteControllerCommand::RestartRoute {
172 route_id: route_id.into(),
173 reply: reply_tx,
174 })
175 .await
176 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177 reply_rx
178 .await
179 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180 }
181
182 pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183 let (reply_tx, reply_rx) = oneshot::channel();
184 self.tx
185 .send(RouteControllerCommand::SuspendRoute {
186 route_id: route_id.into(),
187 reply: reply_tx,
188 })
189 .await
190 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191 reply_rx
192 .await
193 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194 }
195
196 pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197 let (reply_tx, reply_rx) = oneshot::channel();
198 self.tx
199 .send(RouteControllerCommand::ResumeRoute {
200 route_id: route_id.into(),
201 reply: reply_tx,
202 })
203 .await
204 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205 reply_rx
206 .await
207 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208 }
209
210 pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211 let (reply_tx, reply_rx) = oneshot::channel();
212 self.tx
213 .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214 .await
215 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216 reply_rx
217 .await
218 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219 }
220
221 pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222 let (reply_tx, reply_rx) = oneshot::channel();
223 self.tx
224 .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225 .await
226 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227 reply_rx
228 .await
229 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230 }
231
232 pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233 let (reply_tx, reply_rx) = oneshot::channel();
234 self.tx
235 .send(RouteControllerCommand::AddRoute {
236 definition,
237 reply: reply_tx,
238 })
239 .await
240 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241 reply_rx
242 .await
243 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244 }
245
246 pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247 let (reply_tx, reply_rx) = oneshot::channel();
248 self.tx
249 .send(RouteControllerCommand::RemoveRoute {
250 route_id: route_id.into(),
251 reply: reply_tx,
252 })
253 .await
254 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255 reply_rx
256 .await
257 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258 }
259
260 pub async fn swap_pipeline(
261 &self,
262 route_id: impl Into<String>,
263 pipeline: BoxProcessor,
264 ) -> Result<(), CamelError> {
265 let (reply_tx, reply_rx) = oneshot::channel();
266 self.tx
267 .send(RouteControllerCommand::SwapPipeline {
268 route_id: route_id.into(),
269 pipeline,
270 reply: reply_tx,
271 })
272 .await
273 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274 reply_rx
275 .await
276 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277 }
278
279 pub async fn compile_route_definition(
280 &self,
281 definition: RouteDefinition,
282 ) -> Result<BoxProcessor, CamelError> {
283 let (reply_tx, reply_rx) = oneshot::channel();
284 self.tx
285 .send(RouteControllerCommand::CompileRouteDefinition {
286 definition,
287 reply: reply_tx,
288 })
289 .await
290 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291 reply_rx
292 .await
293 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294 }
295
296 pub async fn compile_route_definition_with_generation(
297 &self,
298 definition: RouteDefinition,
299 generation: u64,
300 ) -> Result<BoxProcessor, CamelError> {
301 let (reply_tx, reply_rx) = oneshot::channel();
302 self.tx
303 .send(
304 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305 definition,
306 generation,
307 reply: reply_tx,
308 },
309 )
310 .await
311 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312 reply_rx
313 .await
314 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315 }
316
317 pub(crate) async fn prepare_route_definition_with_generation(
318 &self,
319 definition: RouteDefinition,
320 generation: u64,
321 ) -> Result<PreparedRoute, CamelError> {
322 let (reply_tx, reply_rx) = oneshot::channel();
323 self.tx
324 .send(
325 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326 definition,
327 generation,
328 reply: reply_tx,
329 },
330 )
331 .await
332 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333 reply_rx
334 .await
335 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336 }
337
338 pub(crate) async fn insert_prepared_route(
339 &self,
340 prepared: PreparedRoute,
341 ) -> Result<(), CamelError> {
342 let (reply_tx, reply_rx) = oneshot::channel();
343 self.tx
344 .send(RouteControllerCommand::InsertPreparedRoute {
345 prepared,
346 reply: reply_tx,
347 })
348 .await
349 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350 reply_rx
351 .await
352 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353 }
354
355 pub async fn remove_route_preserving_functions(
356 &self,
357 route_id: String,
358 ) -> Result<(), CamelError> {
359 let (reply_tx, reply_rx) = oneshot::channel();
360 self.tx
361 .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362 route_id,
363 reply: reply_tx,
364 })
365 .await
366 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367 reply_rx
368 .await
369 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370 }
371
372 pub async fn route_from_uri(
373 &self,
374 route_id: impl Into<String>,
375 ) -> Result<Option<String>, CamelError> {
376 let (reply_tx, reply_rx) = oneshot::channel();
377 self.tx
378 .send(RouteControllerCommand::RouteFromUri {
379 route_id: route_id.into(),
380 reply: reply_tx,
381 })
382 .await
383 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384 reply_rx
385 .await
386 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387 }
388
389 pub async fn route_count(&self) -> Result<usize, CamelError> {
390 let (reply_tx, reply_rx) = oneshot::channel();
391 self.tx
392 .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393 .await
394 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395 reply_rx
396 .await
397 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398 }
399
400 pub async fn in_flight_count(
401 &self,
402 route_id: impl Into<String>,
403 ) -> Result<Option<u64>, CamelError> {
404 let (reply_tx, reply_rx) = oneshot::channel();
405 self.tx
406 .send(RouteControllerCommand::InFlightCount {
407 route_id: route_id.into(),
408 reply: reply_tx,
409 })
410 .await
411 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412 reply_rx
413 .await
414 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415 }
416
417 pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418 let (reply_tx, reply_rx) = oneshot::channel();
419 self.tx
420 .send(RouteControllerCommand::RouteExists {
421 route_id: route_id.into(),
422 reply: reply_tx,
423 })
424 .await
425 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426 reply_rx
427 .await
428 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429 }
430
431 pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432 let (reply_tx, reply_rx) = oneshot::channel();
433 self.tx
434 .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435 .await
436 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437 reply_rx
438 .await
439 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440 }
441
442 pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443 let (reply_tx, reply_rx) = oneshot::channel();
444 self.tx
445 .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446 .await
447 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448 reply_rx
449 .await
450 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451 }
452
453 pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454 let (reply_tx, reply_rx) = oneshot::channel();
455 self.tx
456 .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457 .await
458 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459 reply_rx
460 .await
461 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462 }
463
464 pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465 let (reply_tx, reply_rx) = oneshot::channel();
466 self.tx
467 .send(RouteControllerCommand::StartRouteReload {
468 route_id: route_id.into(),
469 reply: reply_tx,
470 })
471 .await
472 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473 reply_rx
474 .await
475 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476 }
477
478 pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479 let (reply_tx, reply_rx) = oneshot::channel();
480 self.tx
481 .send(RouteControllerCommand::StopRouteReload {
482 route_id: route_id.into(),
483 reply: reply_tx,
484 })
485 .await
486 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487 reply_rx
488 .await
489 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490 }
491
492 pub async fn get_pipeline(
493 &self,
494 route_id: impl Into<String>,
495 ) -> Result<Option<BoxProcessor>, CamelError> {
496 let (reply_tx, reply_rx) = oneshot::channel();
497 self.tx
498 .send(RouteControllerCommand::GetPipeline {
499 route_id: route_id.into(),
500 reply: reply_tx,
501 })
502 .await
503 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504 reply_rx
505 .await
506 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507 }
508
509 pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510 self.tx
511 .send(RouteControllerCommand::SetErrorHandler { config })
512 .await
513 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514 }
515
516 pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517 self.tx
518 .send(RouteControllerCommand::SetTracerConfig { config })
519 .await
520 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521 }
522
523 pub async fn set_runtime_handle(
524 &self,
525 runtime: Arc<dyn RuntimeHandle>,
526 ) -> Result<(), CamelError> {
527 self.tx
528 .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529 .await
530 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531 }
532
533 pub fn try_set_runtime_handle(
534 &self,
535 runtime: Arc<dyn RuntimeHandle>,
536 ) -> Result<(), CamelError> {
537 self.tx
538 .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539 .map_err(|err| {
540 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541 })
542 }
543
544 pub async fn set_function_invoker(
545 &self,
546 invoker: Arc<dyn FunctionInvoker>,
547 ) -> Result<(), CamelError> {
548 self.tx
549 .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550 .await
551 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552 }
553
554 pub fn try_set_function_invoker(
555 &self,
556 invoker: Arc<dyn FunctionInvoker>,
557 ) -> Result<(), CamelError> {
558 self.tx
559 .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560 .map_err(|err| {
561 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562 })
563 }
564
565 pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566 let (reply_tx, reply_rx) = oneshot::channel();
567 self.tx
568 .send(RouteControllerCommand::RouteSourceHash {
569 route_id: route_id.into(),
570 reply: reply_tx,
571 })
572 .await
573 .ok()?;
574 reply_rx.await.ok()?
575 }
576
577 pub async fn shutdown(&self) -> Result<(), CamelError> {
578 self.tx
579 .send(RouteControllerCommand::Shutdown)
580 .await
581 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582 }
583}
584
585pub fn spawn_controller_actor(
586 controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589 let handle = tokio::spawn(async move {
590 let mut controller = controller;
591 while let Some(cmd) = rx.recv().await {
592 match cmd {
593 RouteControllerCommand::StartRoute { route_id, reply } => {
594 let _ = reply.send(controller.start_route(&route_id).await);
595 }
596 RouteControllerCommand::StopRoute { route_id, reply } => {
597 let _ = reply.send(controller.stop_route(&route_id).await);
598 }
599 RouteControllerCommand::RestartRoute { route_id, reply } => {
600 let _ = reply.send(controller.restart_route(&route_id).await);
601 }
602 RouteControllerCommand::SuspendRoute { route_id, reply } => {
603 let _ = reply.send(controller.suspend_route(&route_id).await);
604 }
605 RouteControllerCommand::ResumeRoute { route_id, reply } => {
606 let _ = reply.send(controller.resume_route(&route_id).await);
607 }
608 RouteControllerCommand::StartAllRoutes { reply } => {
609 let _ = reply.send(controller.start_all_routes().await);
610 }
611 RouteControllerCommand::StopAllRoutes { reply } => {
612 let _ = reply.send(controller.stop_all_routes().await);
613 }
614 RouteControllerCommand::AddRoute { definition, reply } => {
615 let _ = reply.send(controller.add_route(definition).await);
616 }
617 RouteControllerCommand::RemoveRoute { route_id, reply } => {
618 let _ = reply.send(controller.remove_route(&route_id).await);
619 }
620 RouteControllerCommand::SwapPipeline {
621 route_id,
622 pipeline,
623 reply,
624 } => {
625 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626 }
627 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628 let _ = reply.send(controller.compile_route_definition(definition));
629 }
630 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631 definition,
632 generation,
633 reply,
634 } => {
635 let _ = reply.send(
636 controller.compile_route_definition_with_generation(definition, generation),
637 );
638 }
639 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640 definition,
641 generation,
642 reply,
643 } => {
644 let _ = reply.send(
645 controller.prepare_route_definition_with_generation(definition, generation),
646 );
647 }
648 RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649 let _ = reply.send(controller.insert_prepared_route(prepared));
650 }
651 RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652 let _ = reply.send(
653 controller
654 .remove_route_preserving_functions(&route_id)
655 .await,
656 );
657 }
658 RouteControllerCommand::RouteFromUri { route_id, reply } => {
659 let _ = reply.send(controller.route_from_uri(&route_id));
660 }
661 RouteControllerCommand::SetErrorHandler { config } => {
662 controller.set_error_handler(config);
663 }
664 RouteControllerCommand::SetTracerConfig { config } => {
665 controller.set_tracer_config(&config);
666 }
667 RouteControllerCommand::RouteCount { reply } => {
668 let _ = reply.send(controller.route_count());
669 }
670 RouteControllerCommand::InFlightCount { route_id, reply } => {
671 let _ = reply.send(controller.in_flight_count(&route_id));
672 }
673 RouteControllerCommand::RouteExists { route_id, reply } => {
674 let _ = reply.send(controller.route_exists(&route_id));
675 }
676 RouteControllerCommand::RouteIds { reply } => {
677 let _ = reply.send(controller.route_ids());
678 }
679 RouteControllerCommand::AutoStartupRouteIds { reply } => {
680 let _ = reply.send(controller.auto_startup_route_ids());
681 }
682 RouteControllerCommand::ShutdownRouteIds { reply } => {
683 let _ = reply.send(controller.shutdown_route_ids());
684 }
685 RouteControllerCommand::GetPipeline { route_id, reply } => {
686 let _ = reply.send(controller.get_pipeline(&route_id));
687 }
688 RouteControllerCommand::StartRouteReload { route_id, reply } => {
689 let _ = reply.send(controller.start_route_reload(&route_id).await);
690 }
691 RouteControllerCommand::StopRouteReload { route_id, reply } => {
692 let _ = reply.send(controller.stop_route_reload(&route_id).await);
693 }
694 RouteControllerCommand::SetRuntimeHandle { runtime } => {
695 controller.set_runtime_handle(runtime);
696 }
697 RouteControllerCommand::SetFunctionInvoker { invoker } => {
698 controller.set_function_invoker(invoker);
699 }
700 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701 let _ = reply.send(controller.route_source_hash(&route_id));
702 }
703 RouteControllerCommand::Shutdown => {
704 break;
705 }
706 }
707 }
708 });
709 (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713 controller: RouteControllerHandle,
714 config: SupervisionConfig,
715 _metrics: Option<Arc<dyn MetricsCollector>>,
716 mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718 tokio::spawn(async move {
719 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720 let mut last_restart_time: std::collections::HashMap<String, Instant> =
721 std::collections::HashMap::new();
722 let mut currently_restarting: std::collections::HashSet<String> =
723 std::collections::HashSet::new();
724
725 info!("Supervision loop started");
726
727 while let Some(notification) = crash_rx.recv().await {
728 let route_id = notification.route_id;
729 if currently_restarting.contains(&route_id) {
730 continue;
731 }
732
733 if let Some(last_time) = last_restart_time.get(&route_id)
734 && last_time.elapsed() >= config.initial_delay
735 {
736 attempts.insert(route_id.clone(), 0);
737 }
738
739 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740 *current_attempt += 1;
741
742 if config
743 .max_attempts
744 .is_some_and(|max| *current_attempt > max)
745 {
746 error!(
747 route_id = %route_id,
748 attempts = *current_attempt,
749 "Route exceeded max restart attempts, giving up"
750 );
751 continue;
752 }
753
754 let delay = config.next_delay(*current_attempt);
755 currently_restarting.insert(route_id.clone());
756 tokio::time::sleep(delay).await;
757
758 match controller.restart_route(route_id.clone()).await {
759 Ok(()) => {
760 info!(route_id = %route_id, "Route restarted successfully");
761 last_restart_time.insert(route_id.clone(), Instant::now());
762 }
763 Err(err) => {
764 error!(route_id = %route_id, error = %err, "Failed to restart route");
765 }
766 }
767
768 currently_restarting.remove(&route_id);
769 }
770
771 info!("Supervision loop ended");
772 })
773}
774
#[cfg(test)]
mod tests {
    //! Tests for the controller actor, its handle, and the supervision loop.

    use super::{
        RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
        spawn_supervision_task,
    };
    use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
    use crate::lifecycle::application::route_definition::RouteDefinition;
    use crate::shared::components::domain::Registry;
    use crate::shared::observability::domain::TracerConfig;
    use camel_api::{
        CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
        RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
    };
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::sleep;

    /// Spawns an actor whose registry contains the timer and mock components.
    fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        {
            let mut guard = registry.lock().expect("lock");
            guard.register(Arc::new(camel_component_timer::TimerComponent::new()));
            guard.register(Arc::new(camel_component_mock::MockComponent::new()));
        }
        let controller = DefaultRouteController::new(
            Arc::clone(&registry),
            Arc::new(camel_api::NoopPlatformService::default()),
        );
        spawn_controller_actor(controller)
    }

    /// Spawns an actor with an empty component registry.
    fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
        let controller = DefaultRouteController::new(
            Arc::new(std::sync::Mutex::new(Registry::new())),
            Arc::new(camel_api::NoopPlatformService::default()),
        );
        spawn_controller_actor(controller)
    }

    fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
        RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
    }

    /// Runtime stub that accepts every command and knows no routes.
    struct NoopRuntime;

    #[async_trait::async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait::async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id }
                | RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::RouteNotFound { route_id }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: Vec::new(),
                },
            })
        }
    }

    #[tokio::test]
    async fn start_route_sends_command_and_returns_reply() {
        let (tx, mut rx) = mpsc::channel(1);
        let handle = RouteControllerHandle { tx };

        let task = tokio::spawn(async move { handle.start_route("route-a").await });

        let command = rx.recv().await.expect("command should be received");
        match command {
            RouteControllerCommand::StartRoute { route_id, reply } => {
                assert_eq!(route_id, "route-a");
                let _ = reply.send(Ok(()));
            }
            _ => panic!("unexpected command variant"),
        }

        let result = task.await.expect("join should succeed");
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn start_route_returns_error_when_actor_stops() {
        let (tx, rx) = mpsc::channel(1);
        drop(rx);

        let handle = RouteControllerHandle { tx };
        let result = handle.start_route("route-a").await;

        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
    }

    #[tokio::test]
    async fn spawn_controller_actor_processes_commands_and_shutdown() {
        let (handle, join_handle) = build_empty_actor();

        assert_eq!(handle.route_count().await.expect("route_count"), 0);
        assert_eq!(
            handle.route_ids().await.expect("route_ids"),
            Vec::<String>::new()
        );

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn actor_handle_introspection_and_mutation_commands() {
        let (handle, join_handle) = build_actor_with_components();
        let definition = route_def("h-1", "timer:tick?period=100");

        handle.add_route(definition).await.expect("add route");
        assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
        assert!(
            !handle
                .route_exists("no-such")
                .await
                .expect("route exists no-such")
        );

        let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
        assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
        assert_eq!(handle.route_count().await.expect("route_count"), 1);

        let auto_ids = handle
            .auto_startup_route_ids()
            .await
            .expect("auto_startup_route_ids");
        assert!(auto_ids.iter().any(|id| id == "h-1"));

        let shutdown_ids = handle
            .shutdown_route_ids()
            .await
            .expect("shutdown_route_ids");
        assert!(shutdown_ids.iter().any(|id| id == "h-1"));

        let compiled = handle
            .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
            .await
            .expect("compile_route_definition");

        assert!(
            handle
                .get_pipeline("h-1")
                .await
                .expect("get_pipeline")
                .is_some()
        );
        handle
            .swap_pipeline("h-1", compiled)
            .await
            .expect("swap_pipeline");

        let _ = handle
            .in_flight_count("h-1")
            .await
            .expect("in_flight_count");
        let _ = handle.route_source_hash("h-1").await;

        handle
            .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
            .await
            .expect("set_error_handler");
        handle
            .set_tracer_config(TracerConfig::default())
            .await
            .expect("set_tracer_config");
        handle
            .set_runtime_handle(Arc::new(NoopRuntime))
            .await
            .expect("set_runtime_handle");

        handle.remove_route("h-1").await.expect("remove_route");
        assert_eq!(
            handle
                .route_count()
                .await
                .expect("route_count after remove"),
            0
        );
        handle
            .stop_all_routes()
            .await
            .expect("stop_all_routes on empty");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("lc-1", "timer:tick?period=50"))
            .await
            .expect("add route lc-1");

        handle.start_route("lc-1").await.expect("start_route");
        sleep(Duration::from_millis(20)).await;

        handle.restart_route("lc-1").await.expect("restart_route");
        sleep(Duration::from_millis(20)).await;

        handle.suspend_route("lc-1").await.expect("suspend_route");
        handle.resume_route("lc-1").await.expect("resume_route");
        sleep(Duration::from_millis(20)).await;

        handle.stop_route("lc-1").await.expect("stop_route");
        handle.start_all_routes().await.expect("start_all_routes");
        sleep(Duration::from_millis(20)).await;
        handle.stop_all_routes().await.expect("stop_all_routes");

        handle
            .start_route_reload("lc-1")
            .await
            .expect("start_route_reload");
        handle
            .stop_route_reload("lc-1")
            .await
            .expect("stop_route_reload");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn spawn_supervision_restarts_route_on_crash() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("sup-1", "timer:tick?period=100"))
            .await
            .expect("add route sup-1");
        handle
            .start_route("sup-1")
            .await
            .expect("start_route sup-1");

        let (crash_tx, crash_rx) = mpsc::channel(8);
        let supervision = spawn_supervision_task(
            handle.clone(),
            SupervisionConfig {
                initial_delay: Duration::from_millis(10),
                max_attempts: Some(2),
                ..SupervisionConfig::default()
            },
            None,
            crash_rx,
        );

        crash_tx
            .send(CrashNotification {
                route_id: "sup-1".to_string(),
                error: "simulated".to_string(),
            })
            .await
            .expect("send crash notification");

        sleep(Duration::from_millis(150)).await;
        drop(crash_tx);
        supervision.await.expect("supervision join");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
        let (handle, join_handle) = build_actor_with_components();
        handle
            .add_route(route_def("sup-2", "timer:tick?period=100"))
            .await
            .expect("add route sup-2");
        handle
            .start_route("sup-2")
            .await
            .expect("start_route sup-2");

        let (crash_tx, crash_rx) = mpsc::channel(8);
        let supervision = spawn_supervision_task(
            handle.clone(),
            SupervisionConfig {
                initial_delay: Duration::from_millis(10),
                max_attempts: Some(1),
                ..SupervisionConfig::default()
            },
            None,
            crash_rx,
        );

        crash_tx
            .send(CrashNotification {
                route_id: "sup-2".to_string(),
                error: "attempt-1".to_string(),
            })
            .await
            .expect("send crash attempt-1");
        crash_tx
            .send(CrashNotification {
                route_id: "sup-2".to_string(),
                error: "attempt-2".to_string(),
            })
            .await
            .expect("send crash attempt-2");

        sleep(Duration::from_millis(200)).await;
        drop(crash_tx);
        supervision.await.expect("supervision join");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
        let (handle, join_handle) = build_empty_actor();

        handle
            .try_set_runtime_handle(Arc::new(NoopRuntime))
            .expect("try_set_runtime_handle should succeed");

        handle.shutdown().await.expect("shutdown send");
        join_handle.await.expect("actor join");
    }

    #[tokio::test]
    async fn shutdown_returns_error_when_actor_stopped() {
        let (tx, rx) = mpsc::channel(1);
        drop(rx);

        let handle = RouteControllerHandle { tx };
        let result = handle.shutdown().await;

        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
    }
}