1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use camel_api::datasource::DatasourceCatalog;
7use futures::TryStreamExt;
8use serde_json::Value as JsonValue;
9use sqlx::AnyPool;
10use sqlx::any::AnyPoolOptions;
11use sqlx::any::AnyRow;
12use tokio::sync::OnceCell;
13use tracing::{debug, error, info, warn};
14
15use camel_component_api::retry_async;
16use camel_component_api::{
17 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
18};
19use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
20
21use crate::config::{
22 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
23 enrich_db_url_with_ssl, redact_db_url,
24};
25use crate::headers;
26use crate::query::{QueryTemplate, parse_query_template, resolve_params};
27use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
28
29fn record_post_process_failure(
43 runtime: &dyn RuntimeObservability,
44 route_id: &str,
45 label: &str,
46 error: &CamelError,
47 message: &str,
48) {
49 runtime.metrics().increment_errors(route_id, label);
50 error!(error = %error, "{message}");
52}
53
54pub struct SqlConsumer {
55 pub(crate) config: SqlEndpointConfig,
56 pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
57 pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
58 stopped: bool,
59 runtime: Arc<dyn RuntimeObservability>,
62}
63
64impl SqlConsumer {
65 pub fn new(
66 config: SqlEndpointConfig,
67 pool: Arc<OnceCell<Arc<AnyPool>>>,
68 catalog: Option<Arc<dyn DatasourceCatalog>>,
69 runtime: Arc<dyn RuntimeObservability>,
70 ) -> Self {
71 Self {
72 config,
73 pool,
74 catalog,
75 stopped: false,
76 runtime,
77 }
78 }
79
80 async fn poll_database(
82 &self,
83 pool: &AnyPool,
84 context: &ConsumerContext,
85 template: &QueryTemplate,
86 ) -> Result<(), CamelError> {
87 let route_id = context.route_id();
89
90 let empty_exchange = Exchange::new(Message::default());
92
93 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
95
96 debug!(query = %prepared.sql, "executing SQL consumer poll");
97
98 if self.config.output_type == SqlOutputType::StreamList {
99 return self.poll_database_stream(pool, context, &prepared).await;
100 }
101
102 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
103 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
104 warn!(error = %e, "SQL consumer poll query failed");
105 CamelError::ProcessorError(format!("Query execution failed: {}", e))
106 })?;
107
108 debug!(rows = rows.len(), "SQL consumer poll completed");
109
110 if rows.is_empty() && !self.config.route_empty_result_set {
111 return Ok(());
112 }
113
114 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
115 if max > 0 {
116 rows.into_iter().take(max as usize).collect()
117 } else {
118 rows
119 }
120 } else {
121 rows
122 };
123
124 if self.config.use_iterator {
125 for row in rows_to_process {
127 let row_json = row_to_json(&row)?;
128
129 let mut msg = Message::new(Body::Json(row_json.clone()));
131
132 if let Some(obj) = row_json.as_object() {
134 for (key, value) in obj {
135 msg.set_header(format!("CamelSql.{}", key), value.clone());
136 }
137 }
138
139 let exchange = Exchange::new(msg);
140
141 let result = context.send_and_wait(exchange).await;
143
144 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
146 record_post_process_failure(
147 self.runtime.as_ref(),
148 route_id,
149 "b-prime:sql:on-consume",
150 &e,
151 "Post-processing failed",
152 );
153 if self.config.break_batch_on_consume_fail {
154 return Err(e);
155 }
156 }
157
158 if let Err(ref consume_err) = result
160 && self.config.break_batch_on_consume_fail
161 {
162 return Err(consume_err.clone());
163 }
164 }
165 } else {
166 let rows_json: Vec<JsonValue> = rows_to_process
168 .iter()
169 .map(row_to_json)
170 .collect::<Result<Vec<_>, CamelError>>()?;
171
172 let row_count = rows_json.len();
173
174 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
176 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
177
178 let exchange = Exchange::new(msg);
179
180 let result = context.send_and_wait(exchange).await;
182
183 for row_json in rows_json.iter() {
187 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
188 record_post_process_failure(
189 self.runtime.as_ref(),
190 route_id,
191 "b-prime:sql:on-consume-batch",
192 &e,
193 "Post-processing failed for batch row",
194 );
195 if self.config.break_batch_on_consume_fail {
196 return Err(e);
197 }
198 }
199 }
200
201 if let Err(ref consume_err) = result
203 && self.config.break_batch_on_consume_fail
204 {
205 return Err(consume_err.clone());
206 }
207 }
208
209 if let Some(ref batch_query) = self.config.on_consume_batch_complete {
211 let _ = self
212 .execute_post_query(pool, batch_query, &JsonValue::Null)
213 .await;
214 }
215
216 Ok(())
217 }
218
219 async fn poll_database_stream(
220 &self,
221 pool: &AnyPool,
222 context: &ConsumerContext,
223 prepared: &crate::query::PreparedQuery,
224 ) -> Result<(), CamelError> {
225 let pool_clone = pool.clone();
226 let sql_str = prepared.sql.clone();
227 let bindings = prepared.bindings.clone();
228
229 let byte_stream = async_stream::try_stream! {
230 let mut q = sqlx::query(&sql_str);
231 q = bind_json_values(q, &bindings);
232 let mut rows = q.fetch(&pool_clone);
233 while let Some(row) = rows.try_next().await.map_err(|e| {
234 CamelError::ProcessorError(format!("Query execution failed: {}", e))
235 })? {
236 let json_val = row_to_json(&row).map_err(|e| {
237 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
238 })?;
239 let mut bytes = serde_json::to_vec(&json_val)
240 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
241 bytes.push(b'\n');
242 yield Bytes::from(bytes);
243 }
244 };
245
246 let msg = Message::new(Body::Stream(StreamBody {
247 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
248 metadata: StreamMetadata {
249 content_type: Some("application/x-ndjson".to_string()),
250 size_hint: None,
251 origin: None,
252 },
253 }));
254
255 let exchange = Exchange::new(msg);
256 let result = context.send_and_wait(exchange).await;
257 if let Err(e) = result {
258 record_post_process_failure(
259 self.runtime.as_ref(),
260 context.route_id(),
261 "b-prime:sql:stream-list",
262 &e,
263 "StreamList consumer downstream processing failed",
264 );
265 return Err(e);
266 }
267
268 debug!("StreamList: consumer poll completed (lazy stream emitted)");
269 Ok(())
270 }
271
272 async fn handle_post_processing(
274 &self,
275 pool: &AnyPool,
276 result: &Result<Exchange, CamelError>,
277 row_json: &JsonValue,
278 ) -> Result<(), CamelError> {
279 match result {
280 Ok(_) => {
281 if let Some(ref on_consume) = self.config.on_consume {
283 self.execute_post_query(pool, on_consume, row_json).await?;
284 }
285 }
286 Err(_) => {
287 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
289 self.execute_post_query(pool, on_consume_failed, row_json)
290 .await?;
291 }
292 }
293 }
294 Ok(())
295 }
296
297 async fn execute_post_query(
299 &self,
300 pool: &AnyPool,
301 query_str: &str,
302 row_json: &JsonValue,
303 ) -> Result<(), CamelError> {
304 let template = parse_query_template(query_str, self.config.placeholder)?;
306
307 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
310 if let Some(obj) = row_json.as_object() {
311 for (key, value) in obj {
312 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
313 }
314 }
315 let temp_exchange = Exchange::new(temp_msg);
316
317 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
319
320 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
322 let result = query.execute(pool).await.map_err(|e| {
323 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
324 })?;
325
326 if result.rows_affected() == 0 {
328 warn!(
329 query = query_str,
330 "Post-processing query affected 0 rows — the row may not have been marked correctly"
331 );
332 }
333
334 Ok(())
335 }
336
337 async fn handle_poll_result(
340 &self,
341 pool: &AnyPool,
342 context: &ConsumerContext,
343 template: &QueryTemplate,
344 ) {
345 if let Err(e) = self.poll_database(pool, context, template).await {
346 if self.config.bridge_error_handler {
347 warn!(error = %e, "SQL consumer poll failed (bridged)");
351 if let Err(route_err) = self.bridge_poll_error(context, e).await {
352 error!(error = %route_err, "Failed to bridge SQL consumer error to route");
355 }
356 } else {
357 record_post_process_failure(
358 self.runtime.as_ref(),
359 context.route_id(),
360 "b-prime:sql:poll-failed",
361 &e,
362 "SQL consumer poll failed",
363 );
364 }
365 }
366 }
367
368 async fn bridge_poll_error(
369 &self,
370 context: &ConsumerContext,
371 error: CamelError,
372 ) -> Result<(), CamelError> {
373 if !self.config.bridge_error_handler {
374 return Ok(());
375 }
376 let mut exchange = Exchange::new(Message::default());
377 exchange.set_error(error);
378 context.send_and_wait(exchange).await.map(|_| ())
379 }
380}
381
382#[async_trait]
383impl Consumer for SqlConsumer {
384 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
385 if self.stopped {
387 return Err(CamelError::Config(
388 "SQL consumer cannot be restarted after stop".into(),
389 ));
390 }
391
392 let route_id = context.route_id().to_string();
394 let catalog = self.catalog.clone();
395 let ds_name = self.config.datasource_name.clone();
396
397 self.config.resolve_defaults();
399 self.config.resolve_file_query().await?;
400
401 let pool = self
402 .pool
403 .get_or_try_init(|| async {
404 if let (Some(ref cat), Some(ref name)) = (catalog, ds_name) {
406 let handle = cat.get_pool(name).await?;
407 return handle.downcast::<AnyPool>();
408 }
409
410 sqlx::any::install_default_drivers();
413 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
414
415 let max_conn = self.config.max_connections.ok_or_else(|| {
416 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
417 })?;
418 let min_conn = self.config.min_connections.ok_or_else(|| {
419 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
420 })?;
421 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
422 CamelError::Config(
423 "idle_timeout_secs not resolved for SQL consumer pool".into(),
424 )
425 })?;
426 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
427 CamelError::Config(
428 "max_lifetime_secs not resolved for SQL consumer pool".into(),
429 )
430 })?;
431
432 info!(
433 db_url = %redact_db_url(&self.config.db_url),
434 "SQL consumer pool initializing"
435 );
436 let retry_policy = &self.config.retry;
437 let pool = retry_async::<_, _, _, _, sqlx::Error>(
438 retry_policy,
439 Some("sql-consumer"),
440 || {
441 async {
442 AnyPoolOptions::new()
443 .max_connections(max_conn)
444 .min_connections(min_conn)
445 .idle_timeout(Duration::from_secs(idle_timeout))
446 .max_lifetime(Duration::from_secs(max_lifetime))
447 .connect(&db_url)
448 .await
449 }
450 },
451 is_retryable_sqlx_error,
452 )
453 .await
454 .map_err(|e| {
455 self.runtime.health().force_unhealthy_for_route(
456 &route_id,
457 "g:sql:consumer-pool-init",
458 &e.to_string(),
459 );
460 error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up");
462 CamelError::EndpointCreationFailed(format!(
463 "Failed to connect to database: {}",
464 e
465 ))
466 })?;
467 Ok(Arc::new(pool))
468 })
469 .await?;
470
471 if self.config.transaction_mode == TransactionMode::Managed {
473 warn!("transactionManager not yet implemented; using Auto mode");
474 }
475
476 if self.config.processing_strategy == ProcessingStrategy::Scheduled {
478 debug!(
479 "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
480 );
481 }
482 if self.config.poll_strategy == PollStrategy::Burst {
483 debug!("Poll strategy: Burst (rapid successive polls)");
484 }
485
486 if self.config.output_type == SqlOutputType::StreamList
487 && (self.config.on_consume.is_some()
488 || self.config.on_consume_failed.is_some()
489 || self.config.on_consume_batch_complete.is_some())
490 {
491 warn!(
492 "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
493 (rows are consumed lazily downstream)"
494 );
495 }
496
497 if self.config.on_consume.is_none() {
499 warn!(
500 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
501 );
502 }
503
504 info!(
505 db_url = %redact_db_url(&self.config.db_url),
506 query_len = self.config.query.len(),
507 "SQL consumer started"
508 );
509
510 let template = parse_query_template(&self.config.query, self.config.placeholder)
512 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
513
514 if self.config.initial_delay_ms > 0 {
516 tokio::select! {
517 _ = context.cancelled() => {
518 info!("SQL consumer stopped during initial delay");
519 return Ok(());
520 }
521 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
522 }
523 }
524
525 let mut poll_count: u32 = 0;
541 loop {
542 if let Some(max_repeats) = self.config.repeat_count
544 && poll_count >= max_repeats
545 {
546 info!(
547 repeat_count = max_repeats,
548 "SQL consumer reached repeat_count limit, stopping"
549 );
550 break;
551 }
552
553 tokio::select! {
554 _ = context.cancelled() => {
555 info!("SQL consumer stopped");
556 break;
557 }
558 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
559 poll_count += 1;
560 self.handle_poll_result(pool.as_ref(), &context, &template).await;
561 }
562 }
563 }
564
565 Ok(())
566 }
567
568 async fn stop(&mut self) -> Result<(), CamelError> {
569 if self.stopped {
571 debug!("SQL consumer stop called on already-stopped consumer");
572 return Ok(());
573 }
574
575 if let Some(pool) = self.pool.get() {
577 debug!("SQL consumer closing connection pool");
578 pool.close().await;
579 debug!("SQL consumer pool closed");
580 }
581
582 self.stopped = true;
583 info!("SQL consumer stopped");
584 Ok(())
585 }
586
587 fn concurrency_model(&self) -> ConcurrencyModel {
588 ConcurrencyModel::Sequential
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use camel_api::MetricsCollector;
599 use camel_component_api::HealthCheckRegistry;
600 use camel_component_api::test_support::PanicRuntimeObservability;
601 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
602 std::sync::Arc::new(PanicRuntimeObservability)
603 }
604 use crate::config::SqlEndpointConfig;
605 use camel_component_api::ExchangeEnvelope;
606 use camel_component_api::UriConfig;
607 use sqlx::any::AnyPoolOptions;
608 use std::sync::Arc;
609 use std::sync::Mutex;
610 use std::time::Duration;
611 use tokio::sync::mpsc;
612 use tokio_util::sync::CancellationToken;
613
614 struct RecordingMetrics {
619 errors: Arc<Mutex<Vec<(String, String)>>>,
620 }
621
622 impl MetricsCollector for RecordingMetrics {
623 fn record_exchange_duration(&self, _: &str, _: Duration) {}
624 fn increment_errors(&self, route_id: &str, error_type: &str) {
625 self.errors
626 .lock()
627 .unwrap()
628 .push((route_id.to_string(), error_type.to_string()));
629 }
630 fn increment_exchanges(&self, _: &str) {}
631 fn set_queue_depth(&self, _: &str, _: usize) {}
632 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
633 }
634
635 struct RecordingRuntime {
636 metrics_collector: Arc<RecordingMetrics>,
637 }
638
639 impl RecordingRuntime {
640 fn new(errors: Arc<Mutex<Vec<(String, String)>>>) -> Self {
641 Self {
642 metrics_collector: Arc::new(RecordingMetrics { errors }),
643 }
644 }
645 }
646
647 impl RuntimeObservability for RecordingRuntime {
648 fn metrics(&self) -> Arc<dyn MetricsCollector> {
649 self.metrics_collector.clone() as Arc<dyn MetricsCollector>
650 }
651 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
652 panic!("RecordingRuntime::health not used in this test")
653 }
654 }
655
656 #[tracing_test::traced_test]
660 #[test]
661 fn record_post_process_failure_increments_errors_and_emits_error_log() {
662 let errors: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
663 let runtime = Arc::new(RecordingRuntime::new(Arc::clone(&errors)));
664 let error = CamelError::ProcessorError("test failure".to_string());
665
666 record_post_process_failure(
668 runtime.as_ref(),
669 "test-route",
670 "b-prime:sql:on-consume",
671 &error,
672 "Post-processing failed",
673 );
674
675 let recorded = errors.lock().unwrap();
677 assert_eq!(recorded.len(), 1, "expected 1 increment_errors call");
678 assert_eq!(recorded[0].0, "test-route");
679 assert_eq!(recorded[0].1, "b-prime:sql:on-consume");
680 drop(recorded);
681
682 assert!(logs_contain("ERROR"), "helper must emit error! log");
684 assert!(
685 logs_contain("Post-processing failed"),
686 "helper must include the message in the log"
687 );
688 }
689
690 async fn sqlite_pool() -> AnyPool {
691 sqlx::any::install_default_drivers();
692 AnyPoolOptions::new()
693 .max_connections(1)
694 .connect("sqlite::memory:")
695 .await
696 .expect("sqlite pool")
697 }
698
699 async fn seed_consumer_table(pool: &AnyPool) {
700 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
701 .execute(pool)
702 .await
703 .expect("create table");
704 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
705 .execute(pool)
706 .await
707 .expect("seed rows");
708 }
709
710 fn config() -> SqlEndpointConfig {
711 let mut c =
712 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
713 .unwrap();
714 c.resolve_defaults();
715 c
716 }
717
718 #[test]
719 fn consumer_concurrency_model() {
720 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()), None, test_rt());
721 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
722 }
723
724 #[test]
725 fn consumer_stores_config() {
726 let mut config = SqlEndpointConfig::from_uri(
727 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
728 ).unwrap();
729 config.resolve_defaults();
730 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
731 assert_eq!(c.config.delay_ms, 2000);
732 assert!(c.config.on_consume.is_some());
733 }
734
735 #[tokio::test]
736 async fn poll_database_runs_on_consume_for_successful_rows() {
737 let pool = sqlite_pool().await;
738 seed_consumer_table(&pool).await;
739
740 let mut config = SqlEndpointConfig::from_uri(
741 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
742 )
743 .unwrap();
744 config.resolve_defaults();
745
746 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
747 let template = parse_query_template(&config.query, config.placeholder).unwrap();
748
749 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
750 tokio::spawn(async move {
751 while let Some(env) = rx.recv().await {
752 if let Some(reply_tx) = env.reply_tx {
753 let _ = reply_tx.send(Ok(env.exchange));
754 }
755 }
756 });
757 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
758
759 consumer
760 .poll_database(&pool, &ctx, &template)
761 .await
762 .expect("poll must succeed");
763
764 let row = sqlx::query("select processed from jobs where id = 1")
765 .fetch_one(&pool)
766 .await
767 .expect("row 1");
768 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
769
770 let row = sqlx::query("select processed from jobs where id = 2")
771 .fetch_one(&pool)
772 .await
773 .expect("row 2");
774 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
775
776 assert_eq!(processed_1, 1);
777 assert_eq!(processed_2, 1);
778 }
779
780 #[tokio::test]
781 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
782 let pool = sqlite_pool().await;
783 seed_consumer_table(&pool).await;
784
785 let mut config = SqlEndpointConfig::from_uri(
786 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
787 )
788 .unwrap();
789 config.resolve_defaults();
790
791 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
792 let template = parse_query_template(&config.query, config.placeholder).unwrap();
793
794 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
795 tokio::spawn(async move {
796 while let Some(env) = rx.recv().await {
797 if let Some(reply_tx) = env.reply_tx {
798 let _ =
799 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
800 }
801 }
802 });
803 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
804
805 consumer
806 .poll_database(&pool, &ctx, &template)
807 .await
808 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
809
810 let row = sqlx::query("select failed from jobs where id = 1")
811 .fetch_one(&pool)
812 .await
813 .expect("row 1");
814 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
815
816 let row = sqlx::query("select failed from jobs where id = 2")
817 .fetch_one(&pool)
818 .await
819 .expect("row 2");
820 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
821
822 assert_eq!(failed_1, 1);
823 assert_eq!(failed_2, 1);
824 }
825
826 #[tokio::test]
827 async fn poll_database_breaks_batch_on_consume_fail() {
828 let pool = sqlite_pool().await;
829 seed_consumer_table(&pool).await;
830
831 let mut config = SqlEndpointConfig::from_uri(
832 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
833 )
834 .unwrap();
835 config.resolve_defaults();
836
837 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
838 let template = parse_query_template(&config.query, config.placeholder).unwrap();
839
840 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
841 tokio::spawn(async move {
842 while let Some(env) = rx.recv().await {
843 if let Some(reply_tx) = env.reply_tx {
844 let _ =
845 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
846 }
847 }
848 });
849 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
850
851 let err = consumer
852 .poll_database(&pool, &ctx, &template)
853 .await
854 .expect_err("must stop on first downstream failure");
855 assert!(err.to_string().contains("downstream boom"));
856
857 let row = sqlx::query("select failed from jobs where id = 1")
858 .fetch_one(&pool)
859 .await
860 .expect("row 1");
861 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
862
863 let row = sqlx::query("select failed from jobs where id = 2")
864 .fetch_one(&pool)
865 .await
866 .expect("row 2");
867 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
868
869 assert_eq!(failed_1, 1);
870 assert_eq!(failed_2, 0, "second row must not be processed");
871 }
872
873 #[tokio::test]
879 async fn consumer_no_panic_without_prior_resolve_defaults() {
880 let config = SqlEndpointConfig::from_uri(
881 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
882 )
883 .unwrap();
884 assert!(config.max_connections.is_none());
886
887 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, test_rt());
888 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
889 tokio::spawn(async move {
890 while let Some(env) = rx.recv().await {
891 if let Some(reply_tx) = env.reply_tx {
892 let _ = reply_tx.send(Ok(env.exchange));
893 }
894 }
895 });
896 let token = CancellationToken::new();
897 let ctx = ConsumerContext::new(tx, token.clone(), "sql-test-route".to_string());
898
899 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
901
902 tokio::time::sleep(Duration::from_millis(50)).await;
904 token.cancel();
905
906 let result = consumer_handle.await.expect("task should not panic");
907 let _ = result;
909 }
910
911 #[tokio::test]
913 async fn stop_closes_pool() {
914 let pool = sqlite_pool().await;
915 seed_consumer_table(&pool).await;
916
917 let mut config = SqlEndpointConfig::from_uri(
918 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
919 )
920 .unwrap();
921 config.resolve_defaults();
922
923 let pool_cell = Arc::new(OnceCell::new());
924 pool_cell.set(Arc::new(pool.clone())).unwrap();
925
926 let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
927 consumer.stop().await.expect("stop should succeed");
928
929 assert!(
931 pool.is_closed(),
932 "Pool should be closed after consumer.stop()"
933 );
934 }
935
936 #[tokio::test]
938 async fn double_stop_is_safe() {
939 let pool = sqlite_pool().await;
940 let mut config = SqlEndpointConfig::from_uri(
941 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
942 )
943 .unwrap();
944 config.resolve_defaults();
945
946 let pool_cell = Arc::new(OnceCell::new());
947 pool_cell.set(Arc::new(pool.clone())).unwrap();
948
949 let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
950 consumer.stop().await.expect("first stop should succeed");
951 consumer
952 .stop()
953 .await
954 .expect("second stop should also succeed");
955 }
956
957 #[tokio::test]
959 async fn start_after_stop_rejected() {
960 let pool = sqlite_pool().await;
961 let mut config = SqlEndpointConfig::from_uri(
962 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
963 )
964 .unwrap();
965 config.resolve_defaults();
966
967 let pool_cell = Arc::new(OnceCell::new());
968 pool_cell.set(Arc::new(pool.clone())).unwrap();
969
970 let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
971 consumer.stop().await.expect("stop should succeed");
972
973 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
974 tokio::spawn(async move {
975 while let Some(env) = rx.recv().await {
976 if let Some(reply_tx) = env.reply_tx {
977 let _ = reply_tx.send(Ok(env.exchange));
978 }
979 }
980 });
981 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
982
983 let result = consumer.start(ctx).await;
984 assert!(result.is_err());
985 let err_msg = result.unwrap_err().to_string();
986 assert!(
987 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
988 "Expected restart error, got: {}",
989 err_msg
990 );
991 }
992
993 #[tokio::test]
995 async fn batch_mode_per_row_post_processing() {
996 let pool = sqlite_pool().await;
997 seed_consumer_table(&pool).await;
998
999 let mut config = SqlEndpointConfig::from_uri(
1000 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1001 )
1002 .unwrap();
1003 config.resolve_defaults();
1004
1005 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1006 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1007
1008 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1009 tokio::spawn(async move {
1010 while let Some(env) = rx.recv().await {
1011 if let Some(reply_tx) = env.reply_tx {
1012 let _ = reply_tx.send(Ok(env.exchange));
1013 }
1014 }
1015 });
1016 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1017
1018 consumer
1019 .poll_database(&pool, &ctx, &template)
1020 .await
1021 .expect("poll must succeed");
1022
1023 let row = sqlx::query("select processed from jobs where id = 1")
1025 .fetch_one(&pool)
1026 .await
1027 .expect("row 1");
1028 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1029
1030 let row = sqlx::query("select processed from jobs where id = 2")
1031 .fetch_one(&pool)
1032 .await
1033 .expect("row 2");
1034 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1035
1036 assert_eq!(
1037 processed_1, 1,
1038 "row 1 should be marked processed via per-row onConsume"
1039 );
1040 assert_eq!(
1041 processed_2, 1,
1042 "row 2 should be marked processed via per-row onConsume"
1043 );
1044 }
1045
1046 #[tokio::test]
1048 async fn batch_mode_per_row_post_processing_on_failure() {
1049 let pool = sqlite_pool().await;
1050 seed_consumer_table(&pool).await;
1051
1052 let mut config = SqlEndpointConfig::from_uri(
1053 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1054 )
1055 .unwrap();
1056 config.resolve_defaults();
1057
1058 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1059 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1060
1061 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1062 tokio::spawn(async move {
1063 while let Some(env) = rx.recv().await {
1064 if let Some(reply_tx) = env.reply_tx {
1065 let _ =
1066 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
1067 }
1068 }
1069 });
1070 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1071
1072 consumer
1073 .poll_database(&pool, &ctx, &template)
1074 .await
1075 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
1076
1077 let row = sqlx::query("select failed from jobs where id = 1")
1079 .fetch_one(&pool)
1080 .await
1081 .expect("row 1");
1082 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1083
1084 let row = sqlx::query("select failed from jobs where id = 2")
1085 .fetch_one(&pool)
1086 .await
1087 .expect("row 2");
1088 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1089
1090 assert_eq!(
1091 failed_1, 1,
1092 "row 1 should be marked failed via per-row onConsumeFailed"
1093 );
1094 assert_eq!(
1095 failed_2, 1,
1096 "row 2 should be marked failed via per-row onConsumeFailed"
1097 );
1098 }
1099
1100 #[tokio::test]
1101 async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
1102 let mut config = config();
1103 config.bridge_error_handler = true;
1104 let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, test_rt());
1105
1106 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1107 tokio::spawn(async move {
1108 while let Some(env) = rx.recv().await {
1109 assert!(env.exchange.error.is_some(), "exchange must carry error");
1110 if let Some(reply_tx) = env.reply_tx {
1111 let _ = reply_tx.send(Ok(env.exchange));
1112 }
1113 break;
1114 }
1115 });
1116
1117 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1118 consumer
1119 .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
1120 .await
1121 .expect("bridging should succeed");
1122 }
1123
1124 #[tracing_test::traced_test]
1128 #[tokio::test]
1129 async fn bridged_poll_failure_emits_warn_not_error() {
1130 let pool = sqlite_pool().await;
1131 let mut config = config();
1136 config.bridge_error_handler = true;
1137 config.query = "select * from nonexistent_table".to_string();
1139 config.resolve_defaults();
1140 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1141 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1142
1143 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1146 tokio::spawn(async move {
1147 while let Some(env) = rx.recv().await {
1148 if let Some(reply_tx) = env.reply_tx {
1149 let _ = reply_tx.send(Ok(env.exchange));
1150 }
1151 }
1152 });
1153 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1154
1155 consumer.handle_poll_result(&pool, &ctx, &template).await;
1157
1158 assert!(
1160 !logs_contain("ERROR"),
1161 "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1162 );
1163 assert!(
1165 logs_contain("WARN"),
1166 "bridged poll failure should emit warn! for operator visibility"
1167 );
1168 }
1169
1170 #[tracing_test::traced_test]
1180 #[tokio::test]
1181 async fn unbridged_send_and_wait_failure_emits_error_loud() {
1182 let pool = sqlite_pool().await;
1183 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1184 .execute(&pool)
1185 .await
1186 .expect("create table");
1187 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1188 .execute(&pool)
1189 .await
1190 .expect("seed rows");
1191
1192 let mut config = SqlEndpointConfig::from_uri(
1193 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1194 )
1195 .unwrap();
1196 config.resolve_defaults();
1197 config.bridge_error_handler = false;
1199 let consumer = SqlConsumer::new(
1200 config.clone(),
1201 Arc::new(OnceCell::new()),
1202 None,
1203 Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1204 );
1205 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1206
1207 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1209 tokio::spawn(async move {
1210 while let Some(env) = rx.recv().await {
1211 if let Some(reply_tx) = env.reply_tx {
1212 let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1213 }
1214 }
1215 });
1216 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1217
1218 let _ = consumer.poll_database(&pool, &ctx, &template).await;
1219
1220 assert!(
1222 logs_contain("ERROR"),
1223 "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1224 );
1225 }
1226
1227 #[tracing_test::traced_test]
1230 #[tokio::test]
1231 async fn unbridged_handle_poll_result_emits_error_loud() {
1232 let pool = sqlite_pool().await;
1233 let mut config = config();
1236 config.bridge_error_handler = false;
1237 config.query = "select * from nonexistent_table".to_string();
1238 config.resolve_defaults();
1239 let consumer = SqlConsumer::new(
1240 config.clone(),
1241 Arc::new(OnceCell::new()),
1242 None,
1243 Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1244 );
1245 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1246
1247 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1249 tokio::spawn(async move {
1250 while let Some(env) = rx.recv().await {
1251 if let Some(reply_tx) = env.reply_tx {
1252 let _ = reply_tx.send(Ok(env.exchange));
1253 }
1254 }
1255 });
1256 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1257
1258 consumer.handle_poll_result(&pool, &ctx, &template).await;
1259
1260 assert!(
1261 logs_contain("ERROR"),
1262 "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1263 );
1264 }
1265
1266 #[tokio::test]
1267 async fn stream_list_consumer_emits_ndjson_body() {
1268 let pool = sqlite_pool().await;
1269 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1270 .execute(&pool)
1271 .await
1272 .expect("create table");
1273 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1274 .execute(&pool)
1275 .await
1276 .expect("seed rows");
1277
1278 let mut config = SqlEndpointConfig::from_uri(
1279 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1280 )
1281 .unwrap();
1282 config.resolve_defaults();
1283
1284 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1285 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1286
1287 let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1288 let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1289 tokio::spawn(async move {
1290 let mut rx = rx;
1291 if let Some(env) = rx.recv().await {
1292 if let Some(reply_tx) = env.reply_tx {
1293 let _ = reply_tx.send(Ok(env.exchange.clone()));
1294 }
1295 let _ = result_tx.send(env.exchange);
1296 }
1297 });
1298 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1299
1300 consumer
1301 .poll_database(&pool, &ctx, &template)
1302 .await
1303 .expect("poll must succeed");
1304
1305 let exchange = result_rx.await.expect("should have received one exchange");
1306
1307 match exchange.input.body {
1308 Body::Stream(ref stream_body) => {
1309 let stream = stream_body.stream.clone();
1310 let mut guard = stream.lock().await;
1311 let stream_opt = guard.take();
1312 assert!(stream_opt.is_some(), "stream should be present");
1313
1314 use futures::StreamExt;
1315 let mut collected = Vec::new();
1316 let mut stream = stream_opt.unwrap();
1317 while let Some(chunk) = stream.next().await {
1318 let chunk = chunk.expect("stream chunk should not error");
1319 collected.extend_from_slice(&chunk);
1320 }
1321
1322 let ndjson = String::from_utf8(collected).expect("valid utf8");
1323 let lines: Vec<&str> = ndjson.trim().lines().collect();
1324 assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1325
1326 let row0: serde_json::Value =
1327 serde_json::from_str(lines[0]).expect("valid json line 0");
1328 assert_eq!(row0["id"], 1);
1329 assert_eq!(row0["name"], "alpha");
1330
1331 let row1: serde_json::Value =
1332 serde_json::from_str(lines[1]).expect("valid json line 1");
1333 assert_eq!(row1["id"], 2);
1334 assert_eq!(row1["name"], "beta");
1335
1336 let row2: serde_json::Value =
1337 serde_json::from_str(lines[2]).expect("valid json line 2");
1338 assert_eq!(row2["id"], 3);
1339 assert_eq!(row2["name"], "gamma");
1340 }
1341 ref other => panic!("expected Body::Stream, got {:?}", other),
1342 }
1343 }
1344
1345 #[tokio::test]
1346 async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1347 let pool = sqlite_pool().await;
1348 sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1349 .execute(&pool)
1350 .await
1351 .expect("create table");
1352
1353 let mut config = SqlEndpointConfig::from_uri(
1354 "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1355 )
1356 .unwrap();
1357 config.resolve_defaults();
1358
1359 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1360 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1361
1362 let (tx, rx) = tokio::sync::oneshot::channel();
1363 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1364 tokio::spawn(async move {
1365 while let Some(env) = mpsc_rx.recv().await {
1366 if let Some(reply_tx) = env.reply_tx {
1367 let _ = reply_tx.send(Ok(env.exchange.clone()));
1368 }
1369 let _ = tx.send(env.exchange);
1370 break;
1371 }
1372 });
1373 let ctx = ConsumerContext::new(
1374 mpsc_tx,
1375 CancellationToken::new(),
1376 "sql-test-route".to_string(),
1377 );
1378
1379 consumer
1380 .poll_database(&pool, &ctx, &template)
1381 .await
1382 .expect("poll must succeed");
1383
1384 let exchange = rx
1385 .await
1386 .expect("StreamList should emit exchange even for empty results");
1387
1388 match exchange.input.body {
1389 Body::Stream(ref stream_body) => {
1390 let stream = stream_body.stream.clone();
1391 let mut guard = stream.lock().await;
1392 let stream_opt = guard.take();
1393
1394 use futures::StreamExt;
1395 let mut count = 0;
1396 if let Some(mut stream) = stream_opt {
1397 while let Some(chunk) = stream.next().await {
1398 let chunk = chunk.expect("stream chunk should not error");
1399 count += chunk.len();
1400 }
1401 }
1402 assert_eq!(count, 0, "empty table should produce zero stream bytes");
1403 }
1404 ref other => panic!("expected Body::Stream, got {:?}", other),
1405 }
1406 }
1407
1408 #[derive(Debug, Default)]
1412 struct RecordingHealth {
1413 forced: Arc<Mutex<Vec<(String, String, String)>>>,
1414 }
1415
1416 impl HealthCheckRegistry for RecordingHealth {
1417 fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
1418 self.forced.lock().unwrap().push((
1419 route_id.to_string(),
1420 name.to_string(),
1421 reason.to_string(),
1422 ));
1423 }
1424 }
1425
1426 struct NoopMetricsForConsumer;
1427
1428 impl MetricsCollector for NoopMetricsForConsumer {
1429 fn record_exchange_duration(&self, _: &str, _: Duration) {}
1430 fn increment_errors(&self, _: &str, _: &str) {}
1431 fn increment_exchanges(&self, _: &str) {}
1432 fn set_queue_depth(&self, _: &str, _: usize) {}
1433 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
1434 }
1435
1436 struct RecordingRuntimeWithHealth {
1437 health: Arc<RecordingHealth>,
1438 }
1439
1440 impl RuntimeObservability for RecordingRuntimeWithHealth {
1441 fn metrics(&self) -> Arc<dyn MetricsCollector> {
1442 Arc::new(NoopMetricsForConsumer)
1443 }
1444 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
1445 self.health.clone()
1446 }
1447 }
1448
1449 #[tokio::test]
1452 async fn consumer_pool_init_failure_calls_force_unhealthy_for_route() {
1453 let health = Arc::new(RecordingHealth::default());
1454 let recorded_health = health.clone();
1455 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntimeWithHealth { health });
1456
1457 let mut config = SqlEndpointConfig::from_uri(
1458 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false&initialDelay=0&delay=1",
1459 )
1460 .unwrap();
1461 config.max_connections = Some(1);
1462 config.min_connections = Some(0);
1463 config.idle_timeout_secs = Some(300);
1464 config.max_lifetime_secs = Some(1800);
1465
1466 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, rt);
1467
1468 let (tx, _rx) = mpsc::channel(8);
1469 let ctx = ConsumerContext::new(
1470 tx,
1471 CancellationToken::new(),
1472 "sql-consumer-test-route".to_string(),
1473 );
1474
1475 let result = consumer.start(ctx).await;
1476 assert!(result.is_err(), "pool init should fail with bad db_url");
1477
1478 let forced = recorded_health.forced.lock().unwrap();
1479 assert_eq!(
1480 forced.len(),
1481 1,
1482 "expected one force_unhealthy_for_route call"
1483 );
1484 assert_eq!(forced[0].0, "sql-consumer-test-route");
1485 assert_eq!(forced[0].1, "g:sql:consumer-pool-init");
1486 assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1487 }
1488
1489 #[tokio::test]
1493 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1494 use camel_component_api::NetworkRetryPolicy;
1495 use std::sync::Arc;
1496 use std::sync::atomic::{AtomicU32, Ordering};
1497
1498 let policy = NetworkRetryPolicy {
1499 max_attempts: 3,
1500 initial_delay: Duration::from_millis(1),
1501 max_delay: Duration::from_millis(1),
1502 multiplier: 1.0,
1503 ..NetworkRetryPolicy::default()
1504 };
1505
1506 let calls = Arc::new(AtomicU32::new(0));
1507 let calls_clone = Arc::clone(&calls);
1508
1509 let mut attempt: u32 = 0;
1510 let _result: Result<(), ()> = loop {
1511 attempt += 1;
1512 calls_clone.fetch_add(1, Ordering::SeqCst);
1513 let op_result: Result<(), ()> = Err(());
1514 match op_result {
1515 Ok(v) => break Ok(v),
1516 Err(_) if policy.should_retry(attempt) => {
1517 let delay = policy.delay_for(attempt - 1);
1518 tokio::time::sleep(delay).await;
1519 continue;
1520 }
1521 Err(_) => break Err(()),
1522 }
1523 };
1524
1525 assert_eq!(
1526 calls.load(Ordering::SeqCst),
1527 3,
1528 "max_attempts=3 must yield exactly 3 invocations"
1529 );
1530 }
1531}