1use std::sync::Arc;
2use std::time::Instant;
3
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::{
6 BoxProcessor, CamelError, MetricsCollector, RouteController, RuntimeHandle, SupervisionConfig,
7};
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinHandle;
10use tracing::{error, info};
11
12use super::route_controller::{CrashNotification, DefaultRouteController};
13use crate::lifecycle::application::route_definition::RouteDefinition;
14use crate::shared::observability::domain::TracerConfig;
15
16pub(crate) enum RouteControllerCommand {
17 StartRoute {
18 route_id: String,
19 reply: oneshot::Sender<Result<(), CamelError>>,
20 },
21 StopRoute {
22 route_id: String,
23 reply: oneshot::Sender<Result<(), CamelError>>,
24 },
25 RestartRoute {
26 route_id: String,
27 reply: oneshot::Sender<Result<(), CamelError>>,
28 },
29 SuspendRoute {
30 route_id: String,
31 reply: oneshot::Sender<Result<(), CamelError>>,
32 },
33 ResumeRoute {
34 route_id: String,
35 reply: oneshot::Sender<Result<(), CamelError>>,
36 },
37 StartAllRoutes {
38 reply: oneshot::Sender<Result<(), CamelError>>,
39 },
40 StopAllRoutes {
41 reply: oneshot::Sender<Result<(), CamelError>>,
42 },
43 AddRoute {
44 definition: RouteDefinition,
45 reply: oneshot::Sender<Result<(), CamelError>>,
46 },
47 RemoveRoute {
48 route_id: String,
49 reply: oneshot::Sender<Result<(), CamelError>>,
50 },
51 SwapPipeline {
52 route_id: String,
53 pipeline: BoxProcessor,
54 reply: oneshot::Sender<Result<(), CamelError>>,
55 },
56 CompileRouteDefinition {
57 definition: RouteDefinition,
58 reply: oneshot::Sender<Result<BoxProcessor, CamelError>>,
59 },
60 RouteFromUri {
61 route_id: String,
62 reply: oneshot::Sender<Option<String>>,
63 },
64 SetErrorHandler {
65 config: ErrorHandlerConfig,
66 },
67 SetTracerConfig {
68 config: TracerConfig,
69 },
70 RouteCount {
71 reply: oneshot::Sender<usize>,
72 },
73 InFlightCount {
74 route_id: String,
75 reply: oneshot::Sender<Option<u64>>,
76 },
77 RouteExists {
78 route_id: String,
79 reply: oneshot::Sender<bool>,
80 },
81 RouteIds {
82 reply: oneshot::Sender<Vec<String>>,
83 },
84 AutoStartupRouteIds {
85 reply: oneshot::Sender<Vec<String>>,
86 },
87 ShutdownRouteIds {
88 reply: oneshot::Sender<Vec<String>>,
89 },
90 GetPipeline {
91 route_id: String,
92 reply: oneshot::Sender<Option<BoxProcessor>>,
93 },
94 StartRouteReload {
95 route_id: String,
96 reply: oneshot::Sender<Result<(), CamelError>>,
97 },
98 StopRouteReload {
99 route_id: String,
100 reply: oneshot::Sender<Result<(), CamelError>>,
101 },
102 SetRuntimeHandle {
103 runtime: Arc<dyn RuntimeHandle>,
104 },
105 RouteSourceHash {
106 route_id: String,
107 reply: oneshot::Sender<Option<u64>>,
108 },
109 Shutdown,
110}
111
112#[derive(Clone)]
113pub struct RouteControllerHandle {
114 tx: mpsc::Sender<RouteControllerCommand>,
115}
116
117impl RouteControllerHandle {
118 pub async fn start_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
119 let (reply_tx, reply_rx) = oneshot::channel();
120 self.tx
121 .send(RouteControllerCommand::StartRoute {
122 route_id: route_id.into(),
123 reply: reply_tx,
124 })
125 .await
126 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
127 reply_rx
128 .await
129 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
130 }
131
132 pub async fn stop_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
133 let (reply_tx, reply_rx) = oneshot::channel();
134 self.tx
135 .send(RouteControllerCommand::StopRoute {
136 route_id: route_id.into(),
137 reply: reply_tx,
138 })
139 .await
140 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
141 reply_rx
142 .await
143 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
144 }
145
146 pub async fn restart_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
147 let (reply_tx, reply_rx) = oneshot::channel();
148 self.tx
149 .send(RouteControllerCommand::RestartRoute {
150 route_id: route_id.into(),
151 reply: reply_tx,
152 })
153 .await
154 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
155 reply_rx
156 .await
157 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
158 }
159
160 pub async fn suspend_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
161 let (reply_tx, reply_rx) = oneshot::channel();
162 self.tx
163 .send(RouteControllerCommand::SuspendRoute {
164 route_id: route_id.into(),
165 reply: reply_tx,
166 })
167 .await
168 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
169 reply_rx
170 .await
171 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
172 }
173
174 pub async fn resume_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
175 let (reply_tx, reply_rx) = oneshot::channel();
176 self.tx
177 .send(RouteControllerCommand::ResumeRoute {
178 route_id: route_id.into(),
179 reply: reply_tx,
180 })
181 .await
182 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
183 reply_rx
184 .await
185 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
186 }
187
188 pub async fn start_all_routes(&self) -> Result<(), CamelError> {
189 let (reply_tx, reply_rx) = oneshot::channel();
190 self.tx
191 .send(RouteControllerCommand::StartAllRoutes { reply: reply_tx })
192 .await
193 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
194 reply_rx
195 .await
196 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
197 }
198
199 pub async fn stop_all_routes(&self) -> Result<(), CamelError> {
200 let (reply_tx, reply_rx) = oneshot::channel();
201 self.tx
202 .send(RouteControllerCommand::StopAllRoutes { reply: reply_tx })
203 .await
204 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
205 reply_rx
206 .await
207 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
208 }
209
210 pub async fn add_route(&self, definition: RouteDefinition) -> Result<(), CamelError> {
211 let (reply_tx, reply_rx) = oneshot::channel();
212 self.tx
213 .send(RouteControllerCommand::AddRoute {
214 definition,
215 reply: reply_tx,
216 })
217 .await
218 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
219 reply_rx
220 .await
221 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
222 }
223
224 pub async fn remove_route(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
225 let (reply_tx, reply_rx) = oneshot::channel();
226 self.tx
227 .send(RouteControllerCommand::RemoveRoute {
228 route_id: route_id.into(),
229 reply: reply_tx,
230 })
231 .await
232 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
233 reply_rx
234 .await
235 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
236 }
237
238 pub async fn swap_pipeline(
239 &self,
240 route_id: impl Into<String>,
241 pipeline: BoxProcessor,
242 ) -> Result<(), CamelError> {
243 let (reply_tx, reply_rx) = oneshot::channel();
244 self.tx
245 .send(RouteControllerCommand::SwapPipeline {
246 route_id: route_id.into(),
247 pipeline,
248 reply: reply_tx,
249 })
250 .await
251 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
252 reply_rx
253 .await
254 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
255 }
256
257 pub async fn compile_route_definition(
258 &self,
259 definition: RouteDefinition,
260 ) -> Result<BoxProcessor, CamelError> {
261 let (reply_tx, reply_rx) = oneshot::channel();
262 self.tx
263 .send(RouteControllerCommand::CompileRouteDefinition {
264 definition,
265 reply: reply_tx,
266 })
267 .await
268 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
269 reply_rx
270 .await
271 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
272 }
273
274 pub async fn route_from_uri(
275 &self,
276 route_id: impl Into<String>,
277 ) -> Result<Option<String>, CamelError> {
278 let (reply_tx, reply_rx) = oneshot::channel();
279 self.tx
280 .send(RouteControllerCommand::RouteFromUri {
281 route_id: route_id.into(),
282 reply: reply_tx,
283 })
284 .await
285 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
286 reply_rx
287 .await
288 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
289 }
290
291 pub async fn route_count(&self) -> Result<usize, CamelError> {
292 let (reply_tx, reply_rx) = oneshot::channel();
293 self.tx
294 .send(RouteControllerCommand::RouteCount { reply: reply_tx })
295 .await
296 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
297 reply_rx
298 .await
299 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
300 }
301
302 pub async fn in_flight_count(
303 &self,
304 route_id: impl Into<String>,
305 ) -> Result<Option<u64>, CamelError> {
306 let (reply_tx, reply_rx) = oneshot::channel();
307 self.tx
308 .send(RouteControllerCommand::InFlightCount {
309 route_id: route_id.into(),
310 reply: reply_tx,
311 })
312 .await
313 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
314 reply_rx
315 .await
316 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
317 }
318
319 pub async fn route_exists(&self, route_id: impl Into<String>) -> Result<bool, CamelError> {
320 let (reply_tx, reply_rx) = oneshot::channel();
321 self.tx
322 .send(RouteControllerCommand::RouteExists {
323 route_id: route_id.into(),
324 reply: reply_tx,
325 })
326 .await
327 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
328 reply_rx
329 .await
330 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
331 }
332
333 pub async fn route_ids(&self) -> Result<Vec<String>, CamelError> {
334 let (reply_tx, reply_rx) = oneshot::channel();
335 self.tx
336 .send(RouteControllerCommand::RouteIds { reply: reply_tx })
337 .await
338 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
339 reply_rx
340 .await
341 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
342 }
343
344 pub async fn auto_startup_route_ids(&self) -> Result<Vec<String>, CamelError> {
345 let (reply_tx, reply_rx) = oneshot::channel();
346 self.tx
347 .send(RouteControllerCommand::AutoStartupRouteIds { reply: reply_tx })
348 .await
349 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
350 reply_rx
351 .await
352 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
353 }
354
355 pub async fn shutdown_route_ids(&self) -> Result<Vec<String>, CamelError> {
356 let (reply_tx, reply_rx) = oneshot::channel();
357 self.tx
358 .send(RouteControllerCommand::ShutdownRouteIds { reply: reply_tx })
359 .await
360 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
361 reply_rx
362 .await
363 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
364 }
365
366 pub async fn start_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
367 let (reply_tx, reply_rx) = oneshot::channel();
368 self.tx
369 .send(RouteControllerCommand::StartRouteReload {
370 route_id: route_id.into(),
371 reply: reply_tx,
372 })
373 .await
374 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
375 reply_rx
376 .await
377 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
378 }
379
380 pub async fn stop_route_reload(&self, route_id: impl Into<String>) -> Result<(), CamelError> {
381 let (reply_tx, reply_rx) = oneshot::channel();
382 self.tx
383 .send(RouteControllerCommand::StopRouteReload {
384 route_id: route_id.into(),
385 reply: reply_tx,
386 })
387 .await
388 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
389 reply_rx
390 .await
391 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))?
392 }
393
394 pub async fn get_pipeline(
395 &self,
396 route_id: impl Into<String>,
397 ) -> Result<Option<BoxProcessor>, CamelError> {
398 let (reply_tx, reply_rx) = oneshot::channel();
399 self.tx
400 .send(RouteControllerCommand::GetPipeline {
401 route_id: route_id.into(),
402 reply: reply_tx,
403 })
404 .await
405 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))?;
406 reply_rx
407 .await
408 .map_err(|_| CamelError::ProcessorError("controller actor dropped reply".into()))
409 }
410
411 pub async fn set_error_handler(&self, config: ErrorHandlerConfig) -> Result<(), CamelError> {
412 self.tx
413 .send(RouteControllerCommand::SetErrorHandler { config })
414 .await
415 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
416 }
417
418 pub async fn set_tracer_config(&self, config: TracerConfig) -> Result<(), CamelError> {
419 self.tx
420 .send(RouteControllerCommand::SetTracerConfig { config })
421 .await
422 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
423 }
424
425 pub async fn set_runtime_handle(
426 &self,
427 runtime: Arc<dyn RuntimeHandle>,
428 ) -> Result<(), CamelError> {
429 self.tx
430 .send(RouteControllerCommand::SetRuntimeHandle { runtime })
431 .await
432 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
433 }
434
435 pub fn try_set_runtime_handle(
436 &self,
437 runtime: Arc<dyn RuntimeHandle>,
438 ) -> Result<(), CamelError> {
439 self.tx
440 .try_send(RouteControllerCommand::SetRuntimeHandle { runtime })
441 .map_err(|err| {
442 CamelError::ProcessorError(format!("controller actor mailbox full: {err}"))
443 })
444 }
445
446 pub async fn route_source_hash(&self, route_id: impl Into<String>) -> Option<u64> {
447 let (reply_tx, reply_rx) = oneshot::channel();
448 self.tx
449 .send(RouteControllerCommand::RouteSourceHash {
450 route_id: route_id.into(),
451 reply: reply_tx,
452 })
453 .await
454 .ok()?;
455 reply_rx.await.ok()?
456 }
457
458 pub async fn shutdown(&self) -> Result<(), CamelError> {
459 self.tx
460 .send(RouteControllerCommand::Shutdown)
461 .await
462 .map_err(|_| CamelError::ProcessorError("controller actor stopped".into()))
463 }
464}
465
466pub fn spawn_controller_actor(
467 controller: DefaultRouteController,
468) -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
469 let (tx, mut rx) = mpsc::channel::<RouteControllerCommand>(256);
470 let handle = tokio::spawn(async move {
471 let mut controller = controller;
472 while let Some(cmd) = rx.recv().await {
473 match cmd {
474 RouteControllerCommand::StartRoute { route_id, reply } => {
475 let _ = reply.send(controller.start_route(&route_id).await);
476 }
477 RouteControllerCommand::StopRoute { route_id, reply } => {
478 let _ = reply.send(controller.stop_route(&route_id).await);
479 }
480 RouteControllerCommand::RestartRoute { route_id, reply } => {
481 let _ = reply.send(controller.restart_route(&route_id).await);
482 }
483 RouteControllerCommand::SuspendRoute { route_id, reply } => {
484 let _ = reply.send(controller.suspend_route(&route_id).await);
485 }
486 RouteControllerCommand::ResumeRoute { route_id, reply } => {
487 let _ = reply.send(controller.resume_route(&route_id).await);
488 }
489 RouteControllerCommand::StartAllRoutes { reply } => {
490 let _ = reply.send(controller.start_all_routes().await);
491 }
492 RouteControllerCommand::StopAllRoutes { reply } => {
493 let _ = reply.send(controller.stop_all_routes().await);
494 }
495 RouteControllerCommand::AddRoute { definition, reply } => {
496 let _ = reply.send(controller.add_route(definition));
497 }
498 RouteControllerCommand::RemoveRoute { route_id, reply } => {
499 let _ = reply.send(controller.remove_route(&route_id));
500 }
501 RouteControllerCommand::SwapPipeline {
502 route_id,
503 pipeline,
504 reply,
505 } => {
506 let _ = reply.send(controller.swap_pipeline(&route_id, pipeline));
507 }
508 RouteControllerCommand::CompileRouteDefinition { definition, reply } => {
509 let _ = reply.send(controller.compile_route_definition(definition));
510 }
511 RouteControllerCommand::RouteFromUri { route_id, reply } => {
512 let _ = reply.send(controller.route_from_uri(&route_id));
513 }
514 RouteControllerCommand::SetErrorHandler { config } => {
515 controller.set_error_handler(config);
516 }
517 RouteControllerCommand::SetTracerConfig { config } => {
518 controller.set_tracer_config(&config);
519 }
520 RouteControllerCommand::RouteCount { reply } => {
521 let _ = reply.send(controller.route_count());
522 }
523 RouteControllerCommand::InFlightCount { route_id, reply } => {
524 let _ = reply.send(controller.in_flight_count(&route_id));
525 }
526 RouteControllerCommand::RouteExists { route_id, reply } => {
527 let _ = reply.send(controller.route_exists(&route_id));
528 }
529 RouteControllerCommand::RouteIds { reply } => {
530 let _ = reply.send(controller.route_ids());
531 }
532 RouteControllerCommand::AutoStartupRouteIds { reply } => {
533 let _ = reply.send(controller.auto_startup_route_ids());
534 }
535 RouteControllerCommand::ShutdownRouteIds { reply } => {
536 let _ = reply.send(controller.shutdown_route_ids());
537 }
538 RouteControllerCommand::GetPipeline { route_id, reply } => {
539 let _ = reply.send(controller.get_pipeline(&route_id));
540 }
541 RouteControllerCommand::StartRouteReload { route_id, reply } => {
542 let _ = reply.send(controller.start_route_reload(&route_id).await);
543 }
544 RouteControllerCommand::StopRouteReload { route_id, reply } => {
545 let _ = reply.send(controller.stop_route_reload(&route_id).await);
546 }
547 RouteControllerCommand::SetRuntimeHandle { runtime } => {
548 controller.set_runtime_handle(runtime);
549 }
550 RouteControllerCommand::RouteSourceHash { route_id, reply } => {
551 let _ = reply.send(controller.route_source_hash(&route_id));
552 }
553 RouteControllerCommand::Shutdown => {
554 break;
555 }
556 }
557 }
558 });
559 (RouteControllerHandle { tx }, handle)
560}
561
562pub fn spawn_supervision_task(
563 controller: RouteControllerHandle,
564 config: SupervisionConfig,
565 _metrics: Option<Arc<dyn MetricsCollector>>,
566 mut crash_rx: mpsc::Receiver<CrashNotification>,
567) -> JoinHandle<()> {
568 tokio::spawn(async move {
569 let mut attempts: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
570 let mut last_restart_time: std::collections::HashMap<String, Instant> =
571 std::collections::HashMap::new();
572 let mut currently_restarting: std::collections::HashSet<String> =
573 std::collections::HashSet::new();
574
575 info!("Supervision loop started");
576
577 while let Some(notification) = crash_rx.recv().await {
578 let route_id = notification.route_id;
579 if currently_restarting.contains(&route_id) {
580 continue;
581 }
582
583 if let Some(last_time) = last_restart_time.get(&route_id)
584 && last_time.elapsed() >= config.initial_delay
585 {
586 attempts.insert(route_id.clone(), 0);
587 }
588
589 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
590 *current_attempt += 1;
591
592 if config
593 .max_attempts
594 .is_some_and(|max| *current_attempt > max)
595 {
596 error!(
597 route_id = %route_id,
598 attempts = *current_attempt,
599 "Route exceeded max restart attempts, giving up"
600 );
601 continue;
602 }
603
604 let delay = config.next_delay(*current_attempt);
605 currently_restarting.insert(route_id.clone());
606 tokio::time::sleep(delay).await;
607
608 match controller.restart_route(route_id.clone()).await {
609 Ok(()) => {
610 info!(route_id = %route_id, "Route restarted successfully");
611 last_restart_time.insert(route_id.clone(), Instant::now());
612 }
613 Err(err) => {
614 error!(route_id = %route_id, error = %err, "Failed to restart route");
615 }
616 }
617
618 currently_restarting.remove(&route_id);
619 }
620
621 info!("Supervision loop ended");
622 })
623}
624
625#[cfg(test)]
626mod tests {
627 use super::{
628 RouteControllerCommand, RouteControllerHandle, spawn_controller_actor,
629 spawn_supervision_task,
630 };
631 use crate::lifecycle::adapters::route_controller::{CrashNotification, DefaultRouteController};
632 use crate::lifecycle::application::route_definition::RouteDefinition;
633 use crate::shared::components::domain::Registry;
634 use crate::shared::observability::domain::TracerConfig;
635 use camel_api::{
636 CamelError, ErrorHandlerConfig, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult,
637 RuntimeQuery, RuntimeQueryBus, RuntimeQueryResult, SupervisionConfig,
638 };
639 use std::sync::Arc;
640 use std::time::Duration;
641 use tokio::sync::mpsc;
642 use tokio::time::sleep;
643
644 fn build_actor_with_components() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
645 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
646 {
647 let mut guard = registry.lock().expect("lock");
648 guard.register(std::sync::Arc::new(
649 camel_component_timer::TimerComponent::new(),
650 ));
651 guard.register(std::sync::Arc::new(
652 camel_component_mock::MockComponent::new(),
653 ));
654 }
655 let controller = DefaultRouteController::new(Arc::clone(®istry));
656 spawn_controller_actor(controller)
657 }
658
659 fn build_empty_actor() -> (RouteControllerHandle, tokio::task::JoinHandle<()>) {
660 let controller =
661 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
662 spawn_controller_actor(controller)
663 }
664
665 fn route_def(route_id: &str, from_uri: &str) -> RouteDefinition {
666 RouteDefinition::new(from_uri, vec![]).with_route_id(route_id)
667 }
668
669 struct NoopRuntime;
670
671 #[async_trait::async_trait]
672 impl RuntimeCommandBus for NoopRuntime {
673 async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
674 Ok(RuntimeCommandResult::Accepted)
675 }
676 }
677
678 #[async_trait::async_trait]
679 impl RuntimeQueryBus for NoopRuntime {
680 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
681 Ok(match query {
682 RuntimeQuery::GetRouteStatus { route_id }
683 | RuntimeQuery::InFlightCount { route_id } => {
684 RuntimeQueryResult::RouteNotFound { route_id }
685 }
686 RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
687 route_ids: Vec::new(),
688 },
689 })
690 }
691 }
692
693 #[tokio::test]
694 async fn start_route_sends_command_and_returns_reply() {
695 let (tx, mut rx) = mpsc::channel(1);
696 let handle = RouteControllerHandle { tx };
697
698 let task = tokio::spawn(async move { handle.start_route("route-a").await });
699
700 let command = rx.recv().await.expect("command should be received");
701 match command {
702 RouteControllerCommand::StartRoute { route_id, reply } => {
703 assert_eq!(route_id, "route-a");
704 let _ = reply.send(Ok(()));
705 }
706 _ => panic!("unexpected command variant"),
707 }
708
709 let result = task.await.expect("join should succeed");
710 assert!(result.is_ok());
711 }
712
713 #[tokio::test]
714 async fn start_route_returns_error_when_actor_stops() {
715 let (tx, rx) = mpsc::channel(1);
716 drop(rx);
717
718 let handle = RouteControllerHandle { tx };
719 let result = handle.start_route("route-a").await;
720
721 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
722 }
723
724 #[tokio::test]
725 async fn spawn_controller_actor_processes_commands_and_shutdown() {
726 let controller =
727 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
728 let (handle, join_handle) = spawn_controller_actor(controller);
729
730 assert_eq!(handle.route_count().await.expect("route_count"), 0);
731 assert_eq!(
732 handle.route_ids().await.expect("route_ids"),
733 Vec::<String>::new()
734 );
735
736 handle.shutdown().await.expect("shutdown send");
737 join_handle.await.expect("actor join");
738 }
739
740 #[tokio::test]
741 async fn actor_handle_introspection_and_mutation_commands() {
742 let (handle, join_handle) = build_actor_with_components();
743 let definition = route_def("h-1", "timer:tick?period=100");
744
745 handle.add_route(definition).await.expect("add route");
746 assert!(handle.route_exists("h-1").await.expect("route exists h-1"));
747 assert!(
748 !handle
749 .route_exists("no-such")
750 .await
751 .expect("route exists no-such")
752 );
753
754 let from_uri = handle.route_from_uri("h-1").await.expect("route_from_uri");
755 assert_eq!(from_uri.as_deref(), Some("timer:tick?period=100"));
756 assert_eq!(handle.route_count().await.expect("route_count"), 1);
757
758 let auto_ids = handle
759 .auto_startup_route_ids()
760 .await
761 .expect("auto_startup_route_ids");
762 assert!(auto_ids.iter().any(|id| id == "h-1"));
763
764 let shutdown_ids = handle
765 .shutdown_route_ids()
766 .await
767 .expect("shutdown_route_ids");
768 assert!(shutdown_ids.iter().any(|id| id == "h-1"));
769
770 let compiled = handle
771 .compile_route_definition(route_def("h-1", "timer:tick?period=100"))
772 .await
773 .expect("compile_route_definition");
774
775 assert!(
776 handle
777 .get_pipeline("h-1")
778 .await
779 .expect("get_pipeline")
780 .is_some()
781 );
782 handle
783 .swap_pipeline("h-1", compiled)
784 .await
785 .expect("swap_pipeline");
786
787 let _ = handle
788 .in_flight_count("h-1")
789 .await
790 .expect("in_flight_count");
791 let _ = handle.route_source_hash("h-1").await;
792
793 handle
794 .set_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlq"))
795 .await
796 .expect("set_error_handler");
797 handle
798 .set_tracer_config(TracerConfig::default())
799 .await
800 .expect("set_tracer_config");
801 handle
802 .set_runtime_handle(Arc::new(NoopRuntime))
803 .await
804 .expect("set_runtime_handle");
805
806 handle.remove_route("h-1").await.expect("remove_route");
807 assert_eq!(
808 handle
809 .route_count()
810 .await
811 .expect("route_count after remove"),
812 0
813 );
814 handle
815 .stop_all_routes()
816 .await
817 .expect("stop_all_routes on empty");
818
819 handle.shutdown().await.expect("shutdown send");
820 join_handle.await.expect("actor join");
821 }
822
823 #[tokio::test]
824 async fn actor_handle_lifecycle_start_stop_restart_suspend_resume() {
825 let (handle, join_handle) = build_actor_with_components();
826 handle
827 .add_route(route_def("lc-1", "timer:tick?period=50"))
828 .await
829 .expect("add route lc-1");
830
831 handle.start_route("lc-1").await.expect("start_route");
832 sleep(Duration::from_millis(20)).await;
833
834 handle.restart_route("lc-1").await.expect("restart_route");
835 sleep(Duration::from_millis(20)).await;
836
837 handle.suspend_route("lc-1").await.expect("suspend_route");
838 handle.resume_route("lc-1").await.expect("resume_route");
839 sleep(Duration::from_millis(20)).await;
840
841 handle.stop_route("lc-1").await.expect("stop_route");
842 handle.start_all_routes().await.expect("start_all_routes");
843 sleep(Duration::from_millis(20)).await;
844 handle.stop_all_routes().await.expect("stop_all_routes");
845
846 handle
847 .start_route_reload("lc-1")
848 .await
849 .expect("start_route_reload");
850 handle
851 .stop_route_reload("lc-1")
852 .await
853 .expect("stop_route_reload");
854
855 handle.shutdown().await.expect("shutdown send");
856 join_handle.await.expect("actor join");
857 }
858
859 #[tokio::test]
860 async fn spawn_supervision_restarts_route_on_crash() {
861 let (handle, join_handle) = build_actor_with_components();
862 handle
863 .add_route(route_def("sup-1", "timer:tick?period=100"))
864 .await
865 .expect("add route sup-1");
866 handle
867 .start_route("sup-1")
868 .await
869 .expect("start_route sup-1");
870
871 let (crash_tx, crash_rx) = mpsc::channel(8);
872 let supervision = spawn_supervision_task(
873 handle.clone(),
874 SupervisionConfig {
875 initial_delay: Duration::from_millis(10),
876 max_attempts: Some(2),
877 ..SupervisionConfig::default()
878 },
879 None,
880 crash_rx,
881 );
882
883 crash_tx
884 .send(CrashNotification {
885 route_id: "sup-1".to_string(),
886 error: "simulated".to_string(),
887 })
888 .await
889 .expect("send crash notification");
890
891 sleep(Duration::from_millis(150)).await;
892 drop(crash_tx);
893 supervision.await.expect("supervision join");
894
895 handle.shutdown().await.expect("shutdown send");
896 join_handle.await.expect("actor join");
897 }
898
899 #[tokio::test]
900 async fn supervision_skips_duplicate_and_gives_up_after_max_attempts() {
901 let (handle, join_handle) = build_actor_with_components();
902 handle
903 .add_route(route_def("sup-2", "timer:tick?period=100"))
904 .await
905 .expect("add route sup-2");
906 handle
907 .start_route("sup-2")
908 .await
909 .expect("start_route sup-2");
910
911 let (crash_tx, crash_rx) = mpsc::channel(8);
912 let supervision = spawn_supervision_task(
913 handle.clone(),
914 SupervisionConfig {
915 initial_delay: Duration::from_millis(10),
916 max_attempts: Some(1),
917 ..SupervisionConfig::default()
918 },
919 None,
920 crash_rx,
921 );
922
923 crash_tx
924 .send(CrashNotification {
925 route_id: "sup-2".to_string(),
926 error: "attempt-1".to_string(),
927 })
928 .await
929 .expect("send crash attempt-1");
930 crash_tx
931 .send(CrashNotification {
932 route_id: "sup-2".to_string(),
933 error: "attempt-2".to_string(),
934 })
935 .await
936 .expect("send crash attempt-2");
937
938 sleep(Duration::from_millis(200)).await;
939 drop(crash_tx);
940 supervision.await.expect("supervision join");
941
942 handle.shutdown().await.expect("shutdown send");
943 join_handle.await.expect("actor join");
944 }
945
946 #[tokio::test]
947 async fn try_set_runtime_handle_succeeds_on_fresh_actor() {
948 let (handle, join_handle) = build_empty_actor();
949
950 handle
951 .try_set_runtime_handle(Arc::new(NoopRuntime))
952 .expect("try_set_runtime_handle should succeed");
953
954 handle.shutdown().await.expect("shutdown send");
955 join_handle.await.expect("actor join");
956 }
957
958 #[tokio::test]
959 async fn shutdown_returns_error_when_actor_stopped() {
960 let (tx, rx) = mpsc::channel(1);
961 drop(rx);
962
963 let handle = RouteControllerHandle { tx };
964 let result = handle.shutdown().await;
965
966 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
967 }
968}