1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6 BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7 SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
/// Messages understood by the route-controller actor spawned by `spawn_controller_actor`.
///
/// Variants that carry a `reply` sender are request/response: the actor sends the outcome of
/// the matching `DefaultRouteController` call back through it. Variants without a `reply`
/// are fire-and-forget.
pub(crate) enum RouteControllerCommand {
    /// Start the route identified by `route_id`.
    StartRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Stop the route identified by `route_id`.
    StopRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Restart the route identified by `route_id`.
    RestartRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Suspend the route identified by `route_id`.
    SuspendRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Resume the route identified by `route_id`.
    ResumeRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Start every route via the controller's `start_all_routes`.
    StartAllRoutes {
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Stop every route via the controller's `stop_all_routes`.
    StopAllRoutes {
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Register a new route from `definition`.
    AddRoute {
        definition: RouteDefinition,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Remove the route identified by `route_id`.
    RemoveRoute {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Replace the pipeline of `route_id` with `pipeline`.
    SwapPipeline {
        route_id: String,
        pipeline: BoxProcessor,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Compile `definition` into a pipeline and return it to the requester.
    CompileRouteDefinition {
        definition: RouteDefinition,
        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
    },
    /// Compile `definition` into a pipeline, passing `generation` along to the controller.
    CompileRouteDefinitionWithGeneration {
        definition: RouteDefinition,
        generation: u64,
        reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
    },
    /// Build a `PreparedRoute` from `definition` and `generation` and return it.
    PrepareRouteDefinitionWithGeneration {
        definition: RouteDefinition,
        generation: u64,
        reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
    },
    /// Hand a previously prepared route to the controller's `insert_prepared_route`.
    InsertPreparedRoute {
        prepared: PreparedRoute,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Remove `route_id` through the controller's `remove_route_preserving_functions`.
    RemoveRoutePreservingFunctions {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Look up the from-URI of `route_id`; `None` is whatever the controller reports.
    RouteFromUri {
        route_id: String,
        reply: oneshot::Sender<Option<String>>,
    },
    /// Install an error handler configuration (no reply).
    SetErrorHandler {
        config: ErrorHandlerConfig,
    },
    /// Install a tracer configuration (no reply).
    SetTracerConfig {
        config: TracerConfig,
    },
    /// Query the controller's `route_count`.
    RouteCount {
        reply: oneshot::Sender<usize>,
    },
    /// Query the controller's `in_flight_count` for `route_id`.
    InFlightCount {
        route_id: String,
        reply: oneshot::Sender<Option<u64>>,
    },
    /// Query whether the controller knows `route_id`.
    RouteExists {
        route_id: String,
        reply: oneshot::Sender<bool>,
    },
    /// Query the controller's `route_ids`.
    RouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Query the controller's `auto_startup_route_ids`.
    AutoStartupRouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Query the controller's `shutdown_route_ids`.
    ShutdownRouteIds {
        reply: oneshot::Sender<Vec<String>>,
    },
    /// Fetch the pipeline of `route_id`, if the controller returns one.
    GetPipeline {
        route_id: String,
        reply: oneshot::Sender<Option<BoxProcessor>>,
    },
    /// Invoke the controller's `start_route_reload` for `route_id`.
    StartRouteReload {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Invoke the controller's `stop_route_reload` for `route_id`.
    StopRouteReload {
        route_id: String,
        reply: oneshot::Sender<Result<(), CamelError>>,
    },
    /// Install the runtime handle on the controller (no reply).
    SetRuntimeHandle {
        runtime: Arc<dyn RuntimeHandle>,
    },
    /// Install the function invoker on the controller (no reply).
    SetFunctionInvoker {
        invoker: Arc<dyn FunctionInvoker>,
    },
    /// Query the controller's `route_source_hash` for `route_id`.
    RouteSourceHash {
        route_id: String,
        reply: oneshot::Sender<Option<u64>>,
    },
    /// Make the actor leave its command loop.
    Shutdown,
}
133
/// Cloneable front-end to the route-controller actor.
///
/// Every method turns its arguments into a `RouteControllerCommand` and pushes it through
/// the wrapped channel sender.
#[derive(Clone)]
pub struct RouteControllerHandle {
    // Sending half of the actor's command channel.
    tx: mpsc::Sender<RouteControllerCommand>,
}
138
139impl RouteControllerHandle {
140 pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141 let (reply_tx, reply_rx) = oneshot::channel();
142 self.tx
143 .send(RouteControllerCommand::StartRoute {
144 route_id: route_id.into(),
145 reply: reply_tx,
146 })
147 .await
148 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149 reply_rx
150 .await
151 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152 }
153
154 pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155 let (reply_tx, reply_rx) = oneshot::channel();
156 self.tx
157 .send(RouteControllerCommand::StopRoute {
158 route_id: route_id.into(),
159 reply: reply_tx,
160 })
161 .await
162 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163 reply_rx
164 .await
165 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166 }
167
168 pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169 let (reply_tx, reply_rx) = oneshot::channel();
170 self.tx
171 .send(RouteControllerCommand::RestartRoute {
172 route_id: route_id.into(),
173 reply: reply_tx,
174 })
175 .await
176 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177 reply_rx
178 .await
179 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180 }
181
182 pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183 let (reply_tx, reply_rx) = oneshot::channel();
184 self.tx
185 .send(RouteControllerCommand::SuspendRoute {
186 route_id: route_id.into(),
187 reply: reply_tx,
188 })
189 .await
190 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191 reply_rx
192 .await
193 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194 }
195
196 pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197 let (reply_tx, reply_rx) = oneshot::channel();
198 self.tx
199 .send(RouteControllerCommand::ResumeRoute {
200 route_id: route_id.into(),
201 reply: reply_tx,
202 })
203 .await
204 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205 reply_rx
206 .await
207 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208 }
209
210 pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211 let (reply_tx, reply_rx) = oneshot::channel();
212 self.tx
213 .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214 .await
215 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216 reply_rx
217 .await
218 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219 }
220
221 pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222 let (reply_tx, reply_rx) = oneshot::channel();
223 self.tx
224 .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225 .await
226 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227 reply_rx
228 .await
229 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230 }
231
232 pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233 let (reply_tx, reply_rx) = oneshot::channel();
234 self.tx
235 .send(RouteControllerCommand::AddRoute {
236 definition,
237 reply: reply_tx,
238 })
239 .await
240 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241 reply_rx
242 .await
243 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244 }
245
246 pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247 let (reply_tx, reply_rx) = oneshot::channel();
248 self.tx
249 .send(RouteControllerCommand::RemoveRoute {
250 route_id: route_id.into(),
251 reply: reply_tx,
252 })
253 .await
254 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255 reply_rx
256 .await
257 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258 }
259
260 pub async fn swap_pipeline(
261 &self,
262 route_id: impl Into<String>,
263 pipeline: BoxProcessor,
264 ) -> Result<(), CamelError> {
265 let (reply_tx, reply_rx) = oneshot::channel();
266 self.tx
267 .send(RouteControllerCommand::SwapPipeline {
268 route_id: route_id.into(),
269 pipeline,
270 reply: reply_tx,
271 })
272 .await
273 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274 reply_rx
275 .await
276 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277 }
278
279 pub async fn compile_route_definition(
280 &self,
281 definition: RouteDefinition,
282 ) -> Result<BoxProcessor, CamelError> {
283 let (reply_tx, reply_rx) = oneshot::channel();
284 self.tx
285 .send(RouteControllerCommand::CompileRouteDefinition {
286 definition,
287 reply: reply_tx,
288 })
289 .await
290 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291 reply_rx
292 .await
293 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294 }
295
296 pub async fn compile_route_definition_with_generation(
297 &self,
298 definition: RouteDefinition,
299 generation: u64,
300 ) -> Result<BoxProcessor, CamelError> {
301 let (reply_tx, reply_rx) = oneshot::channel();
302 self.tx
303 .send(
304 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305 definition,
306 generation,
307 reply: reply_tx,
308 },
309 )
310 .await
311 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312 reply_rx
313 .await
314 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315 }
316
317 pub(crate) async fn prepare_route_definition_with_generation(
318 &self,
319 definition: RouteDefinition,
320 generation: u64,
321 ) -> Result<PreparedRoute, CamelError> {
322 let (reply_tx, reply_rx) = oneshot::channel();
323 self.tx
324 .send(
325 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326 definition,
327 generation,
328 reply: reply_tx,
329 },
330 )
331 .await
332 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333 reply_rx
334 .await
335 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336 }
337
338 pub(crate) async fn insert_prepared_route(
339 &self,
340 prepared: PreparedRoute,
341 ) -> Result<(), CamelError> {
342 let (reply_tx, reply_rx) = oneshot::channel();
343 self.tx
344 .send(RouteControllerCommand::InsertPreparedRoute {
345 prepared,
346 reply: reply_tx,
347 })
348 .await
349 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350 reply_rx
351 .await
352 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353 }
354
355 pub async fn remove_route_preserving_functions(
356 &self,
357 route_id: String,
358 ) -> Result<(), CamelError> {
359 let (reply_tx, reply_rx) = oneshot::channel();
360 self.tx
361 .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362 route_id,
363 reply: reply_tx,
364 })
365 .await
366 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367 reply_rx
368 .await
369 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370 }
371
372 pub async fn route_from_uri(
373 &self,
374 route_id: impl Into<String>,
375 ) -> Result<Option<String>, CamelError> {
376 let (reply_tx, reply_rx) = oneshot::channel();
377 self.tx
378 .send(RouteControllerCommand::RouteFromUri {
379 route_id: route_id.into(),
380 reply: reply_tx,
381 })
382 .await
383 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384 reply_rx
385 .await
386 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387 }
388
389 pub async fn route_count(&self) -> Result<usize, CamelError> {
390 let (reply_tx, reply_rx) = oneshot::channel();
391 self.tx
392 .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393 .await
394 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395 reply_rx
396 .await
397 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398 }
399
400 pub async fn in_flight_count(
401 &self,
402 route_id: impl Into<String>,
403 ) -> Result<Option<u64>, CamelError> {
404 let (reply_tx, reply_rx) = oneshot::channel();
405 self.tx
406 .send(RouteControllerCommand::InFlightCount {
407 route_id: route_id.into(),
408 reply: reply_tx,
409 })
410 .await
411 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412 reply_rx
413 .await
414 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415 }
416
417 pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418 let (reply_tx, reply_rx) = oneshot::channel();
419 self.tx
420 .send(RouteControllerCommand::RouteExists {
421 route_id: route_id.into(),
422 reply: reply_tx,
423 })
424 .await
425 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426 reply_rx
427 .await
428 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429 }
430
431 pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432 let (reply_tx, reply_rx) = oneshot::channel();
433 self.tx
434 .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435 .await
436 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437 reply_rx
438 .await
439 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440 }
441
442 pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443 let (reply_tx, reply_rx) = oneshot::channel();
444 self.tx
445 .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446 .await
447 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448 reply_rx
449 .await
450 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451 }
452
453 pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454 let (reply_tx, reply_rx) = oneshot::channel();
455 self.tx
456 .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457 .await
458 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459 reply_rx
460 .await
461 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462 }
463
464 pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465 let (reply_tx, reply_rx) = oneshot::channel();
466 self.tx
467 .send(RouteControllerCommand::StartRouteReload {
468 route_id: route_id.into(),
469 reply: reply_tx,
470 })
471 .await
472 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473 reply_rx
474 .await
475 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476 }
477
478 pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479 let (reply_tx, reply_rx) = oneshot::channel();
480 self.tx
481 .send(RouteControllerCommand::StopRouteReload {
482 route_id: route_id.into(),
483 reply: reply_tx,
484 })
485 .await
486 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487 reply_rx
488 .await
489 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490 }
491
492 pub async fn get_pipeline(
493 &self,
494 route_id: impl Into<String>,
495 ) -> Result<Option<BoxProcessor>, CamelError> {
496 let (reply_tx, reply_rx) = oneshot::channel();
497 self.tx
498 .send(RouteControllerCommand::GetPipeline {
499 route_id: route_id.into(),
500 reply: reply_tx,
501 })
502 .await
503 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504 reply_rx
505 .await
506 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507 }
508
509 pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510 self.tx
511 .send(RouteControllerCommand::SetErrorHandler { config })
512 .await
513 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514 }
515
516 pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517 self.tx
518 .send(RouteControllerCommand::SetTracerConfig { config })
519 .await
520 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521 }
522
523 pub async fn set_runtime_handle(
524 &self,
525 runtime: Arc<dyn RuntimeHandle>,
526 ) -> Result<(), CamelError> {
527 self.tx
528 .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529 .await
530 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531 }
532
533 pub fn try_set_runtime_handle(
534 &self,
535 runtime: Arc<dyn RuntimeHandle>,
536 ) -> Result<(), CamelError> {
537 self.tx
538 .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539 .map_err(|err| {
540 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541 })
542 }
543
544 pub async fn set_function_invoker(
545 &self,
546 invoker: Arc<dyn FunctionInvoker>,
547 ) -> Result<(), CamelError> {
548 self.tx
549 .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550 .await
551 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552 }
553
554 pub fn try_set_function_invoker(
555 &self,
556 invoker: Arc<dyn FunctionInvoker>,
557 ) -> Result<(), CamelError> {
558 self.tx
559 .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560 .map_err(|err| {
561 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562 })
563 }
564
565 pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566 let (reply_tx, reply_rx) = oneshot::channel();
567 self.tx
568 .send(RouteControllerCommand::RouteSourceHash {
569 route_id: route_id.into(),
570 reply: reply_tx,
571 })
572 .await
573 .ok()?;
574 reply_rx.await.ok()?
575 }
576
577 pub async fn shutdown(&self) -> Result<(), CamelError> {
578 self.tx
579 .send(RouteControllerCommand::Shutdown)
580 .await
581 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582 }
583}
584
585pub fn spawn_controller_actor(
586 controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589 let handle = tokio::spawn(async move {
590 let mut controller = controller;
591 while let Some(cmd) = rx.recv().await {
592 match cmd {
593 RouteControllerCommand::StartRoute { route_id, reply } => {
594 let _ = reply.send(controller.start_route(&route_id).await);
595 }
596 RouteControllerCommand::StopRoute { route_id, reply } => {
597 let _ = reply.send(controller.stop_route(&route_id).await);
598 }
599 RouteControllerCommand::RestartRoute { route_id, reply } => {
600 let _ = reply.send(controller.restart_route(&route_id).await);
601 }
602 RouteControllerCommand::SuspendRoute { route_id, reply } => {
603 let _ = reply.send(controller.suspend_route(&route_id).await);
604 }
605 RouteControllerCommand::ResumeRoute { route_id, reply } => {
606 let _ = reply.send(controller.resume_route(&route_id).await);
607 }
608 RouteControllerCommand::StartAllRoutes { reply } => {
609 let _ = reply.send(controller.start_all_routes().await);
610 }
611 RouteControllerCommand::StopAllRoutes { reply } => {
612 let _ = reply.send(controller.stop_all_routes().await);
613 }
614 RouteControllerCommand::AddRoute { definition, reply } => {
615 let _ = reply.send(controller.add_route(definition).await);
616 }
617 RouteControllerCommand::RemoveRoute { route_id, reply } => {
618 let _ = reply.send(controller.remove_route(&route_id).await);
619 }
620 RouteControllerCommand::SwapPipeline {
621 route_id,
622 pipeline,
623 reply,
624 } => {
625 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626 }
627 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628 let _ = reply.send(controller.compile_route_definition(definition));
629 }
630 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631 definition,
632 generation,
633 reply,
634 } => {
635 let _ = reply.send(
636 controller.compile_route_definition_with_generation(definition, generation),
637 );
638 }
639 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640 definition,
641 generation,
642 reply,
643 } => {
644 let _ = reply.send(
645 controller.prepare_route_definition_with_generation(definition, generation),
646 );
647 }
648 RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649 let _ = reply.send(controller.insert_prepared_route(prepared));
650 }
651 RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652 let _ = reply.send(
653 controller
654 .remove_route_preserving_functions(&route_id)
655 .await,
656 );
657 }
658 RouteControllerCommand::RouteFromUri { route_id, reply } => {
659 let _ = reply.send(controller.route_from_uri(&route_id));
660 }
661 RouteControllerCommand::SetErrorHandler { config } => {
662 controller.set_error_handler(config);
663 }
664 RouteControllerCommand::SetTracerConfig { config } => {
665 controller.set_tracer_config(&config);
666 }
667 RouteControllerCommand::RouteCount { reply } => {
668 let _ = reply.send(controller.route_count());
669 }
670 RouteControllerCommand::InFlightCount { route_id, reply } => {
671 let _ = reply.send(controller.in_flight_count(&route_id));
672 }
673 RouteControllerCommand::RouteExists { route_id, reply } => {
674 let _ = reply.send(controller.route_exists(&route_id));
675 }
676 RouteControllerCommand::RouteIds { reply } => {
677 let _ = reply.send(controller.route_ids());
678 }
679 RouteControllerCommand::AutoStartupRouteIds { reply } => {
680 let _ = reply.send(controller.auto_startup_route_ids());
681 }
682 RouteControllerCommand::ShutdownRouteIds { reply } => {
683 let _ = reply.send(controller.shutdown_route_ids());
684 }
685 RouteControllerCommand::GetPipeline { route_id, reply } => {
686 let _ = reply.send(controller.get_pipeline(&route_id));
687 }
688 RouteControllerCommand::StartRouteReload { route_id, reply } => {
689 let _ = reply.send(controller.start_route_reload(&route_id).await);
690 }
691 RouteControllerCommand::StopRouteReload { route_id, reply } => {
692 let _ = reply.send(controller.stop_route_reload(&route_id).await);
693 }
694 RouteControllerCommand::SetRuntimeHandle { runtime } => {
695 controller.set_runtime_handle(runtime);
696 }
697 RouteControllerCommand::SetFunctionInvoker { invoker } => {
698 controller.set_function_invoker(invoker);
699 }
700 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701 let _ = reply.send(controller.route_source_hash(&route_id));
702 }
703 RouteControllerCommand::Shutdown => {
704 break;
705 }
706 }
707 }
708 });
709 (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713 controller: RouteControllerHandle,
714 config: SupervisionConfig,
715 _metrics: Option<Arc<dyn MetricsCollector>>,
716 mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718 tokio::spawn(async move {
719 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720 let mut last_restart_time: std::collections::HashMap<String, Instant> =
721 std::collections::HashMap::new();
722 let mut currently_restarting: std::collections::HashSet<String> =
723 std::collections::HashSet::new();
724
725 info!("Supervision loop started");
726
727 while let Some(notification) = crash_rx.recv().await {
728 let route_id = notification.route_id;
729 if currently_restarting.contains(&route_id) {
730 continue;
731 }
732
733 if let Some(last_time) = last_restart_time.get(&route_id)
734 && last_time.elapsed() >= config.initial_delay
735 {
736 attempts.insert(route_id.clone(), 0);
737 }
738
739 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740 *current_attempt += 1;
741
742 if config
743 .max_attempts
744 .is_some_and(|max| *current_attempt > max)
745 {
746 error!(
748 route_id = %route_id,
749 attempts = *current_attempt,
750 "Route exceeded max restart attempts, giving up"
751 );
752 continue;
753 }
754
755 let delay = config.next_delay(*current_attempt);
756 currently_restarting.insert(route_id.clone());
757 tokio::time::sleep(delay).await;
758
759 match controller.restart_route(route_id.clone()).await {
760 Ok(()) => {
761 info!(route_id = %route_id, "Route restarted successfully");
762 last_restart_time.insert(route_id.clone(), Instant::now());
763 }
764 Err(err) => {
765 error!(route_id = %route_id, error = %err, "Failed to restart route");
767 }
768 }
769
770 currently_restarting.remove(&route_id);
771 }
772
773 info!("Supervision loop ended");
774 })
775}
776
777#[cfg(test)]
778mod tests {
779 use super::{
780 RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
781 spawn_supervision_task,
782 };
783 use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
784 use crate::lifecycle::application::route_definition::RouteDefinition;
785 use crate::shared::components::domain::Registry;
786 use crate::shared::observability::domain::TracerConfig;
787 use camel_api::function::PrepareToken;
788 use camel_api::{
789 CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
790 FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
791 RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
792 SupervisionConfig,
793 };
794 use std::sync::Arc;
795 use std::time::Duration;
796 use tokio::sync::mpsc;
797 use tokio::time::sleep;
798
799 fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
800 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
801 {
802 let mut guard = registry.lock().expect("lock");
803 guard.register(std::sync::Arc::new(
804 camel_component_timer::TimerComponent::new(),
805 ));
806 guard.register(std::sync::Arc::new(
807 camel_component_mock::MockComponent::new(),
808 ));
809 }
810 let controller = DefaultRouteController::new(
811 Arc::clone(®istry),
812 Arc::new(camel_api::NoopPlatformService::default()),
813 );
814 spawn_controller_actor(controller)
815 }
816
817 fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
818 let controller = DefaultRouteController::new(
819 Arc::new(std::sync::Mutex::new(Registry::new())),
820 Arc::new(camel_api::NoopPlatformService::default()),
821 );
822 spawn_controller_actor(controller)
823 }
824
825 fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
826 RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
827 }
828
// Test double implementing the runtime command and query buses with canned answers.
struct NoopRuntime;
// Test double implementing the function-invoker traits with no-op methods.
struct NoopInvoker;
831
#[async_trait::async_trait]
impl RuntimeCommandBus for NoopRuntime {
    /// Ignores the command and always reports it as accepted.
    async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
        Ok(RuntimeCommandResult::Accepted)
    }
}
838
839 #[async_trait::async_trait]
840 impl RuntimeQueryBus for NoopRuntime {
841 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
842 Ok(match query {
843 RuntimeQuery::GetRouteStatus { route_id }
844 | RuntimeQuery::InFlightCount { route_id } => {
845 RuntimeQueryResult::RouteNotFound { route_id }
846 }
847 RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
848 route_ids: Vec::new(),
849 },
850 })
851 }
852 }
853
// Synchronous half of the invoker test double: nothing is staged and every lookup is empty.
impl FunctionInvokerSync for NoopInvoker {
    /// Does nothing.
    fn stage_pending(
        &self,
        _def: FunctionDefinition,
        _route_id: Option<&str>,
        _generation: u64,
    ) {
    }
    /// Does nothing.
    fn discard_staging(&self, _generation: u64) {}
    /// Always returns the fixed value `1`.
    fn begin_reload(&self) -> u64 {
        1
    }
    /// Always returns an empty list.
    fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
        vec![]
    }
    /// Always returns an empty list.
    fn staged_refs_for_route(
        &self,
        _route_id: &str,
        _generation: u64,
    ) -> Vec<(FunctionId, Option<String>)> {
        vec![]
    }
    /// Always returns an empty list.
    fn staged_defs_for_route(
        &self,
        _route_id: &str,
        _generation: u64,
    ) -> Vec<(FunctionDefinition, Option<String>)> {
        vec![]
    }
}
884
// Asynchronous half of the invoker test double: every operation succeeds without side effects.
#[async_trait::async_trait]
impl FunctionInvoker for NoopInvoker {
    /// Accepts the registration and does nothing.
    async fn register(
        &self,
        _def: FunctionDefinition,
        _route_id: Option<&str>,
    ) -> Result<(), FunctionInvocationError> {
        Ok(())
    }
    /// Accepts the unregistration and does nothing.
    async fn unregister(
        &self,
        _id: &FunctionId,
        _route_id: Option<&str>,
    ) -> Result<(), FunctionInvocationError> {
        Ok(())
    }
    /// Returns a default (empty) patch regardless of the exchange.
    async fn invoke(
        &self,
        _id: &FunctionId,
        _exchange: &Exchange,
    ) -> Result<ExchangePatch, FunctionInvocationError> {
        Ok(ExchangePatch::default())
    }
    /// Returns a default token without preparing anything.
    async fn prepare_reload(
        &self,
        _diff: FunctionDiff,
        _generation: u64,
    ) -> Result<PrepareToken, FunctionInvocationError> {
        Ok(PrepareToken::default())
    }
    /// Succeeds without doing anything.
    async fn finalize_reload(
        &self,
        _diff: &FunctionDiff,
        _generation: u64,
    ) -> Result<(), FunctionInvocationError> {
        Ok(())
    }
    /// Succeeds without doing anything.
    async fn rollback_reload(
        &self,
        _token: PrepareToken,
        _generation: u64,
    ) -> Result<(), FunctionInvocationError> {
        Ok(())
    }
    /// Succeeds without doing anything.
    async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
        Ok(())
    }
}
933
934 #[tokio::test]
935 async fn start_route_sends_command_and_returns_reply() {
936 let (tx, mut rx) = mpsc::channel(1);
937 let handle = RouteControllerHandle { tx };
938
939 let task = tokio::spawn(async move { handle.start_route("route-a").await });
940
941 let command = rx.recv().await.expect("command should be received");
942 match command {
943 RouteControllerCommand::StartRoute { route_id, reply } => {
944 assert_eq!(route_id, "route-a");
945 let _ = reply.send(Ok(()));
946 }
947 _ => panic!("unexpected command variant"),
948 }
949
950 let result = task.await.expect("join should succeed");
951 assert!(result.is_ok());
952 }
953
954 #[tokio::test]
955 async fn start_route_returns_error_when_actor_stops() {
956 let (tx, rx) = mpsc::channel(1);
957 drop(rx);
958
959 let handle = RouteControllerHandle { tx };
960 let result = handle.start_route("route-a").await;
961
962 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
963 }
964
965 #[tokio::test]
966 async fn spawn_controller_actor_processes_commands_and_shutdown() {
967 let controller = DefaultRouteController::new(
968 Arc::new(std::sync::Mutex::new(Registry::new())),
969 Arc::new(camel_api::NoopPlatformService::default()),
970 );
971 let (handle, join_handle) = spawn_controller_actor(controller);
972
973 assert_eq!(handle.route_count().await.expect("route_count"), 0);
974 assert_eq!(
975 handle.route_ids().await.expect("route_ids"),
976 Vec::<String>::new()
977 );
978
979 handle.shutdown().await.expect("shutdown send");
980 join_handle.await.expect("actor join");
981 }
982
/// Walks one route through the handle's query and mutation commands against a real actor.
#[tokio::test]
async fn actor_handle_introspection_and_mutation_commands() {
    let (handle, join_handle) = build_actor_with_components();
    let definition = route_def("h-1", "timer:tick?period=100");

    // Register the route and check the existence queries for a known and an unknown id.
    handle.add_route(definition).await.expect("add route");
    assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
    assert!(
        !handle
            .route_exists("no-such")
            .await
            .expect("route exists no-such")
    );

    let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
    assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
    assert_eq!(handle.route_count().await.expect("route_count"), 1);

    // The new route is expected in both the auto-startup and the shutdown id lists.
    let auto_ids = handle
        .auto_startup_route_ids()
        .await
        .expect("auto_startup_route_ids");
    assert!(auto_ids.iter().any(|id| id == "h-1"));

    let shutdown_ids = handle
        .shutdown_route_ids()
        .await
        .expect("shutdown_route_ids");
    assert!(shutdown_ids.iter().any(|id| id == "h-1"));

    // Compile an equivalent definition and swap it in as the route's pipeline.
    let compiled = handle
        .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
        .await
        .expect("compile_route_definition");

    assert!(
        handle
            .get_pipeline("h-1")
            .await
            .expect("get_pipeline")
            .is_some()
    );
    handle
        .swap_pipeline("h-1", compiled)
        .await
        .expect("swap_pipeline");

    // These two only need to complete; their values are not asserted.
    let _ = handle
        .in_flight_count("h-1")
        .await
        .expect("in_flight_count");
    let _ = handle.route_source_hash("h-1").await;

    // Fire-and-forget configuration commands: success means the command was queued.
    handle
        .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
        .await
        .expect("set_error_handler");
    handle
        .set_tracer_config(TracerConfig::default())
        .await
        .expect("set_tracer_config");
    handle
        .set_runtime_handle(Arc::new(NoopRuntime))
        .await
        .expect("set_runtime_handle");

    // Remove the route again and confirm the controller is empty.
    handle.remove_route("h-1").await.expect("remove_route");
    assert_eq!(
        handle
            .route_count()
            .await
            .expect("route_count after remove"),
        0
    );
    handle
        .stop_all_routes()
        .await
        .expect("stop_all_routes on empty");

    handle.shutdown().await.expect("shutdown send");
    join_handle.await.expect("actor join");
}
1065
1066 #[tokio::test]
1067 async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
1068 let (handle, join_handle) = build_actor_with_components();
1069 handle
1070 .add_route(route_def("lc-1", "timer:tick?period=50"))
1071 .await
1072 .expect("add route lc-1");
1073
1074 handle.start_route("lc-1").await.expect("start_route");
1075 sleep(Duration::from_millis(20)).await;
1076
1077 handle.restart_route("lc-1").await.expect("restart_route");
1078 sleep(Duration::from_millis(20)).await;
1079
1080 handle.suspend_route("lc-1").await.expect("suspend_route");
1081 handle.resume_route("lc-1").await.expect("resume_route");
1082 sleep(Duration::from_millis(20)).await;
1083
1084 handle.stop_route("lc-1").await.expect("stop_route");
1085 handle.start_all_routes().await.expect("start_all_routes");
1086 sleep(Duration::from_millis(20)).await;
1087 handle.stop_all_routes().await.expect("stop_all_routes");
1088
1089 handle
1090 .start_route_reload("lc-1")
1091 .await
1092 .expect("start_route_reload");
1093 handle
1094 .stop_route_reload("lc-1")
1095 .await
1096 .expect("stop_route_reload");
1097
1098 handle.shutdown().await.expect("shutdown send");
1099 join_handle.await.expect("actor join");
1100 }
1101
1102 #[tokio::test]
1103 async fn spawn_supervision_restarts_route_on_crash() {
1104 let (handle, join_handle) = build_actor_with_components();
1105 handle
1106 .add_route(route_def("sup-1", "timer:tick?period=100"))
1107 .await
1108 .expect("add route sup-1");
1109 handle
1110 .start_route("sup-1")
1111 .await
1112 .expect("start_route sup-1");
1113
1114 let (crash_tx, crash_rx) = mpsc::channel(8);
1115 let supervision = spawn_supervision_task(
1116 handle.clone(),
1117 SupervisionConfig {
1118 initial_delay: Duration::from_millis(10),
1119 max_attempts: Some(2),
1120 ..SupervisionConfig::default()
1121 },
1122 None,
1123 crash_rx,
1124 );
1125
1126 crash_tx
1127 .send(CrashNotification {
1128 route_id: "sup-1".to_string(),
1129 error: "simulated".to_string(),
1130 })
1131 .await
1132 .expect("send crash notification");
1133
1134 sleep(Duration::from_millis(150)).await;
1135 drop(crash_tx);
1136 supervision.await.expect("supervision join");
1137
1138 handle.shutdown().await.expect("shutdown send");
1139 join_handle.await.expect("actor join");
1140 }
1141
1142 #[tokio::test]
1143 async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
1144 let (handle, join_handle) = build_actor_with_components();
1145 handle
1146 .add_route(route_def("sup-2", "timer:tick?period=100"))
1147 .await
1148 .expect("add route sup-2");
1149 handle
1150 .start_route("sup-2")
1151 .await
1152 .expect("start_route sup-2");
1153
1154 let (crash_tx, crash_rx) = mpsc::channel(8);
1155 let supervision = spawn_supervision_task(
1156 handle.clone(),
1157 SupervisionConfig {
1158 initial_delay: Duration::from_millis(10),
1159 max_attempts: Some(1),
1160 ..SupervisionConfig::default()
1161 },
1162 None,
1163 crash_rx,
1164 );
1165
1166 crash_tx
1167 .send(CrashNotification {
1168 route_id: "sup-2".to_string(),
1169 error: "attempt-1".to_string(),
1170 })
1171 .await
1172 .expect("send crash attempt-1");
1173 crash_tx
1174 .send(CrashNotification {
1175 route_id: "sup-2".to_string(),
1176 error: "attempt-2".to_string(),
1177 })
1178 .await
1179 .expect("send crash attempt-2");
1180
1181 sleep(Duration::from_millis(200)).await;
1182 drop(crash_tx);
1183 supervision.await.expect("supervision join");
1184
1185 handle.shutdown().await.expect("shutdown send");
1186 join_handle.await.expect("actor join");
1187 }
1188
1189 #[tokio::test]
1190 async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
1191 let (handle, join_handle) = build_empty_actor();
1192
1193 handle
1194 .try_set_runtime_handle(Arc::new(NoopRuntime))
1195 .expect("try_set_runtime_handle should succeed");
1196
1197 handle.shutdown().await.expect("shutdown send");
1198 join_handle.await.expect("actor join");
1199 }
1200
1201 #[tokio::test]
1202 async fn shutdown_returns_error_when_actor_stopped() {
1203 let (tx, rx) = mpsc::channel(1);
1204 drop(rx);
1205
1206 let handle = RouteControllerHandle { tx };
1207 let result = handle.shutdown().await;
1208
1209 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1210 }
1211
1212 #[tokio::test]
1213 async fn handle_methods_send_expected_commands_and_receive_replies() {
1214 let (tx, mut rx) = mpsc::channel(16);
1215 let handle = RouteControllerHandle { tx };
1216
1217 let stop_task = tokio::spawn({
1218 let h = handle.clone();
1219 async move { h.stop_route("r-1").await }
1220 });
1221 let cmd = rx.recv().await.expect("stop command");
1222 match cmd {
1223 RouteControllerCommand::StopRoute { route_id, reply } => {
1224 assert_eq!(route_id, "r-1");
1225 let _ = reply.send(Ok(()));
1226 }
1227 _ => panic!("unexpected command"),
1228 }
1229 assert!(stop_task.await.expect("join").is_ok());
1230
1231 let exists_task = tokio::spawn({
1232 let h = handle.clone();
1233 async move { h.route_exists("r-2").await }
1234 });
1235 let cmd = rx.recv().await.expect("exists command");
1236 match cmd {
1237 RouteControllerCommand::RouteExists { route_id, reply } => {
1238 assert_eq!(route_id, "r-2");
1239 let _ = reply.send(true);
1240 }
1241 _ => panic!("unexpected command"),
1242 }
1243 assert!(exists_task.await.expect("join").expect("ok"));
1244
1245 let hash_task = tokio::spawn({
1246 let h = handle.clone();
1247 async move { h.route_source_hash("r-3").await }
1248 });
1249 let cmd = rx.recv().await.expect("hash command");
1250 match cmd {
1251 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
1252 assert_eq!(route_id, "r-3");
1253 let _ = reply.send(Some(77));
1254 }
1255 _ => panic!("unexpected command"),
1256 }
1257 assert_eq!(hash_task.await.expect("join"), Some(77));
1258 }
1259
1260 #[tokio::test]
1261 async fn handle_methods_error_on_dropped_reply_channel() {
1262 let (tx, mut rx) = mpsc::channel(16);
1263 let handle = RouteControllerHandle { tx };
1264
1265 let count_task = tokio::spawn({
1266 let h = handle.clone();
1267 async move { h.route_count().await }
1268 });
1269 let cmd = rx.recv().await.expect("route_count command");
1270 match cmd {
1271 RouteControllerCommand::RouteCount { reply } => drop(reply),
1272 _ => panic!("unexpected command"),
1273 }
1274 assert!(matches!(
1275 count_task.await.expect("join"),
1276 Err(CamelError::ProcessorError(_))
1277 ));
1278
1279 let stop_task = tokio::spawn({
1280 let h = handle.clone();
1281 async move { h.stop_route("x").await }
1282 });
1283 let cmd = rx.recv().await.expect("stop command");
1284 match cmd {
1285 RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
1286 _ => panic!("unexpected command"),
1287 }
1288 assert!(matches!(
1289 stop_task.await.expect("join"),
1290 Err(CamelError::ProcessorError(_))
1291 ));
1292
1293 let maybe_hash = tokio::spawn({
1294 let h = handle.clone();
1295 async move { h.route_source_hash("x").await }
1296 });
1297 let cmd = rx.recv().await.expect("hash command");
1298 match cmd {
1299 RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
1300 _ => panic!("unexpected command"),
1301 }
1302 assert_eq!(maybe_hash.await.expect("join"), None);
1303 }
1304
/// Fills a capacity-1 mailbox, then checks that `try_set_function_invoker`
/// returns `CamelError::ProcessorError` and that the message already queued
/// is still there to be received.
#[test]
fn try_set_function_invoker_returns_mailbox_full() {
    let (tx, mut rx) = mpsc::channel(1);
    let handle = RouteControllerHandle { tx };

    // Occupy the only slot so the next non-blocking send cannot be queued.
    handle
        .tx
        .try_send(RouteControllerCommand::Shutdown)
        .expect("fill mailbox");

    let invoker = Arc::new(NoopInvoker);
    let outcome = handle.try_set_function_invoker(invoker);
    assert!(matches!(outcome, Err(CamelError::ProcessorError(_))));

    rx.try_recv().expect("mailbox still has first message");
}
1317}