camel_core/lifecycle/adapters/
route_controller_trait.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12use tracing::{error, info, warn};
13
14use camel_api::{CamelError, NoOpMetrics};
15use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
16
17use crate::lifecycle::adapters::consumer_management;
18use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
19use crate::lifecycle::adapters::route_controller::DefaultRouteController;
20#[cfg(test)]
21use crate::lifecycle::adapters::route_helpers::emit_start_route_event;
22use crate::lifecycle::adapters::route_helpers::{
23 handle_is_running, inferred_lifecycle_label, ready_with_backoff,
24};
25use crate::lifecycle::adapters::route_registry::DEFAULT_SHUTDOWN_TIMEOUT;
26
27#[async_trait::async_trait]
28impl camel_api::RouteController for DefaultRouteController {
29 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
30 {
32 let managed = self
33 .routes
34 .get_mut(route_id)
35 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
36
37 let consumer_running = handle_is_running(&managed.consumer_handle);
38 let pipeline_running = handle_is_running(&managed.pipeline_handle);
39 if consumer_running && pipeline_running {
40 return Ok(());
41 }
42 if !consumer_running && pipeline_running {
43 return Err(CamelError::RouteError(format!(
44 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
45 route_id
46 )));
47 }
48 if consumer_running && !pipeline_running {
49 return Err(CamelError::RouteError(format!(
50 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
51 route_id
52 )));
53 }
54 }
55
56 info!(route_id = %route_id, "Starting route");
57
58 let (from_uri, pipeline, concurrency) = {
60 let managed = self
61 .routes
62 .get(route_id)
63 .expect("invariant: route must exist after prior existence check"); (
65 managed.from_uri.clone(),
66 Arc::clone(&managed.pipeline),
67 managed.concurrency.clone(),
68 )
69 };
70
71 let crash_notifier = self.crash_notifier.clone();
73 let runtime_for_consumer = self.runtime.clone();
74
75 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
76 Arc::clone(&self.registry),
77 Arc::clone(&self.languages),
78 self.tracer_metrics
79 .clone()
80 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
81 Arc::clone(&self.platform_service),
82 self.health_registry(),
83 Some(route_id.to_string()),
84 ));
85 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
86 Arc::clone(&consumer_component_ctx) as Arc<_>;
87 let (mut consumer, consumer_concurrency) = consumer_management::create_route_consumer(
88 consumer_rt,
89 &self.registry,
90 &from_uri,
91 consumer_component_ctx.as_ref(),
92 )?;
93
94 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
96
97 let managed = self
99 .routes
100 .get_mut(route_id)
101 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
105 managed.compiled.security_policy.as_ref(),
106 managed.compiled.security_authenticator.as_ref(),
107 ) {
108 use camel_component_api::SecurityContext;
109 let sec_ctx =
110 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
111 consumer.set_security_context(sec_ctx);
112 }
113
114 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
116 let consumer_cancel = managed.consumer_cancel_token.child_token();
118 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
119 let tx_for_storage = tx.clone();
121 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone(), route_id.to_string());
122
123 let split_clone = managed.aggregate_split.clone();
125 if let Some(split) = split_clone {
126 return self
127 .start_aggregate_route(
128 route_id,
129 split,
130 consumer,
131 consumer_ctx,
132 rx,
133 crash_notifier,
134 runtime_for_consumer,
135 tx_for_storage,
136 pipeline_cancel,
137 )
138 .await;
139 }
140 let pipeline_handle = match effective_concurrency {
144 ConcurrencyModel::Sequential => {
145 tokio::spawn(async move {
146 loop {
147 let envelope = tokio::select! {
149 envelope = rx.recv() => match envelope {
150 Some(e) => e,
151 None => return, },
153 _ = pipeline_cancel.cancelled() => {
154 return;
156 }
157 };
158 let ExchangeEnvelope { exchange, reply_tx } = envelope;
159
160 let mut pipeline = pipeline.load().clone_inner();
162
163 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
164 if let Some(tx) = reply_tx {
165 let _ = tx.send(Err(e));
166 }
167 return;
168 }
169
170 let result = pipeline.call(exchange).await;
171 if let Some(tx) = reply_tx {
172 let _ = tx.send(result);
173 } else if let Err(ref e) = result
174 && !matches!(e, CamelError::Stopped)
175 {
176 error!("Pipeline error: {e}");
178 }
179 }
180 })
181 }
182 ConcurrencyModel::Concurrent { max } => {
183 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
184 tokio::spawn(async move {
185 loop {
186 let envelope = tokio::select! {
188 envelope = rx.recv() => match envelope {
189 Some(e) => e,
190 None => return, },
192 _ = pipeline_cancel.cancelled() => {
193 return;
195 }
196 };
197 let ExchangeEnvelope { exchange, reply_tx } = envelope;
198 let pipe_ref = Arc::clone(&pipeline);
199 let sem = sem.clone();
200 let cancel = pipeline_cancel.clone();
201 tokio::spawn(async move {
202 let _permit = match &sem {
204 Some(s) => Some(s.acquire().await.expect("semaphore closed")), None => None,
206 };
207
208 let mut pipe = pipe_ref.load().clone_inner();
210
211 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
213 if let Some(tx) = reply_tx {
214 let _ = tx.send(Err(e));
215 }
216 return;
217 }
218
219 let result = pipe.call(exchange).await;
220 if let Some(tx) = reply_tx {
221 let _ = tx.send(result);
222 } else if let Err(ref e) = result
223 && !matches!(e, CamelError::Stopped)
224 {
225 error!("Pipeline error: {e}");
227 }
228 });
229 }
230 })
231 }
232 };
233 #[cfg(test)]
234 emit_start_route_event("pipeline_spawned");
235
236 let consumer_handle = consumer_management::spawn_consumer_task(
239 route_id.to_string(),
240 consumer,
241 consumer_ctx,
242 crash_notifier,
243 runtime_for_consumer,
244 false,
245 );
246 #[cfg(test)]
247 emit_start_route_event("consumer_spawned");
248
249 let managed = self
251 .routes
252 .get_mut(route_id)
253 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
255 managed.pipeline_handle = Some(pipeline_handle);
256 managed.channel_sender = Some(tx_for_storage);
257
258 info!(route_id = %route_id, "Route started");
259 self.health_registry().mark_route_started(route_id);
260 Ok(())
261 }
262
263 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
264 self.stop_route_internal(route_id).await?;
265 self.health_registry().mark_route_stopped(route_id);
266 Ok(())
267 }
268
269 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
270 self.stop_route(route_id).await?;
271 tokio::time::sleep(Duration::from_millis(100)).await;
272 self.start_route(route_id).await
273 }
274
275 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
276 let managed = self
278 .routes
279 .get_mut(route_id)
280 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
281
282 let consumer_running = handle_is_running(&managed.consumer_handle);
283 let pipeline_running = handle_is_running(&managed.pipeline_handle);
284
285 if !consumer_running || !pipeline_running {
287 return Err(CamelError::RouteError(format!(
288 "Cannot suspend route '{}' with execution lifecycle {}",
289 route_id,
290 inferred_lifecycle_label(managed)
291 )));
292 }
293
294 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
295
296 let managed = self
298 .routes
299 .get_mut(route_id)
300 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token.cancel();
302
303 let managed = self
305 .routes
306 .get_mut(route_id)
307 .expect("invariant: route must exist after prior existence check"); let consumer_handle = managed.consumer_handle.take();
309
310 let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
312 if let Some(handle) = consumer_handle {
313 let _ = handle.await;
314 }
315 })
316 .await;
317
318 if timeout_result.is_err() {
319 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
320 }
321
322 let managed = self
324 .routes
325 .get_mut(route_id)
326 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token = CancellationToken::new();
330
331 info!(route_id = %route_id, "Route suspended (pipeline still running)");
332 self.health_registry().mark_route_stopped(route_id);
333 Ok(())
334 }
335
336 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
337 let managed = self
339 .routes
340 .get(route_id)
341 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
342
343 let consumer_running = handle_is_running(&managed.consumer_handle);
344 let pipeline_running = handle_is_running(&managed.pipeline_handle);
345 if consumer_running || !pipeline_running {
346 return Err(CamelError::RouteError(format!(
347 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
348 route_id,
349 inferred_lifecycle_label(managed)
350 )));
351 }
352
353 let sender = managed.channel_sender.clone().ok_or_else(|| {
355 CamelError::RouteError("Suspended route has no channel sender".into())
356 })?;
357
358 let from_uri = managed.from_uri.clone();
360
361 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
362
363 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
364 Arc::clone(&self.registry),
365 Arc::clone(&self.languages),
366 self.tracer_metrics
367 .clone()
368 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
369 Arc::clone(&self.platform_service),
370 self.health_registry(),
371 Some(route_id.to_string()),
372 ));
373 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
374 Arc::clone(&consumer_component_ctx) as Arc<_>;
375 let (mut consumer, _) = consumer_management::create_route_consumer(
376 consumer_rt,
377 &self.registry,
378 &from_uri,
379 consumer_component_ctx.as_ref(),
380 )?;
381
382 let managed = self
384 .routes
385 .get(route_id)
386 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
388 managed.compiled.security_policy.as_ref(),
389 managed.compiled.security_authenticator.as_ref(),
390 ) {
391 use camel_component_api::SecurityContext;
392 let sec_ctx =
393 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
394 consumer.set_security_context(sec_ctx);
395 }
396
397 let managed = self
399 .routes
400 .get_mut(route_id)
401 .expect("invariant: route must exist after prior existence check"); let consumer_cancel = managed.consumer_cancel_token.child_token();
405
406 let crash_notifier = self.crash_notifier.clone();
407 let runtime_for_consumer = self.runtime.clone();
408
409 let consumer_ctx =
411 ConsumerContext::new(sender, consumer_cancel.clone(), route_id.to_string());
412
413 let consumer_handle = consumer_management::spawn_consumer_task(
415 route_id.to_string(),
416 consumer,
417 consumer_ctx,
418 crash_notifier,
419 runtime_for_consumer,
420 true,
421 );
422
423 let managed = self
425 .routes
426 .get_mut(route_id)
427 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
429
430 info!(route_id = %route_id, "Route resumed");
431 self.health_registry().mark_route_started(route_id);
432 Ok(())
433 }
434
435 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
436 let route_ids: Vec<String> = {
439 let pairs = self.routes.auto_startup_sorted();
440 pairs.into_iter().map(|(id, _)| id).collect()
441 };
442
443 info!("Starting {} auto-startup routes", route_ids.len());
444
445 let mut errors: Vec<String> = Vec::new();
447 for route_id in route_ids {
448 if let Err(e) = self.start_route(&route_id).await {
449 errors.push(format!("Route '{}': {}", route_id, e));
450 }
451 }
452
453 if !errors.is_empty() {
454 return Err(CamelError::RouteError(format!(
455 "Failed to start routes: {}",
456 errors.join(", ")
457 )));
458 }
459
460 info!("All auto-startup routes started");
461 Ok(())
462 }
463
464 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
465 let route_ids: Vec<String> = {
467 let pairs = self.routes.shutdown_sorted();
468 pairs.into_iter().map(|(id, _)| id).collect()
469 };
470
471 info!("Stopping {} routes", route_ids.len());
472
473 for route_id in route_ids {
474 let _ = self.stop_route(&route_id).await;
475 }
476
477 info!("All routes stopped");
478 Ok(())
479 }
480}