1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6 BoxProcessor, CamelError, FunctionInvoker, MetricsCollector, RouteController, RuntimeHandle,
7 SupervisionConfig,
8};
9use tokio::sync::{mpsc, oneshot};
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use super::route_controller::{CrashNotification, DefaultRouteController, PreparedRoute};
14use crate::lifecycle::application::route_definition::RouteDefinition;
15use crate::shared::observability::domain::TracerConfig;
16
17pub(crate) enum RouteControllerCommand {
18 StartRoute {
19 route_id: String,
20 reply: oneshot::Sender<Result<(), CamelError>>,
21 },
22 StopRoute {
23 route_id: String,
24 reply: oneshot::Sender<Result<(), CamelError>>,
25 },
26 RestartRoute {
27 route_id: String,
28 reply: oneshot::Sender<Result<(), CamelError>>,
29 },
30 SuspendRoute {
31 route_id: String,
32 reply: oneshot::Sender<Result<(), CamelError>>,
33 },
34 ResumeRoute {
35 route_id: String,
36 reply: oneshot::Sender<Result<(), CamelError>>,
37 },
38 StartAllRoutes {
39 reply: oneshot::Sender<Result<(), CamelError>>,
40 },
41 StopAllRoutes {
42 reply: oneshot::Sender<Result<(), CamelError>>,
43 },
44 AddRoute {
45 definition: RouteDefinition,
46 reply: oneshot::Sender<Result<(), CamelError>>,
47 },
48 RemoveRoute {
49 route_id: String,
50 reply: oneshot::Sender<Result<(), CamelError>>,
51 },
52 SwapPipeline {
53 route_id: String,
54 pipeline: BoxProcessor,
55 reply: oneshot::Sender<Result<(), CamelError>>,
56 },
57 CompileRouteDefinition {
58 definition: RouteDefinition,
59 reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
60 },
61 CompileRouteDefinitionWithGeneration {
62 definition: RouteDefinition,
63 generation: u64,
64 reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
65 },
66 PrepareRouteDefinitionWithGeneration {
67 definition: RouteDefinition,
68 generation: u64,
69 reply: oneshot::Sender<Result<PreparedRoute, CamelError>>,
70 },
71 InsertPreparedRoute {
72 prepared: PreparedRoute,
73 reply: oneshot::Sender<Result<(), CamelError>>,
74 },
75 RemoveRoutePreservingFunctions {
76 route_id: String,
77 reply: oneshot::Sender<Result<(), CamelError>>,
78 },
79 RouteFromUri {
80 route_id: String,
81 reply: oneshot::Sender<Option<String>>,
82 },
83 SetErrorHandler {
84 config: ErrorHandlerConfig,
85 },
86 SetTracerConfig {
87 config: TracerConfig,
88 },
89 RouteCount {
90 reply: oneshot::Sender<usize>,
91 },
92 InFlightCount {
93 route_id: String,
94 reply: oneshot::Sender<Option<u64>>,
95 },
96 RouteExists {
97 route_id: String,
98 reply: oneshot::Sender<bool>,
99 },
100 RouteIds {
101 reply: oneshot::Sender<Vec<String>>,
102 },
103 AutoStartupRouteIds {
104 reply: oneshot::Sender<Vec<String>>,
105 },
106 ShutdownRouteIds {
107 reply: oneshot::Sender<Vec<String>>,
108 },
109 GetPipeline {
110 route_id: String,
111 reply: oneshot::Sender<Option<BoxProcessor>>,
112 },
113 StartRouteReload {
114 route_id: String,
115 reply: oneshot::Sender<Result<(), CamelError>>,
116 },
117 StopRouteReload {
118 route_id: String,
119 reply: oneshot::Sender<Result<(), CamelError>>,
120 },
121 SetRuntimeHandle {
122 runtime: Arc<dyn RuntimeHandle>,
123 },
124 SetFunctionInvoker {
125 invoker: Arc<dyn FunctionInvoker>,
126 },
127 RouteSourceHash {
128 route_id: String,
129 reply: oneshot::Sender<Option<u64>>,
130 },
131 Shutdown,
132}
133
134#[derive(Clone)]
135pub struct RouteControllerHandle {
136 tx: mpsc::Sender<RouteControllerCommand>,
137}
138
139impl RouteControllerHandle {
140 pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
141 let (reply_tx, reply_rx) = oneshot::channel();
142 self.tx
143 .send(RouteControllerCommand::StartRoute {
144 route_id: route_id.into(),
145 reply: reply_tx,
146 })
147 .await
148 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
149 reply_rx
150 .await
151 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
152 }
153
154 pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
155 let (reply_tx, reply_rx) = oneshot::channel();
156 self.tx
157 .send(RouteControllerCommand::StopRoute {
158 route_id: route_id.into(),
159 reply: reply_tx,
160 })
161 .await
162 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
163 reply_rx
164 .await
165 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
166 }
167
168 pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
169 let (reply_tx, reply_rx) = oneshot::channel();
170 self.tx
171 .send(RouteControllerCommand::RestartRoute {
172 route_id: route_id.into(),
173 reply: reply_tx,
174 })
175 .await
176 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
177 reply_rx
178 .await
179 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
180 }
181
182 pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
183 let (reply_tx, reply_rx) = oneshot::channel();
184 self.tx
185 .send(RouteControllerCommand::SuspendRoute {
186 route_id: route_id.into(),
187 reply: reply_tx,
188 })
189 .await
190 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
191 reply_rx
192 .await
193 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
194 }
195
196 pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
197 let (reply_tx, reply_rx) = oneshot::channel();
198 self.tx
199 .send(RouteControllerCommand::ResumeRoute {
200 route_id: route_id.into(),
201 reply: reply_tx,
202 })
203 .await
204 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205 reply_rx
206 .await
207 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208 }
209
210 pub async fn start_all_routes(&self) -> Result<(), CamelError> {
211 let (reply_tx, reply_rx) = oneshot::channel();
212 self.tx
213 .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
214 .await
215 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
216 reply_rx
217 .await
218 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
219 }
220
221 pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
222 let (reply_tx, reply_rx) = oneshot::channel();
223 self.tx
224 .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
225 .await
226 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
227 reply_rx
228 .await
229 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
230 }
231
232 pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
233 let (reply_tx, reply_rx) = oneshot::channel();
234 self.tx
235 .send(RouteControllerCommand::AddRoute {
236 definition,
237 reply: reply_tx,
238 })
239 .await
240 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
241 reply_rx
242 .await
243 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
244 }
245
246 pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
247 let (reply_tx, reply_rx) = oneshot::channel();
248 self.tx
249 .send(RouteControllerCommand::RemoveRoute {
250 route_id: route_id.into(),
251 reply: reply_tx,
252 })
253 .await
254 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
255 reply_rx
256 .await
257 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
258 }
259
260 pub async fn swap_pipeline(
261 &self,
262 route_id: impl Into<String>,
263 pipeline: BoxProcessor,
264 ) -> Result<(), CamelError> {
265 let (reply_tx, reply_rx) = oneshot::channel();
266 self.tx
267 .send(RouteControllerCommand::SwapPipeline {
268 route_id: route_id.into(),
269 pipeline,
270 reply: reply_tx,
271 })
272 .await
273 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
274 reply_rx
275 .await
276 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
277 }
278
279 pub async fn compile_route_definition(
280 &self,
281 definition: RouteDefinition,
282 ) -> Result<BoxProcessor, CamelError> {
283 let (reply_tx, reply_rx) = oneshot::channel();
284 self.tx
285 .send(RouteControllerCommand::CompileRouteDefinition {
286 definition,
287 reply: reply_tx,
288 })
289 .await
290 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
291 reply_rx
292 .await
293 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
294 }
295
296 pub async fn compile_route_definition_with_generation(
297 &self,
298 definition: RouteDefinition,
299 generation: u64,
300 ) -> Result<BoxProcessor, CamelError> {
301 let (reply_tx, reply_rx) = oneshot::channel();
302 self.tx
303 .send(
304 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
305 definition,
306 generation,
307 reply: reply_tx,
308 },
309 )
310 .await
311 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
312 reply_rx
313 .await
314 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
315 }
316
317 pub(crate) async fn prepare_route_definition_with_generation(
318 &self,
319 definition: RouteDefinition,
320 generation: u64,
321 ) -> Result<PreparedRoute, CamelError> {
322 let (reply_tx, reply_rx) = oneshot::channel();
323 self.tx
324 .send(
325 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
326 definition,
327 generation,
328 reply: reply_tx,
329 },
330 )
331 .await
332 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
333 reply_rx
334 .await
335 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
336 }
337
338 pub(crate) async fn insert_prepared_route(
339 &self,
340 prepared: PreparedRoute,
341 ) -> Result<(), CamelError> {
342 let (reply_tx, reply_rx) = oneshot::channel();
343 self.tx
344 .send(RouteControllerCommand::InsertPreparedRoute {
345 prepared,
346 reply: reply_tx,
347 })
348 .await
349 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350 reply_rx
351 .await
352 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
353 }
354
355 pub async fn remove_route_preserving_functions(
356 &self,
357 route_id: String,
358 ) -> Result<(), CamelError> {
359 let (reply_tx, reply_rx) = oneshot::channel();
360 self.tx
361 .send(RouteControllerCommand::RemoveRoutePreservingFunctions {
362 route_id,
363 reply: reply_tx,
364 })
365 .await
366 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
367 reply_rx
368 .await
369 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
370 }
371
372 pub async fn route_from_uri(
373 &self,
374 route_id: impl Into<String>,
375 ) -> Result<Option<String>, CamelError> {
376 let (reply_tx, reply_rx) = oneshot::channel();
377 self.tx
378 .send(RouteControllerCommand::RouteFromUri {
379 route_id: route_id.into(),
380 reply: reply_tx,
381 })
382 .await
383 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
384 reply_rx
385 .await
386 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
387 }
388
389 pub async fn route_count(&self) -> Result<usize, CamelError> {
390 let (reply_tx, reply_rx) = oneshot::channel();
391 self.tx
392 .send(RouteControllerCommand::RouteCount { reply: reply_tx })
393 .await
394 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
395 reply_rx
396 .await
397 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
398 }
399
400 pub async fn in_flight_count(
401 &self,
402 route_id: impl Into<String>,
403 ) -> Result<Option<u64>, CamelError> {
404 let (reply_tx, reply_rx) = oneshot::channel();
405 self.tx
406 .send(RouteControllerCommand::InFlightCount {
407 route_id: route_id.into(),
408 reply: reply_tx,
409 })
410 .await
411 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
412 reply_rx
413 .await
414 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
415 }
416
417 pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
418 let (reply_tx, reply_rx) = oneshot::channel();
419 self.tx
420 .send(RouteControllerCommand::RouteExists {
421 route_id: route_id.into(),
422 reply: reply_tx,
423 })
424 .await
425 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
426 reply_rx
427 .await
428 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
429 }
430
431 pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
432 let (reply_tx, reply_rx) = oneshot::channel();
433 self.tx
434 .send(RouteControllerCommand::RouteIds { reply: reply_tx })
435 .await
436 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
437 reply_rx
438 .await
439 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
440 }
441
442 pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
443 let (reply_tx, reply_rx) = oneshot::channel();
444 self.tx
445 .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
446 .await
447 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
448 reply_rx
449 .await
450 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
451 }
452
453 pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
454 let (reply_tx, reply_rx) = oneshot::channel();
455 self.tx
456 .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
457 .await
458 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
459 reply_rx
460 .await
461 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
462 }
463
464 pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
465 let (reply_tx, reply_rx) = oneshot::channel();
466 self.tx
467 .send(RouteControllerCommand::StartRouteReload {
468 route_id: route_id.into(),
469 reply: reply_tx,
470 })
471 .await
472 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
473 reply_rx
474 .await
475 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
476 }
477
478 pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
479 let (reply_tx, reply_rx) = oneshot::channel();
480 self.tx
481 .send(RouteControllerCommand::StopRouteReload {
482 route_id: route_id.into(),
483 reply: reply_tx,
484 })
485 .await
486 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
487 reply_rx
488 .await
489 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
490 }
491
492 pub async fn get_pipeline(
493 &self,
494 route_id: impl Into<String>,
495 ) -> Result<Option<BoxProcessor>, CamelError> {
496 let (reply_tx, reply_rx) = oneshot::channel();
497 self.tx
498 .send(RouteControllerCommand::GetPipeline {
499 route_id: route_id.into(),
500 reply: reply_tx,
501 })
502 .await
503 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
504 reply_rx
505 .await
506 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
507 }
508
509 pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
510 self.tx
511 .send(RouteControllerCommand::SetErrorHandler { config })
512 .await
513 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
514 }
515
516 pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
517 self.tx
518 .send(RouteControllerCommand::SetTracerConfig { config })
519 .await
520 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
521 }
522
523 pub async fn set_runtime_handle(
524 &self,
525 runtime: Arc<dyn RuntimeHandle>,
526 ) -> Result<(), CamelError> {
527 self.tx
528 .send(RouteControllerCommand::SetRuntimeHandle { runtime })
529 .await
530 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
531 }
532
533 pub fn try_set_runtime_handle(
534 &self,
535 runtime: Arc<dyn RuntimeHandle>,
536 ) -> Result<(), CamelError> {
537 self.tx
538 .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
539 .map_err(|err| {
540 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
541 })
542 }
543
544 pub async fn set_function_invoker(
545 &self,
546 invoker: Arc<dyn FunctionInvoker>,
547 ) -> Result<(), CamelError> {
548 self.tx
549 .send(RouteControllerCommand::SetFunctionInvoker { invoker })
550 .await
551 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
552 }
553
554 pub fn try_set_function_invoker(
555 &self,
556 invoker: Arc<dyn FunctionInvoker>,
557 ) -> Result<(), CamelError> {
558 self.tx
559 .try_send(RouteControllerCommand::SetFunctionInvoker { invoker })
560 .map_err(|err| {
561 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
562 })
563 }
564
565 pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
566 let (reply_tx, reply_rx) = oneshot::channel();
567 self.tx
568 .send(RouteControllerCommand::RouteSourceHash {
569 route_id: route_id.into(),
570 reply: reply_tx,
571 })
572 .await
573 .ok()?;
574 reply_rx.await.ok()?
575 }
576
577 pub async fn shutdown(&self) -> Result<(), CamelError> {
578 self.tx
579 .send(RouteControllerCommand::Shutdown)
580 .await
581 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
582 }
583}
584
585pub fn spawn_controller_actor(
586 controller: DefaultRouteController,
587) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
588 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
589 let handle = tokio::spawn(async move {
590 let mut controller = controller;
591 while let Some(cmd) = rx.recv().await {
592 match cmd {
593 RouteControllerCommand::StartRoute { route_id, reply } => {
594 let _ = reply.send(controller.start_route(&route_id).await);
595 }
596 RouteControllerCommand::StopRoute { route_id, reply } => {
597 let _ = reply.send(controller.stop_route(&route_id).await);
598 }
599 RouteControllerCommand::RestartRoute { route_id, reply } => {
600 let _ = reply.send(controller.restart_route(&route_id).await);
601 }
602 RouteControllerCommand::SuspendRoute { route_id, reply } => {
603 let _ = reply.send(controller.suspend_route(&route_id).await);
604 }
605 RouteControllerCommand::ResumeRoute { route_id, reply } => {
606 let _ = reply.send(controller.resume_route(&route_id).await);
607 }
608 RouteControllerCommand::StartAllRoutes { reply } => {
609 let _ = reply.send(controller.start_all_routes().await);
610 }
611 RouteControllerCommand::StopAllRoutes { reply } => {
612 let _ = reply.send(controller.stop_all_routes().await);
613 }
614 RouteControllerCommand::AddRoute { definition, reply } => {
615 let _ = reply.send(controller.add_route(definition).await);
616 }
617 RouteControllerCommand::RemoveRoute { route_id, reply } => {
618 let _ = reply.send(controller.remove_route(&route_id).await);
619 }
620 RouteControllerCommand::SwapPipeline {
621 route_id,
622 pipeline,
623 reply,
624 } => {
625 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
626 }
627 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
628 let _ = reply.send(controller.compile_route_definition(definition));
629 }
630 RouteControllerCommand::CompileRouteDefinitionWithGeneration {
631 definition,
632 generation,
633 reply,
634 } => {
635 let _ = reply.send(
636 controller.compile_route_definition_with_generation(definition, generation),
637 );
638 }
639 RouteControllerCommand::PrepareRouteDefinitionWithGeneration {
640 definition,
641 generation,
642 reply,
643 } => {
644 let _ = reply.send(
645 controller.prepare_route_definition_with_generation(definition, generation),
646 );
647 }
648 RouteControllerCommand::InsertPreparedRoute { prepared, reply } => {
649 let _ = reply.send(controller.insert_prepared_route(prepared));
650 }
651 RouteControllerCommand::RemoveRoutePreservingFunctions { route_id, reply } => {
652 let _ = reply.send(
653 controller
654 .remove_route_preserving_functions(&route_id)
655 .await,
656 );
657 }
658 RouteControllerCommand::RouteFromUri { route_id, reply } => {
659 let _ = reply.send(controller.route_from_uri(&route_id));
660 }
661 RouteControllerCommand::SetErrorHandler { config } => {
662 controller.set_error_handler(config);
663 }
664 RouteControllerCommand::SetTracerConfig { config } => {
665 controller.set_tracer_config(&config);
666 }
667 RouteControllerCommand::RouteCount { reply } => {
668 let _ = reply.send(controller.route_count());
669 }
670 RouteControllerCommand::InFlightCount { route_id, reply } => {
671 let _ = reply.send(controller.in_flight_count(&route_id));
672 }
673 RouteControllerCommand::RouteExists { route_id, reply } => {
674 let _ = reply.send(controller.route_exists(&route_id));
675 }
676 RouteControllerCommand::RouteIds { reply } => {
677 let _ = reply.send(controller.route_ids());
678 }
679 RouteControllerCommand::AutoStartupRouteIds { reply } => {
680 let _ = reply.send(controller.auto_startup_route_ids());
681 }
682 RouteControllerCommand::ShutdownRouteIds { reply } => {
683 let _ = reply.send(controller.shutdown_route_ids());
684 }
685 RouteControllerCommand::GetPipeline { route_id, reply } => {
686 let _ = reply.send(controller.get_pipeline(&route_id));
687 }
688 RouteControllerCommand::StartRouteReload { route_id, reply } => {
689 let _ = reply.send(controller.start_route_reload(&route_id).await);
690 }
691 RouteControllerCommand::StopRouteReload { route_id, reply } => {
692 let _ = reply.send(controller.stop_route_reload(&route_id).await);
693 }
694 RouteControllerCommand::SetRuntimeHandle { runtime } => {
695 controller.set_runtime_handle(runtime);
696 }
697 RouteControllerCommand::SetFunctionInvoker { invoker } => {
698 controller.set_function_invoker(invoker);
699 }
700 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
701 let _ = reply.send(controller.route_source_hash(&route_id));
702 }
703 RouteControllerCommand::Shutdown => {
704 break;
705 }
706 }
707 }
708 });
709 (RouteControllerHandle { tx }, handle)
710}
711
712pub fn spawn_supervision_task(
713 controller: RouteControllerHandle,
714 config: SupervisionConfig,
715 _metrics: Option<Arc<dyn MetricsCollector>>,
716 mut crash_rx: mpsc::Receiver<CrashNotification>,
717) -> JoinHandle<()> {
718 tokio::spawn(async move {
719 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
720 let mut last_restart_time: std::collections::HashMap<String, Instant> =
721 std::collections::HashMap::new();
722 let mut currently_restarting: std::collections::HashSet<String> =
723 std::collections::HashSet::new();
724
725 info!("Supervision loop started");
726
727 while let Some(notification) = crash_rx.recv().await {
728 let route_id = notification.route_id;
729 if currently_restarting.contains(&route_id) {
730 continue;
731 }
732
733 if let Some(last_time) = last_restart_time.get(&route_id)
734 && last_time.elapsed() >= config.initial_delay
735 {
736 attempts.insert(route_id.clone(), 0);
737 }
738
739 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
740 *current_attempt += 1;
741
742 if config
743 .max_attempts
744 .is_some_and(|max| *current_attempt > max)
745 {
746 error!(
747 route_id = %route_id,
748 attempts = *current_attempt,
749 "Route exceeded max restart attempts, giving up"
750 );
751 continue;
752 }
753
754 let delay = config.next_delay(*current_attempt);
755 currently_restarting.insert(route_id.clone());
756 tokio::time::sleep(delay).await;
757
758 match controller.restart_route(route_id.clone()).await {
759 Ok(()) => {
760 info!(route_id = %route_id, "Route restarted successfully");
761 last_restart_time.insert(route_id.clone(), Instant::now());
762 }
763 Err(err) => {
764 error!(route_id = %route_id, error = %err, "Failed to restart route");
765 }
766 }
767
768 currently_restarting.remove(&route_id);
769 }
770
771 info!("Supervision loop ended");
772 })
773}
774
775#[cfg(test)]
776mod tests {
777 use super::{
778 RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
779 spawn_supervision_task,
780 };
781 use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
782 use crate::lifecycle::application::route_definition::RouteDefinition;
783 use crate::shared::components::domain::Registry;
784 use crate::shared::observability::domain::TracerConfig;
785 use camel_api::function::PrepareToken;
786 use camel_api::{
787 CamelError, ErrorHandlerConfig, Exchange, ExchangePatch, FunctionDefinition, FunctionDiff,
788 FunctionId, FunctionInvocationError, FunctionInvoker, FunctionInvokerSync, RuntimeCommand,
789 RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult,
790 SupervisionConfig,
791 };
792 use std::sync::Arc;
793 use std::time::Duration;
794 use tokio::sync::mpsc;
795 use tokio::time::sleep;
796
797 fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
798 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
799 {
800 let mut guard = registry.lock().expect("lock");
801 guard.register(std::sync::Arc::new(
802 camel_component_timer::TimerComponent::new(),
803 ));
804 guard.register(std::sync::Arc::new(
805 camel_component_mock::MockComponent::new(),
806 ));
807 }
808 let controller = DefaultRouteController::new(
809 Arc::clone(®istry),
810 Arc::new(camel_api::NoopPlatformService::default()),
811 );
812 spawn_controller_actor(controller)
813 }
814
815 fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
816 let controller = DefaultRouteController::new(
817 Arc::new(std::sync::Mutex::new(Registry::new())),
818 Arc::new(camel_api::NoopPlatformService::default()),
819 );
820 spawn_controller_actor(controller)
821 }
822
823 fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
824 RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
825 }
826
827 struct NoopRuntime;
828 struct NoopInvoker;
829
830 #[async_trait::async_trait]
831 impl RuntimeCommandBus for NoopRuntime {
832 async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
833 Ok(RuntimeCommandResult::Accepted)
834 }
835 }
836
837 #[async_trait::async_trait]
838 impl RuntimeQueryBus for NoopRuntime {
839 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
840 Ok(match query {
841 RuntimeQuery::GetRouteStatus { route_id }
842 | RuntimeQuery::InFlightCount { route_id } => {
843 RuntimeQueryResult::RouteNotFound { route_id }
844 }
845 RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
846 route_ids: Vec::new(),
847 },
848 })
849 }
850 }
851
852 impl FunctionInvokerSync for NoopInvoker {
853 fn stage_pending(
854 &self,
855 _def: FunctionDefinition,
856 _route_id: Option<&str>,
857 _generation: u64,
858 ) {
859 }
860 fn discard_staging(&self, _generation: u64) {}
861 fn begin_reload(&self) -> u64 {
862 1
863 }
864 fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
865 vec![]
866 }
867 fn staged_refs_for_route(
868 &self,
869 _route_id: &str,
870 _generation: u64,
871 ) -> Vec<(FunctionId, Option<String>)> {
872 vec![]
873 }
874 fn staged_defs_for_route(
875 &self,
876 _route_id: &str,
877 _generation: u64,
878 ) -> Vec<(FunctionDefinition, Option<String>)> {
879 vec![]
880 }
881 }
882
883 #[async_trait::async_trait]
884 impl FunctionInvoker for NoopInvoker {
885 async fn register(
886 &self,
887 _def: FunctionDefinition,
888 _route_id: Option<&str>,
889 ) -> Result<(), FunctionInvocationError> {
890 Ok(())
891 }
892 async fn unregister(
893 &self,
894 _id: &FunctionId,
895 _route_id: Option<&str>,
896 ) -> Result<(), FunctionInvocationError> {
897 Ok(())
898 }
899 async fn invoke(
900 &self,
901 _id: &FunctionId,
902 _exchange: &Exchange,
903 ) -> Result<ExchangePatch, FunctionInvocationError> {
904 Ok(ExchangePatch::default())
905 }
906 async fn prepare_reload(
907 &self,
908 _diff: FunctionDiff,
909 _generation: u64,
910 ) -> Result<PrepareToken, FunctionInvocationError> {
911 Ok(PrepareToken::default())
912 }
913 async fn finalize_reload(
914 &self,
915 _diff: &FunctionDiff,
916 _generation: u64,
917 ) -> Result<(), FunctionInvocationError> {
918 Ok(())
919 }
920 async fn rollback_reload(
921 &self,
922 _token: PrepareToken,
923 _generation: u64,
924 ) -> Result<(), FunctionInvocationError> {
925 Ok(())
926 }
927 async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
928 Ok(())
929 }
930 }
931
932 #[tokio::test]
933 async fn start_route_sends_command_and_returns_reply() {
934 let (tx, mut rx) = mpsc::channel(1);
935 let handle = RouteControllerHandle { tx };
936
937 let task = tokio::spawn(async move { handle.start_route("route-a").await });
938
939 let command = rx.recv().await.expect("command should be received");
940 match command {
941 RouteControllerCommand::StartRoute { route_id, reply } => {
942 assert_eq!(route_id, "route-a");
943 let _ = reply.send(Ok(()));
944 }
945 _ => panic!("unexpected command variant"),
946 }
947
948 let result = task.await.expect("join should succeed");
949 assert!(result.is_ok());
950 }
951
952 #[tokio::test]
953 async fn start_route_returns_error_when_actor_stops() {
954 let (tx, rx) = mpsc::channel(1);
955 drop(rx);
956
957 let handle = RouteControllerHandle { tx };
958 let result = handle.start_route("route-a").await;
959
960 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
961 }
962
963 #[tokio::test]
964 async fn spawn_controller_actor_processes_commands_and_shutdown() {
965 let controller = DefaultRouteController::new(
966 Arc::new(std::sync::Mutex::new(Registry::new())),
967 Arc::new(camel_api::NoopPlatformService::default()),
968 );
969 let (handle, join_handle) = spawn_controller_actor(controller);
970
971 assert_eq!(handle.route_count().await.expect("route_count"), 0);
972 assert_eq!(
973 handle.route_ids().await.expect("route_ids"),
974 Vec::<String>::new()
975 );
976
977 handle.shutdown().await.expect("shutdown send");
978 join_handle.await.expect("actor join");
979 }
980
981 #[tokio::test]
982 async fn actor_handle_introspection_and_mutation_commands() {
983 let (handle, join_handle) = build_actor_with_components();
984 let definition = route_def("h-1", "timer:tick?period=100");
985
986 handle.add_route(definition).await.expect("add route");
987 assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
988 assert!(
989 !handle
990 .route_exists("no-such")
991 .await
992 .expect("route exists no-such")
993 );
994
995 let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
996 assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
997 assert_eq!(handle.route_count().await.expect("route_count"), 1);
998
999 let auto_ids = handle
1000 .auto_startup_route_ids()
1001 .await
1002 .expect("auto_startup_route_ids");
1003 assert!(auto_ids.iter().any(|id| id == "h-1"));
1004
1005 let shutdown_ids = handle
1006 .shutdown_route_ids()
1007 .await
1008 .expect("shutdown_route_ids");
1009 assert!(shutdown_ids.iter().any(|id| id == "h-1"));
1010
1011 let compiled = handle
1012 .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
1013 .await
1014 .expect("compile_route_definition");
1015
1016 assert!(
1017 handle
1018 .get_pipeline("h-1")
1019 .await
1020 .expect("get_pipeline")
1021 .is_some()
1022 );
1023 handle
1024 .swap_pipeline("h-1", compiled)
1025 .await
1026 .expect("swap_pipeline");
1027
1028 let _ = handle
1029 .in_flight_count("h-1")
1030 .await
1031 .expect("in_flight_count");
1032 let _ = handle.route_source_hash("h-1").await;
1033
1034 handle
1035 .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
1036 .await
1037 .expect("set_error_handler");
1038 handle
1039 .set_tracer_config(TracerConfig::default())
1040 .await
1041 .expect("set_tracer_config");
1042 handle
1043 .set_runtime_handle(Arc::new(NoopRuntime))
1044 .await
1045 .expect("set_runtime_handle");
1046
1047 handle.remove_route("h-1").await.expect("remove_route");
1048 assert_eq!(
1049 handle
1050 .route_count()
1051 .await
1052 .expect("route_count after remove"),
1053 0
1054 );
1055 handle
1056 .stop_all_routes()
1057 .await
1058 .expect("stop_all_routes on empty");
1059
1060 handle.shutdown().await.expect("shutdown send");
1061 join_handle.await.expect("actor join");
1062 }
1063
1064 #[tokio::test]
1065 async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
1066 let (handle, join_handle) = build_actor_with_components();
1067 handle
1068 .add_route(route_def("lc-1", "timer:tick?period=50"))
1069 .await
1070 .expect("add route lc-1");
1071
1072 handle.start_route("lc-1").await.expect("start_route");
1073 sleep(Duration::from_millis(20)).await;
1074
1075 handle.restart_route("lc-1").await.expect("restart_route");
1076 sleep(Duration::from_millis(20)).await;
1077
1078 handle.suspend_route("lc-1").await.expect("suspend_route");
1079 handle.resume_route("lc-1").await.expect("resume_route");
1080 sleep(Duration::from_millis(20)).await;
1081
1082 handle.stop_route("lc-1").await.expect("stop_route");
1083 handle.start_all_routes().await.expect("start_all_routes");
1084 sleep(Duration::from_millis(20)).await;
1085 handle.stop_all_routes().await.expect("stop_all_routes");
1086
1087 handle
1088 .start_route_reload("lc-1")
1089 .await
1090 .expect("start_route_reload");
1091 handle
1092 .stop_route_reload("lc-1")
1093 .await
1094 .expect("stop_route_reload");
1095
1096 handle.shutdown().await.expect("shutdown send");
1097 join_handle.await.expect("actor join");
1098 }
1099
1100 #[tokio::test]
1101 async fn spawn_supervision_restarts_route_on_crash() {
1102 let (handle, join_handle) = build_actor_with_components();
1103 handle
1104 .add_route(route_def("sup-1", "timer:tick?period=100"))
1105 .await
1106 .expect("add route sup-1");
1107 handle
1108 .start_route("sup-1")
1109 .await
1110 .expect("start_route sup-1");
1111
1112 let (crash_tx, crash_rx) = mpsc::channel(8);
1113 let supervision = spawn_supervision_task(
1114 handle.clone(),
1115 SupervisionConfig {
1116 initial_delay: Duration::from_millis(10),
1117 max_attempts: Some(2),
1118 ..SupervisionConfig::default()
1119 },
1120 None,
1121 crash_rx,
1122 );
1123
1124 crash_tx
1125 .send(CrashNotification {
1126 route_id: "sup-1".to_string(),
1127 error: "simulated".to_string(),
1128 })
1129 .await
1130 .expect("send crash notification");
1131
1132 sleep(Duration::from_millis(150)).await;
1133 drop(crash_tx);
1134 supervision.await.expect("supervision join");
1135
1136 handle.shutdown().await.expect("shutdown send");
1137 join_handle.await.expect("actor join");
1138 }
1139
1140 #[tokio::test]
1141 async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
1142 let (handle, join_handle) = build_actor_with_components();
1143 handle
1144 .add_route(route_def("sup-2", "timer:tick?period=100"))
1145 .await
1146 .expect("add route sup-2");
1147 handle
1148 .start_route("sup-2")
1149 .await
1150 .expect("start_route sup-2");
1151
1152 let (crash_tx, crash_rx) = mpsc::channel(8);
1153 let supervision = spawn_supervision_task(
1154 handle.clone(),
1155 SupervisionConfig {
1156 initial_delay: Duration::from_millis(10),
1157 max_attempts: Some(1),
1158 ..SupervisionConfig::default()
1159 },
1160 None,
1161 crash_rx,
1162 );
1163
1164 crash_tx
1165 .send(CrashNotification {
1166 route_id: "sup-2".to_string(),
1167 error: "attempt-1".to_string(),
1168 })
1169 .await
1170 .expect("send crash attempt-1");
1171 crash_tx
1172 .send(CrashNotification {
1173 route_id: "sup-2".to_string(),
1174 error: "attempt-2".to_string(),
1175 })
1176 .await
1177 .expect("send crash attempt-2");
1178
1179 sleep(Duration::from_millis(200)).await;
1180 drop(crash_tx);
1181 supervision.await.expect("supervision join");
1182
1183 handle.shutdown().await.expect("shutdown send");
1184 join_handle.await.expect("actor join");
1185 }
1186
1187 #[tokio::test]
1188 async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
1189 let (handle, join_handle) = build_empty_actor();
1190
1191 handle
1192 .try_set_runtime_handle(Arc::new(NoopRuntime))
1193 .expect("try_set_runtime_handle should succeed");
1194
1195 handle.shutdown().await.expect("shutdown send");
1196 join_handle.await.expect("actor join");
1197 }
1198
1199 #[tokio::test]
1200 async fn shutdown_returns_error_when_actor_stopped() {
1201 let (tx, rx) = mpsc::channel(1);
1202 drop(rx);
1203
1204 let handle = RouteControllerHandle { tx };
1205 let result = handle.shutdown().await;
1206
1207 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1208 }
1209
1210 #[tokio::test]
1211 async fn handle_methods_send_expected_commands_and_receive_replies() {
1212 let (tx, mut rx) = mpsc::channel(16);
1213 let handle = RouteControllerHandle { tx };
1214
1215 let stop_task = tokio::spawn({
1216 let h = handle.clone();
1217 async move { h.stop_route("r-1").await }
1218 });
1219 let cmd = rx.recv().await.expect("stop command");
1220 match cmd {
1221 RouteControllerCommand::StopRoute { route_id, reply } => {
1222 assert_eq!(route_id, "r-1");
1223 let _ = reply.send(Ok(()));
1224 }
1225 _ => panic!("unexpected command"),
1226 }
1227 assert!(stop_task.await.expect("join").is_ok());
1228
1229 let exists_task = tokio::spawn({
1230 let h = handle.clone();
1231 async move { h.route_exists("r-2").await }
1232 });
1233 let cmd = rx.recv().await.expect("exists command");
1234 match cmd {
1235 RouteControllerCommand::RouteExists { route_id, reply } => {
1236 assert_eq!(route_id, "r-2");
1237 let _ = reply.send(true);
1238 }
1239 _ => panic!("unexpected command"),
1240 }
1241 assert_eq!(exists_task.await.expect("join").expect("ok"), true);
1242
1243 let hash_task = tokio::spawn({
1244 let h = handle.clone();
1245 async move { h.route_source_hash("r-3").await }
1246 });
1247 let cmd = rx.recv().await.expect("hash command");
1248 match cmd {
1249 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
1250 assert_eq!(route_id, "r-3");
1251 let _ = reply.send(Some(77));
1252 }
1253 _ => panic!("unexpected command"),
1254 }
1255 assert_eq!(hash_task.await.expect("join"), Some(77));
1256 }
1257
1258 #[tokio::test]
1259 async fn handle_methods_error_on_dropped_reply_channel() {
1260 let (tx, mut rx) = mpsc::channel(16);
1261 let handle = RouteControllerHandle { tx };
1262
1263 let count_task = tokio::spawn({
1264 let h = handle.clone();
1265 async move { h.route_count().await }
1266 });
1267 let cmd = rx.recv().await.expect("route_count command");
1268 match cmd {
1269 RouteControllerCommand::RouteCount { reply } => drop(reply),
1270 _ => panic!("unexpected command"),
1271 }
1272 assert!(matches!(
1273 count_task.await.expect("join"),
1274 Err(CamelError::ProcessorError(_))
1275 ));
1276
1277 let stop_task = tokio::spawn({
1278 let h = handle.clone();
1279 async move { h.stop_route("x").await }
1280 });
1281 let cmd = rx.recv().await.expect("stop command");
1282 match cmd {
1283 RouteControllerCommand::StopRoute { reply, .. } => drop(reply),
1284 _ => panic!("unexpected command"),
1285 }
1286 assert!(matches!(
1287 stop_task.await.expect("join"),
1288 Err(CamelError::ProcessorError(_))
1289 ));
1290
1291 let maybe_hash = tokio::spawn({
1292 let h = handle.clone();
1293 async move { h.route_source_hash("x").await }
1294 });
1295 let cmd = rx.recv().await.expect("hash command");
1296 match cmd {
1297 RouteControllerCommand::RouteSourceHash { reply, .. } => drop(reply),
1298 _ => panic!("unexpected command"),
1299 }
1300 assert_eq!(maybe_hash.await.expect("join"), None);
1301 }
1302
1303 #[test]
1304 fn try_set_function_invoker_returns_mailbox_full() {
1305 let (tx, mut rx) = mpsc::channel(1);
1306 tx.try_send(RouteControllerCommand::Shutdown)
1307 .expect("fill mailbox");
1308 let handle = RouteControllerHandle { tx };
1309
1310 let result = handle.try_set_function_invoker(Arc::new(NoopInvoker));
1311 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
1312
1313 rx.try_recv().expect("mailbox still has first message");
1314 }
1315}