1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6 BoxProcessor, CamelError, MetricsCollector, RouteController, RuntimeHandle, SupervisionConfig,
7};
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10use tracing::{error, info};
11
12use super::route_controller::{CrashNotification, DefaultRouteController};
13use crate::lifecycle::application::route_definition::RouteDefinition;
14use crate::shared::observability::domain::TracerConfig;
15
16pub(crate) enum RouteControllerCommand {
17 StartRoute {
18 route_id: String,
19 reply: oneshot::Sender<Result<(), CamelError>>,
20 },
21 StopRoute {
22 route_id: String,
23 reply: oneshot::Sender<Result<(), CamelError>>,
24 },
25 RestartRoute {
26 route_id: String,
27 reply: oneshot::Sender<Result<(), CamelError>>,
28 },
29 SuspendRoute {
30 route_id: String,
31 reply: oneshot::Sender<Result<(), CamelError>>,
32 },
33 ResumeRoute {
34 route_id: String,
35 reply: oneshot::Sender<Result<(), CamelError>>,
36 },
37 StartAllRoutes {
38 reply: oneshot::Sender<Result<(), CamelError>>,
39 },
40 StopAllRoutes {
41 reply: oneshot::Sender<Result<(), CamelError>>,
42 },
43 AddRoute {
44 definition: RouteDefinition,
45 reply: oneshot::Sender<Result<(), CamelError>>,
46 },
47 RemoveRoute {
48 route_id: String,
49 reply: oneshot::Sender<Result<(), CamelError>>,
50 },
51 SwapPipeline {
52 route_id: String,
53 pipeline: BoxProcessor,
54 reply: oneshot::Sender<Result<(), CamelError>>,
55 },
56 CompileRouteDefinition {
57 definition: RouteDefinition,
58 reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
59 },
60 RouteFromUri {
61 route_id: String,
62 reply: oneshot::Sender<Option<String>>,
63 },
64 SetErrorHandler {
65 config: ErrorHandlerConfig,
66 },
67 SetTracerConfig {
68 config: TracerConfig,
69 },
70 RouteCount {
71 reply: oneshot::Sender<usize>,
72 },
73 InFlightCount {
74 route_id: String,
75 reply: oneshot::Sender<Option<u64>>,
76 },
77 RouteExists {
78 route_id: String,
79 reply: oneshot::Sender<bool>,
80 },
81 RouteIds {
82 reply: oneshot::Sender<Vec<String>>,
83 },
84 AutoStartupRouteIds {
85 reply: oneshot::Sender<Vec<String>>,
86 },
87 ShutdownRouteIds {
88 reply: oneshot::Sender<Vec<String>>,
89 },
90 GetPipeline {
91 route_id: String,
92 reply: oneshot::Sender<Option<BoxProcessor>>,
93 },
94 StartRouteReload {
95 route_id: String,
96 reply: oneshot::Sender<Result<(), CamelError>>,
97 },
98 StopRouteReload {
99 route_id: String,
100 reply: oneshot::Sender<Result<(), CamelError>>,
101 },
102 SetRuntimeHandle {
103 runtime: Arc<dyn RuntimeHandle>,
104 },
105 RouteSourceHash {
106 route_id: String,
107 reply: oneshot::Sender<Option<u64>>,
108 },
109 Shutdown,
110}
111
112#[derive(Clone)]
113pub struct RouteControllerHandle {
114 tx: mpsc::Sender<RouteControllerCommand>,
115}
116
117impl RouteControllerHandle {
118 pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
119 let (reply_tx, reply_rx) = oneshot::channel();
120 self.tx
121 .send(RouteControllerCommand::StartRoute {
122 route_id: route_id.into(),
123 reply: reply_tx,
124 })
125 .await
126 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
127 reply_rx
128 .await
129 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
130 }
131
132 pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
133 let (reply_tx, reply_rx) = oneshot::channel();
134 self.tx
135 .send(RouteControllerCommand::StopRoute {
136 route_id: route_id.into(),
137 reply: reply_tx,
138 })
139 .await
140 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
141 reply_rx
142 .await
143 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
144 }
145
146 pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
147 let (reply_tx, reply_rx) = oneshot::channel();
148 self.tx
149 .send(RouteControllerCommand::RestartRoute {
150 route_id: route_id.into(),
151 reply: reply_tx,
152 })
153 .await
154 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
155 reply_rx
156 .await
157 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
158 }
159
160 pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
161 let (reply_tx, reply_rx) = oneshot::channel();
162 self.tx
163 .send(RouteControllerCommand::SuspendRoute {
164 route_id: route_id.into(),
165 reply: reply_tx,
166 })
167 .await
168 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
169 reply_rx
170 .await
171 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
172 }
173
174 pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
175 let (reply_tx, reply_rx) = oneshot::channel();
176 self.tx
177 .send(RouteControllerCommand::ResumeRoute {
178 route_id: route_id.into(),
179 reply: reply_tx,
180 })
181 .await
182 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
183 reply_rx
184 .await
185 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
186 }
187
188 pub async fn start_all_routes(&self) -> Result<(), CamelError> {
189 let (reply_tx, reply_rx) = oneshot::channel();
190 self.tx
191 .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
192 .await
193 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
194 reply_rx
195 .await
196 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
197 }
198
199 pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
200 let (reply_tx, reply_rx) = oneshot::channel();
201 self.tx
202 .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
203 .await
204 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205 reply_rx
206 .await
207 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208 }
209
210 pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
211 let (reply_tx, reply_rx) = oneshot::channel();
212 self.tx
213 .send(RouteControllerCommand::AddRoute {
214 definition,
215 reply: reply_tx,
216 })
217 .await
218 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
219 reply_rx
220 .await
221 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
222 }
223
224 pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
225 let (reply_tx, reply_rx) = oneshot::channel();
226 self.tx
227 .send(RouteControllerCommand::RemoveRoute {
228 route_id: route_id.into(),
229 reply: reply_tx,
230 })
231 .await
232 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
233 reply_rx
234 .await
235 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
236 }
237
238 pub async fn swap_pipeline(
239 &self,
240 route_id: impl Into<String>,
241 pipeline: BoxProcessor,
242 ) -> Result<(), CamelError> {
243 let (reply_tx, reply_rx) = oneshot::channel();
244 self.tx
245 .send(RouteControllerCommand::SwapPipeline {
246 route_id: route_id.into(),
247 pipeline,
248 reply: reply_tx,
249 })
250 .await
251 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
252 reply_rx
253 .await
254 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
255 }
256
257 pub async fn compile_route_definition(
258 &self,
259 definition: RouteDefinition,
260 ) -> Result<BoxProcessor, CamelError> {
261 let (reply_tx, reply_rx) = oneshot::channel();
262 self.tx
263 .send(RouteControllerCommand::CompileRouteDefinition {
264 definition,
265 reply: reply_tx,
266 })
267 .await
268 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
269 reply_rx
270 .await
271 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
272 }
273
274 pub async fn route_from_uri(
275 &self,
276 route_id: impl Into<String>,
277 ) -> Result<Option<String>, CamelError> {
278 let (reply_tx, reply_rx) = oneshot::channel();
279 self.tx
280 .send(RouteControllerCommand::RouteFromUri {
281 route_id: route_id.into(),
282 reply: reply_tx,
283 })
284 .await
285 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
286 reply_rx
287 .await
288 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
289 }
290
291 pub async fn route_count(&self) -> Result<usize, CamelError> {
292 let (reply_tx, reply_rx) = oneshot::channel();
293 self.tx
294 .send(RouteControllerCommand::RouteCount { reply: reply_tx })
295 .await
296 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
297 reply_rx
298 .await
299 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
300 }
301
302 pub async fn in_flight_count(
303 &self,
304 route_id: impl Into<String>,
305 ) -> Result<Option<u64>, CamelError> {
306 let (reply_tx, reply_rx) = oneshot::channel();
307 self.tx
308 .send(RouteControllerCommand::InFlightCount {
309 route_id: route_id.into(),
310 reply: reply_tx,
311 })
312 .await
313 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
314 reply_rx
315 .await
316 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
317 }
318
319 pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
320 let (reply_tx, reply_rx) = oneshot::channel();
321 self.tx
322 .send(RouteControllerCommand::RouteExists {
323 route_id: route_id.into(),
324 reply: reply_tx,
325 })
326 .await
327 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
328 reply_rx
329 .await
330 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
331 }
332
333 pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
334 let (reply_tx, reply_rx) = oneshot::channel();
335 self.tx
336 .send(RouteControllerCommand::RouteIds { reply: reply_tx })
337 .await
338 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
339 reply_rx
340 .await
341 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
342 }
343
344 pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
345 let (reply_tx, reply_rx) = oneshot::channel();
346 self.tx
347 .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
348 .await
349 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350 reply_rx
351 .await
352 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
353 }
354
355 pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
356 let (reply_tx, reply_rx) = oneshot::channel();
357 self.tx
358 .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
359 .await
360 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
361 reply_rx
362 .await
363 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
364 }
365
366 pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
367 let (reply_tx, reply_rx) = oneshot::channel();
368 self.tx
369 .send(RouteControllerCommand::StartRouteReload {
370 route_id: route_id.into(),
371 reply: reply_tx,
372 })
373 .await
374 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
375 reply_rx
376 .await
377 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
378 }
379
380 pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
381 let (reply_tx, reply_rx) = oneshot::channel();
382 self.tx
383 .send(RouteControllerCommand::StopRouteReload {
384 route_id: route_id.into(),
385 reply: reply_tx,
386 })
387 .await
388 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
389 reply_rx
390 .await
391 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
392 }
393
394 pub async fn get_pipeline(
395 &self,
396 route_id: impl Into<String>,
397 ) -> Result<Option<BoxProcessor>, CamelError> {
398 let (reply_tx, reply_rx) = oneshot::channel();
399 self.tx
400 .send(RouteControllerCommand::GetPipeline {
401 route_id: route_id.into(),
402 reply: reply_tx,
403 })
404 .await
405 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
406 reply_rx
407 .await
408 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
409 }
410
411 pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
412 self.tx
413 .send(RouteControllerCommand::SetErrorHandler { config })
414 .await
415 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
416 }
417
418 pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
419 self.tx
420 .send(RouteControllerCommand::SetTracerConfig { config })
421 .await
422 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
423 }
424
425 pub async fn set_runtime_handle(
426 &self,
427 runtime: Arc<dyn RuntimeHandle>,
428 ) -> Result<(), CamelError> {
429 self.tx
430 .send(RouteControllerCommand::SetRuntimeHandle { runtime })
431 .await
432 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
433 }
434
435 pub fn try_set_runtime_handle(
436 &self,
437 runtime: Arc<dyn RuntimeHandle>,
438 ) -> Result<(), CamelError> {
439 self.tx
440 .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
441 .map_err(|err| {
442 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
443 })
444 }
445
446 pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
447 let (reply_tx, reply_rx) = oneshot::channel();
448 self.tx
449 .send(RouteControllerCommand::RouteSourceHash {
450 route_id: route_id.into(),
451 reply: reply_tx,
452 })
453 .await
454 .ok()?;
455 reply_rx.await.ok()?
456 }
457
458 pub async fn shutdown(&self) -> Result<(), CamelError> {
459 self.tx
460 .send(RouteControllerCommand::Shutdown)
461 .await
462 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
463 }
464}
465
466pub fn spawn_controller_actor(
467 controller: DefaultRouteController,
468) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
469 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
470 let handle = tokio::spawn(async move {
471 let mut controller = controller;
472 while let Some(cmd) = rx.recv().await {
473 match cmd {
474 RouteControllerCommand::StartRoute { route_id, reply } => {
475 let _ = reply.send(controller.start_route(&route_id).await);
476 }
477 RouteControllerCommand::StopRoute { route_id, reply } => {
478 let _ = reply.send(controller.stop_route(&route_id).await);
479 }
480 RouteControllerCommand::RestartRoute { route_id, reply } => {
481 let _ = reply.send(controller.restart_route(&route_id).await);
482 }
483 RouteControllerCommand::SuspendRoute { route_id, reply } => {
484 let _ = reply.send(controller.suspend_route(&route_id).await);
485 }
486 RouteControllerCommand::ResumeRoute { route_id, reply } => {
487 let _ = reply.send(controller.resume_route(&route_id).await);
488 }
489 RouteControllerCommand::StartAllRoutes { reply } => {
490 let _ = reply.send(controller.start_all_routes().await);
491 }
492 RouteControllerCommand::StopAllRoutes { reply } => {
493 let _ = reply.send(controller.stop_all_routes().await);
494 }
495 RouteControllerCommand::AddRoute { definition, reply } => {
496 let _ = reply.send(controller.add_route(definition));
497 }
498 RouteControllerCommand::RemoveRoute { route_id, reply } => {
499 let _ = reply.send(controller.remove_route(&route_id));
500 }
501 RouteControllerCommand::SwapPipeline {
502 route_id,
503 pipeline,
504 reply,
505 } => {
506 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
507 }
508 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
509 let _ = reply.send(controller.compile_route_definition(definition));
510 }
511 RouteControllerCommand::RouteFromUri { route_id, reply } => {
512 let _ = reply.send(controller.route_from_uri(&route_id));
513 }
514 RouteControllerCommand::SetErrorHandler { config } => {
515 controller.set_error_handler(config);
516 }
517 RouteControllerCommand::SetTracerConfig { config } => {
518 controller.set_tracer_config(&config);
519 }
520 RouteControllerCommand::RouteCount { reply } => {
521 let _ = reply.send(controller.route_count());
522 }
523 RouteControllerCommand::InFlightCount { route_id, reply } => {
524 let _ = reply.send(controller.in_flight_count(&route_id));
525 }
526 RouteControllerCommand::RouteExists { route_id, reply } => {
527 let _ = reply.send(controller.route_exists(&route_id));
528 }
529 RouteControllerCommand::RouteIds { reply } => {
530 let _ = reply.send(controller.route_ids());
531 }
532 RouteControllerCommand::AutoStartupRouteIds { reply } => {
533 let _ = reply.send(controller.auto_startup_route_ids());
534 }
535 RouteControllerCommand::ShutdownRouteIds { reply } => {
536 let _ = reply.send(controller.shutdown_route_ids());
537 }
538 RouteControllerCommand::GetPipeline { route_id, reply } => {
539 let _ = reply.send(controller.get_pipeline(&route_id));
540 }
541 RouteControllerCommand::StartRouteReload { route_id, reply } => {
542 let _ = reply.send(controller.start_route_reload(&route_id).await);
543 }
544 RouteControllerCommand::StopRouteReload { route_id, reply } => {
545 let _ = reply.send(controller.stop_route_reload(&route_id).await);
546 }
547 RouteControllerCommand::SetRuntimeHandle { runtime } => {
548 controller.set_runtime_handle(runtime);
549 }
550 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
551 let _ = reply.send(controller.route_source_hash(&route_id));
552 }
553 RouteControllerCommand::Shutdown => {
554 break;
555 }
556 }
557 }
558 });
559 (RouteControllerHandle { tx }, handle)
560}
561
562pub fn spawn_supervision_task(
563 controller: RouteControllerHandle,
564 config: SupervisionConfig,
565 _metrics: Option<Arc<dyn MetricsCollector>>,
566 mut crash_rx: mpsc::Receiver<CrashNotification>,
567) -> JoinHandle<()> {
568 tokio::spawn(async move {
569 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
570 let mut last_restart_time: std::collections::HashMap<String, Instant> =
571 std::collections::HashMap::new();
572 let mut currently_restarting: std::collections::HashSet<String> =
573 std::collections::HashSet::new();
574
575 info!("Supervision loop started");
576
577 while let Some(notification) = crash_rx.recv().await {
578 let route_id = notification.route_id;
579 if currently_restarting.contains(&route_id) {
580 continue;
581 }
582
583 if let Some(last_time) = last_restart_time.get(&route_id)
584 && last_time.elapsed() >= config.initial_delay
585 {
586 attempts.insert(route_id.clone(), 0);
587 }
588
589 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
590 *current_attempt += 1;
591
592 if config
593 .max_attempts
594 .is_some_and(|max| *current_attempt > max)
595 {
596 error!(
597 route_id = %route_id,
598 attempts = *current_attempt,
599 "Route exceeded max restart attempts, giving up"
600 );
601 continue;
602 }
603
604 let delay = config.next_delay(*current_attempt);
605 currently_restarting.insert(route_id.clone());
606 tokio::time::sleep(delay).await;
607
608 match controller.restart_route(route_id.clone()).await {
609 Ok(()) => {
610 info!(route_id = %route_id, "Route restarted successfully");
611 last_restart_time.insert(route_id.clone(), Instant::now());
612 }
613 Err(err) => {
614 error!(route_id = %route_id, error = %err, "Failed to restart route");
615 }
616 }
617
618 currently_restarting.remove(&route_id);
619 }
620
621 info!("Supervision loop ended");
622 })
623}
624
625#[cfg(test)]
626mod tests {
627 use super::{
628 RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
629 spawn_supervision_task,
630 };
631 use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
632 use crate::lifecycle::application::route_definition::RouteDefinition;
633 use crate::shared::components::domain::Registry;
634 use crate::shared::observability::domain::TracerConfig;
635 use camel_api::{
636 CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
637 RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
638 };
639 use std::sync::Arc;
640 use std::time::Duration;
641 use tokio::sync::mpsc;
642 use tokio::time::sleep;
643
644 fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
645 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
646 {
647 let mut guard = registry.lock().expect("lock");
648 guard.register(std::sync::Arc::new(
649 camel_component_timer::TimerComponent::new(),
650 ));
651 guard.register(std::sync::Arc::new(
652 camel_component_mock::MockComponent::new(),
653 ));
654 }
655 let controller = DefaultRouteController::new(
656 Arc::clone(®istry),
657 Arc::new(camel_api::NoopLeaderElector),
658 );
659 spawn_controller_actor(controller)
660 }
661
662 fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
663 let controller = DefaultRouteController::new(
664 Arc::new(std::sync::Mutex::new(Registry::new())),
665 Arc::new(camel_api::NoopLeaderElector),
666 );
667 spawn_controller_actor(controller)
668 }
669
670 fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
671 RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
672 }
673
674 struct NoopRuntime;
675
676 #[async_trait::async_trait]
677 impl RuntimeCommandBus for NoopRuntime {
678 async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
679 Ok(RuntimeCommandResult::Accepted)
680 }
681 }
682
683 #[async_trait::async_trait]
684 impl RuntimeQueryBus for NoopRuntime {
685 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
686 Ok(match query {
687 RuntimeQuery::GetRouteStatus { route_id }
688 | RuntimeQuery::InFlightCount { route_id } => {
689 RuntimeQueryResult::RouteNotFound { route_id }
690 }
691 RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
692 route_ids: Vec::new(),
693 },
694 })
695 }
696 }
697
698 #[tokio::test]
699 async fn start_route_sends_command_and_returns_reply() {
700 let (tx, mut rx) = mpsc::channel(1);
701 let handle = RouteControllerHandle { tx };
702
703 let task = tokio::spawn(async move { handle.start_route("route-a").await });
704
705 let command = rx.recv().await.expect("command should be received");
706 match command {
707 RouteControllerCommand::StartRoute { route_id, reply } => {
708 assert_eq!(route_id, "route-a");
709 let _ = reply.send(Ok(()));
710 }
711 _ => panic!("unexpected command variant"),
712 }
713
714 let result = task.await.expect("join should succeed");
715 assert!(result.is_ok());
716 }
717
718 #[tokio::test]
719 async fn start_route_returns_error_when_actor_stops() {
720 let (tx, rx) = mpsc::channel(1);
721 drop(rx);
722
723 let handle = RouteControllerHandle { tx };
724 let result = handle.start_route("route-a").await;
725
726 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
727 }
728
729 #[tokio::test]
730 async fn spawn_controller_actor_processes_commands_and_shutdown() {
731 let controller = DefaultRouteController::new(
732 Arc::new(std::sync::Mutex::new(Registry::new())),
733 Arc::new(camel_api::NoopLeaderElector),
734 );
735 let (handle, join_handle) = spawn_controller_actor(controller);
736
737 assert_eq!(handle.route_count().await.expect("route_count"), 0);
738 assert_eq!(
739 handle.route_ids().await.expect("route_ids"),
740 Vec::<String>::new()
741 );
742
743 handle.shutdown().await.expect("shutdown send");
744 join_handle.await.expect("actor join");
745 }
746
747 #[tokio::test]
748 async fn actor_handle_introspection_and_mutation_commands() {
749 let (handle, join_handle) = build_actor_with_components();
750 let definition = route_def("h-1", "timer:tick?period=100");
751
752 handle.add_route(definition).await.expect("add route");
753 assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
754 assert!(
755 !handle
756 .route_exists("no-such")
757 .await
758 .expect("route exists no-such")
759 );
760
761 let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
762 assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
763 assert_eq!(handle.route_count().await.expect("route_count"), 1);
764
765 let auto_ids = handle
766 .auto_startup_route_ids()
767 .await
768 .expect("auto_startup_route_ids");
769 assert!(auto_ids.iter().any(|id| id == "h-1"));
770
771 let shutdown_ids = handle
772 .shutdown_route_ids()
773 .await
774 .expect("shutdown_route_ids");
775 assert!(shutdown_ids.iter().any(|id| id == "h-1"));
776
777 let compiled = handle
778 .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
779 .await
780 .expect("compile_route_definition");
781
782 assert!(
783 handle
784 .get_pipeline("h-1")
785 .await
786 .expect("get_pipeline")
787 .is_some()
788 );
789 handle
790 .swap_pipeline("h-1", compiled)
791 .await
792 .expect("swap_pipeline");
793
794 let _ = handle
795 .in_flight_count("h-1")
796 .await
797 .expect("in_flight_count");
798 let _ = handle.route_source_hash("h-1").await;
799
800 handle
801 .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
802 .await
803 .expect("set_error_handler");
804 handle
805 .set_tracer_config(TracerConfig::default())
806 .await
807 .expect("set_tracer_config");
808 handle
809 .set_runtime_handle(Arc::new(NoopRuntime))
810 .await
811 .expect("set_runtime_handle");
812
813 handle.remove_route("h-1").await.expect("remove_route");
814 assert_eq!(
815 handle
816 .route_count()
817 .await
818 .expect("route_count after remove"),
819 0
820 );
821 handle
822 .stop_all_routes()
823 .await
824 .expect("stop_all_routes on empty");
825
826 handle.shutdown().await.expect("shutdown send");
827 join_handle.await.expect("actor join");
828 }
829
830 #[tokio::test]
831 async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
832 let (handle, join_handle) = build_actor_with_components();
833 handle
834 .add_route(route_def("lc-1", "timer:tick?period=50"))
835 .await
836 .expect("add route lc-1");
837
838 handle.start_route("lc-1").await.expect("start_route");
839 sleep(Duration::from_millis(20)).await;
840
841 handle.restart_route("lc-1").await.expect("restart_route");
842 sleep(Duration::from_millis(20)).await;
843
844 handle.suspend_route("lc-1").await.expect("suspend_route");
845 handle.resume_route("lc-1").await.expect("resume_route");
846 sleep(Duration::from_millis(20)).await;
847
848 handle.stop_route("lc-1").await.expect("stop_route");
849 handle.start_all_routes().await.expect("start_all_routes");
850 sleep(Duration::from_millis(20)).await;
851 handle.stop_all_routes().await.expect("stop_all_routes");
852
853 handle
854 .start_route_reload("lc-1")
855 .await
856 .expect("start_route_reload");
857 handle
858 .stop_route_reload("lc-1")
859 .await
860 .expect("stop_route_reload");
861
862 handle.shutdown().await.expect("shutdown send");
863 join_handle.await.expect("actor join");
864 }
865
866 #[tokio::test]
867 async fn spawn_supervision_restarts_route_on_crash() {
868 let (handle, join_handle) = build_actor_with_components();
869 handle
870 .add_route(route_def("sup-1", "timer:tick?period=100"))
871 .await
872 .expect("add route sup-1");
873 handle
874 .start_route("sup-1")
875 .await
876 .expect("start_route sup-1");
877
878 let (crash_tx, crash_rx) = mpsc::channel(8);
879 let supervision = spawn_supervision_task(
880 handle.clone(),
881 SupervisionConfig {
882 initial_delay: Duration::from_millis(10),
883 max_attempts: Some(2),
884 ..SupervisionConfig::default()
885 },
886 None,
887 crash_rx,
888 );
889
890 crash_tx
891 .send(CrashNotification {
892 route_id: "sup-1".to_string(),
893 error: "simulated".to_string(),
894 })
895 .await
896 .expect("send crash notification");
897
898 sleep(Duration::from_millis(150)).await;
899 drop(crash_tx);
900 supervision.await.expect("supervision join");
901
902 handle.shutdown().await.expect("shutdown send");
903 join_handle.await.expect("actor join");
904 }
905
906 #[tokio::test]
907 async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
908 let (handle, join_handle) = build_actor_with_components();
909 handle
910 .add_route(route_def("sup-2", "timer:tick?period=100"))
911 .await
912 .expect("add route sup-2");
913 handle
914 .start_route("sup-2")
915 .await
916 .expect("start_route sup-2");
917
918 let (crash_tx, crash_rx) = mpsc::channel(8);
919 let supervision = spawn_supervision_task(
920 handle.clone(),
921 SupervisionConfig {
922 initial_delay: Duration::from_millis(10),
923 max_attempts: Some(1),
924 ..SupervisionConfig::default()
925 },
926 None,
927 crash_rx,
928 );
929
930 crash_tx
931 .send(CrashNotification {
932 route_id: "sup-2".to_string(),
933 error: "attempt-1".to_string(),
934 })
935 .await
936 .expect("send crash attempt-1");
937 crash_tx
938 .send(CrashNotification {
939 route_id: "sup-2".to_string(),
940 error: "attempt-2".to_string(),
941 })
942 .await
943 .expect("send crash attempt-2");
944
945 sleep(Duration::from_millis(200)).await;
946 drop(crash_tx);
947 supervision.await.expect("supervision join");
948
949 handle.shutdown().await.expect("shutdown send");
950 join_handle.await.expect("actor join");
951 }
952
953 #[tokio::test]
954 async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
955 let (handle, join_handle) = build_empty_actor();
956
957 handle
958 .try_set_runtime_handle(Arc::new(NoopRuntime))
959 .expect("try_set_runtime_handle should succeed");
960
961 handle.shutdown().await.expect("shutdown send");
962 join_handle.await.expect("actor join");
963 }
964
965 #[tokio::test]
966 async fn shutdown_returns_error_when_actor_stopped() {
967 let (tx, rx) = mpsc::channel(1);
968 drop(rx);
969
970 let handle = RouteControllerHandle { tx };
971 let result = handle.shutdown().await;
972
973 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
974 }
975}