camel_core/lifecycle/adapters/
route_controller_trait.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12use tracing::{error, info, warn};
13
14use camel_api::{CamelError, NoOpMetrics};
15use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
16
17use crate::lifecycle::adapters::consumer_management;
18use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
19use crate::lifecycle::adapters::route_controller::DefaultRouteController;
20#[cfg(test)]
21use crate::lifecycle::adapters::route_helpers::emit_start_route_event;
22use crate::lifecycle::adapters::route_helpers::{
23 handle_is_running, inferred_lifecycle_label, ready_with_backoff,
24};
25use crate::lifecycle::adapters::route_registry::DEFAULT_SHUTDOWN_TIMEOUT;
26
27#[async_trait::async_trait]
28impl camel_api::RouteController for DefaultRouteController {
29 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
30 {
32 let managed = self
33 .routes
34 .get_mut(route_id)
35 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
36
37 let consumer_running = handle_is_running(&managed.consumer_handle);
38 let pipeline_running = handle_is_running(&managed.pipeline_handle);
39 if consumer_running && pipeline_running {
40 return Ok(());
41 }
42 if !consumer_running && pipeline_running {
43 return Err(CamelError::RouteError(format!(
44 "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
45 route_id
46 )));
47 }
48 if consumer_running && !pipeline_running {
49 return Err(CamelError::RouteError(format!(
50 "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
51 route_id
52 )));
53 }
54 }
55
56 info!(route_id = %route_id, "Starting route");
57
58 let (from_uri, pipeline, concurrency) = {
60 let managed = self
61 .routes
62 .get(route_id)
63 .expect("invariant: route must exist after prior existence check"); (
65 managed.from_uri.clone(),
66 Arc::clone(&managed.pipeline),
67 managed.concurrency.clone(),
68 )
69 };
70
71 let crash_notifier = self.crash_notifier.clone();
73 let runtime_for_consumer = self.runtime.clone();
74
75 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
76 Arc::clone(&self.registry),
77 Arc::clone(&self.languages),
78 self.tracer_metrics
79 .clone()
80 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
81 Arc::clone(&self.platform_service),
82 self.health_registry(),
83 Some(route_id.to_string()),
84 ));
85 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
86 Arc::clone(&consumer_component_ctx) as Arc<_>;
87 let (mut consumer, consumer_concurrency) = consumer_management::create_route_consumer(
88 consumer_rt,
89 &self.registry,
90 &from_uri,
91 consumer_component_ctx.as_ref(),
92 )?;
93
94 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
96
97 let managed = self
99 .routes
100 .get_mut(route_id)
101 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
105 managed.compiled.security_policy.as_ref(),
106 managed.compiled.security_authenticator.as_ref(),
107 ) {
108 use camel_component_api::SecurityContext;
109 let sec_ctx =
110 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
111 consumer.set_security_context(sec_ctx);
112 }
113
114 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
116 let consumer_cancel = managed.consumer_cancel_token.child_token();
118 let pipeline_cancel = managed.pipeline_cancel_token.child_token();
119 let tx_for_storage = tx.clone();
121 let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone(), route_id.to_string());
122
123 let split_clone = managed.aggregate_split.clone();
125 if let Some(split) = split_clone {
126 return self
127 .start_aggregate_route(
128 route_id,
129 split,
130 consumer,
131 consumer_ctx,
132 rx,
133 crash_notifier,
134 runtime_for_consumer,
135 tx_for_storage,
136 pipeline_cancel,
137 )
138 .await;
139 }
140 let pipeline_handle = match effective_concurrency {
144 ConcurrencyModel::Sequential => {
145 tokio::spawn(async move {
146 loop {
147 let envelope = tokio::select! {
149 envelope = rx.recv() => match envelope {
150 Some(e) => e,
151 None => return, },
153 _ = pipeline_cancel.cancelled() => {
154 return;
156 }
157 };
158 let ExchangeEnvelope { exchange, reply_tx } = envelope;
159
160 let mut pipeline = pipeline.load().clone_inner();
162
163 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
164 if let Some(tx) = reply_tx {
165 let _ = tx.send(Err(e));
166 }
167 return;
168 }
169
170 let result = pipeline.call(exchange).await;
171 if let Some(tx) = reply_tx {
172 let _ = tx.send(result);
173 } else if let Err(ref e) = result {
174 error!("Pipeline error: {e}");
176 }
177 }
178 })
179 }
180 ConcurrencyModel::Concurrent { max } => {
181 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
182 tokio::spawn(async move {
183 loop {
184 let envelope = tokio::select! {
186 envelope = rx.recv() => match envelope {
187 Some(e) => e,
188 None => return, },
190 _ = pipeline_cancel.cancelled() => {
191 return;
193 }
194 };
195 let ExchangeEnvelope { exchange, reply_tx } = envelope;
196 let pipe_ref = Arc::clone(&pipeline);
197 let sem = sem.clone();
198 let cancel = pipeline_cancel.clone();
199 tokio::spawn(async move {
200 let _permit = match &sem {
202 Some(s) => Some(s.acquire().await.expect("semaphore closed")), None => None,
204 };
205
206 let mut pipe = pipe_ref.load().clone_inner();
208
209 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
211 if let Some(tx) = reply_tx {
212 let _ = tx.send(Err(e));
213 }
214 return;
215 }
216
217 let result = pipe.call(exchange).await;
218 if let Some(tx) = reply_tx {
219 let _ = tx.send(result);
220 } else if let Err(ref e) = result {
221 error!("Pipeline error: {e}");
223 }
224 });
225 }
226 })
227 }
228 };
229 #[cfg(test)]
230 emit_start_route_event("pipeline_spawned");
231
232 let consumer_handle = consumer_management::spawn_consumer_task(
235 route_id.to_string(),
236 consumer,
237 consumer_ctx,
238 crash_notifier,
239 runtime_for_consumer,
240 false,
241 );
242 #[cfg(test)]
243 emit_start_route_event("consumer_spawned");
244
245 let managed = self
247 .routes
248 .get_mut(route_id)
249 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
251 managed.pipeline_handle = Some(pipeline_handle);
252 managed.channel_sender = Some(tx_for_storage);
253
254 info!(route_id = %route_id, "Route started");
255 self.health_registry().mark_route_started(route_id);
256 Ok(())
257 }
258
259 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
260 self.stop_route_internal(route_id).await?;
261 self.health_registry().mark_route_stopped(route_id);
262 Ok(())
263 }
264
265 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
266 self.stop_route(route_id).await?;
267 tokio::time::sleep(Duration::from_millis(100)).await;
268 self.start_route(route_id).await
269 }
270
271 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
272 let managed = self
274 .routes
275 .get_mut(route_id)
276 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
277
278 let consumer_running = handle_is_running(&managed.consumer_handle);
279 let pipeline_running = handle_is_running(&managed.pipeline_handle);
280
281 if !consumer_running || !pipeline_running {
283 return Err(CamelError::RouteError(format!(
284 "Cannot suspend route '{}' with execution lifecycle {}",
285 route_id,
286 inferred_lifecycle_label(managed)
287 )));
288 }
289
290 info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
291
292 let managed = self
294 .routes
295 .get_mut(route_id)
296 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token.cancel();
298
299 let managed = self
301 .routes
302 .get_mut(route_id)
303 .expect("invariant: route must exist after prior existence check"); let consumer_handle = managed.consumer_handle.take();
305
306 let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
308 if let Some(handle) = consumer_handle {
309 let _ = handle.await;
310 }
311 })
312 .await;
313
314 if timeout_result.is_err() {
315 warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
316 }
317
318 let managed = self
320 .routes
321 .get_mut(route_id)
322 .expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token = CancellationToken::new();
326
327 info!(route_id = %route_id, "Route suspended (pipeline still running)");
328 self.health_registry().mark_route_stopped(route_id);
329 Ok(())
330 }
331
332 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
333 let managed = self
335 .routes
336 .get(route_id)
337 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
338
339 let consumer_running = handle_is_running(&managed.consumer_handle);
340 let pipeline_running = handle_is_running(&managed.pipeline_handle);
341 if consumer_running || !pipeline_running {
342 return Err(CamelError::RouteError(format!(
343 "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
344 route_id,
345 inferred_lifecycle_label(managed)
346 )));
347 }
348
349 let sender = managed.channel_sender.clone().ok_or_else(|| {
351 CamelError::RouteError("Suspended route has no channel sender".into())
352 })?;
353
354 let from_uri = managed.from_uri.clone();
356
357 info!(route_id = %route_id, "Resuming route (spawning consumer only)");
358
359 let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
360 Arc::clone(&self.registry),
361 Arc::clone(&self.languages),
362 self.tracer_metrics
363 .clone()
364 .unwrap_or_else(|| Arc::new(NoOpMetrics)),
365 Arc::clone(&self.platform_service),
366 self.health_registry(),
367 Some(route_id.to_string()),
368 ));
369 let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
370 Arc::clone(&consumer_component_ctx) as Arc<_>;
371 let (mut consumer, _) = consumer_management::create_route_consumer(
372 consumer_rt,
373 &self.registry,
374 &from_uri,
375 consumer_component_ctx.as_ref(),
376 )?;
377
378 let managed = self
380 .routes
381 .get(route_id)
382 .expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
384 managed.compiled.security_policy.as_ref(),
385 managed.compiled.security_authenticator.as_ref(),
386 ) {
387 use camel_component_api::SecurityContext;
388 let sec_ctx =
389 SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
390 consumer.set_security_context(sec_ctx);
391 }
392
393 let managed = self
395 .routes
396 .get_mut(route_id)
397 .expect("invariant: route must exist after prior existence check"); let consumer_cancel = managed.consumer_cancel_token.child_token();
401
402 let crash_notifier = self.crash_notifier.clone();
403 let runtime_for_consumer = self.runtime.clone();
404
405 let consumer_ctx =
407 ConsumerContext::new(sender, consumer_cancel.clone(), route_id.to_string());
408
409 let consumer_handle = consumer_management::spawn_consumer_task(
411 route_id.to_string(),
412 consumer,
413 consumer_ctx,
414 crash_notifier,
415 runtime_for_consumer,
416 true,
417 );
418
419 let managed = self
421 .routes
422 .get_mut(route_id)
423 .expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
425
426 info!(route_id = %route_id, "Route resumed");
427 self.health_registry().mark_route_started(route_id);
428 Ok(())
429 }
430
431 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
432 let route_ids: Vec<String> = {
435 let pairs = self.routes.auto_startup_sorted();
436 pairs.into_iter().map(|(id, _)| id).collect()
437 };
438
439 info!("Starting {} auto-startup routes", route_ids.len());
440
441 let mut errors: Vec<String> = Vec::new();
443 for route_id in route_ids {
444 if let Err(e) = self.start_route(&route_id).await {
445 errors.push(format!("Route '{}': {}", route_id, e));
446 }
447 }
448
449 if !errors.is_empty() {
450 return Err(CamelError::RouteError(format!(
451 "Failed to start routes: {}",
452 errors.join(", ")
453 )));
454 }
455
456 info!("All auto-startup routes started");
457 Ok(())
458 }
459
460 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
461 let route_ids: Vec<String> = {
463 let pairs = self.routes.shutdown_sorted();
464 pairs.into_iter().map(|(id, _)| id).collect()
465 };
466
467 info!("Stopping {} routes", route_ids.len());
468
469 for route_id in route_ids {
470 let _ = self.stop_route(&route_id).await;
471 }
472
473 info!("All routes stopped");
474 Ok(())
475 }
476}