1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::retry_async;
15use camel_component_api::{
16 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
17};
18use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
19
20use crate::config::{
21 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
22 enrich_db_url_with_ssl, redact_db_url,
23};
24use crate::headers;
25use crate::query::{QueryTemplate, parse_query_template, resolve_params};
26use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
27
28fn record_post_process_failure(
42 runtime: &dyn RuntimeObservability,
43 route_id: &str,
44 label: &str,
45 error: &CamelError,
46 message: &str,
47) {
48 runtime.metrics().increment_errors(route_id, label);
49 error!(error = %error, "{message}");
51}
52
53pub struct SqlConsumer {
54 pub(crate) config: SqlEndpointConfig,
55 pub(crate) pool: Arc<OnceCell<AnyPool>>,
56 stopped: bool,
57 runtime: Arc<dyn RuntimeObservability>,
60}
61
62impl SqlConsumer {
63 pub fn new(
64 config: SqlEndpointConfig,
65 pool: Arc<OnceCell<AnyPool>>,
66 runtime: Arc<dyn RuntimeObservability>,
67 ) -> Self {
68 Self {
69 config,
70 pool,
71 stopped: false,
72 runtime,
73 }
74 }
75
76 async fn poll_database(
78 &self,
79 pool: &AnyPool,
80 context: &ConsumerContext,
81 template: &QueryTemplate,
82 ) -> Result<(), CamelError> {
83 let route_id = context.route_id();
85
86 let empty_exchange = Exchange::new(Message::default());
88
89 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
91
92 debug!(query = %prepared.sql, "executing SQL consumer poll");
93
94 if self.config.output_type == SqlOutputType::StreamList {
95 return self.poll_database_stream(pool, context, &prepared).await;
96 }
97
98 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
99 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
100 warn!(error = %e, "SQL consumer poll query failed");
101 CamelError::ProcessorError(format!("Query execution failed: {}", e))
102 })?;
103
104 debug!(rows = rows.len(), "SQL consumer poll completed");
105
106 if rows.is_empty() && !self.config.route_empty_result_set {
107 return Ok(());
108 }
109
110 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
111 if max > 0 {
112 rows.into_iter().take(max as usize).collect()
113 } else {
114 rows
115 }
116 } else {
117 rows
118 };
119
120 if self.config.use_iterator {
121 for row in rows_to_process {
123 let row_json = row_to_json(&row)?;
124
125 let mut msg = Message::new(Body::Json(row_json.clone()));
127
128 if let Some(obj) = row_json.as_object() {
130 for (key, value) in obj {
131 msg.set_header(format!("CamelSql.{}", key), value.clone());
132 }
133 }
134
135 let exchange = Exchange::new(msg);
136
137 let result = context.send_and_wait(exchange).await;
139
140 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
142 record_post_process_failure(
143 self.runtime.as_ref(),
144 route_id,
145 "b-prime:sql:on-consume",
146 &e,
147 "Post-processing failed",
148 );
149 if self.config.break_batch_on_consume_fail {
150 return Err(e);
151 }
152 }
153
154 if let Err(ref consume_err) = result
156 && self.config.break_batch_on_consume_fail
157 {
158 return Err(consume_err.clone());
159 }
160 }
161 } else {
162 let rows_json: Vec<JsonValue> = rows_to_process
164 .iter()
165 .map(row_to_json)
166 .collect::<Result<Vec<_>, CamelError>>()?;
167
168 let row_count = rows_json.len();
169
170 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
172 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
173
174 let exchange = Exchange::new(msg);
175
176 let result = context.send_and_wait(exchange).await;
178
179 for row_json in rows_json.iter() {
183 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
184 record_post_process_failure(
185 self.runtime.as_ref(),
186 route_id,
187 "b-prime:sql:on-consume-batch",
188 &e,
189 "Post-processing failed for batch row",
190 );
191 if self.config.break_batch_on_consume_fail {
192 return Err(e);
193 }
194 }
195 }
196
197 if let Err(ref consume_err) = result
199 && self.config.break_batch_on_consume_fail
200 {
201 return Err(consume_err.clone());
202 }
203 }
204
205 if let Some(ref batch_query) = self.config.on_consume_batch_complete {
207 let _ = self
208 .execute_post_query(pool, batch_query, &JsonValue::Null)
209 .await;
210 }
211
212 Ok(())
213 }
214
215 async fn poll_database_stream(
216 &self,
217 pool: &AnyPool,
218 context: &ConsumerContext,
219 prepared: &crate::query::PreparedQuery,
220 ) -> Result<(), CamelError> {
221 let pool_clone = pool.clone();
222 let sql_str = prepared.sql.clone();
223 let bindings = prepared.bindings.clone();
224
225 let byte_stream = async_stream::try_stream! {
226 let mut q = sqlx::query(&sql_str);
227 q = bind_json_values(q, &bindings);
228 let mut rows = q.fetch(&pool_clone);
229 while let Some(row) = rows.try_next().await.map_err(|e| {
230 CamelError::ProcessorError(format!("Query execution failed: {}", e))
231 })? {
232 let json_val = row_to_json(&row).map_err(|e| {
233 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
234 })?;
235 let mut bytes = serde_json::to_vec(&json_val)
236 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
237 bytes.push(b'\n');
238 yield Bytes::from(bytes);
239 }
240 };
241
242 let msg = Message::new(Body::Stream(StreamBody {
243 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
244 metadata: StreamMetadata {
245 content_type: Some("application/x-ndjson".to_string()),
246 size_hint: None,
247 origin: None,
248 },
249 }));
250
251 let exchange = Exchange::new(msg);
252 let result = context.send_and_wait(exchange).await;
253 if let Err(e) = result {
254 record_post_process_failure(
255 self.runtime.as_ref(),
256 context.route_id(),
257 "b-prime:sql:stream-list",
258 &e,
259 "StreamList consumer downstream processing failed",
260 );
261 return Err(e);
262 }
263
264 debug!("StreamList: consumer poll completed (lazy stream emitted)");
265 Ok(())
266 }
267
268 async fn handle_post_processing(
270 &self,
271 pool: &AnyPool,
272 result: &Result<Exchange, CamelError>,
273 row_json: &JsonValue,
274 ) -> Result<(), CamelError> {
275 match result {
276 Ok(_) => {
277 if let Some(ref on_consume) = self.config.on_consume {
279 self.execute_post_query(pool, on_consume, row_json).await?;
280 }
281 }
282 Err(_) => {
283 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
285 self.execute_post_query(pool, on_consume_failed, row_json)
286 .await?;
287 }
288 }
289 }
290 Ok(())
291 }
292
293 async fn execute_post_query(
295 &self,
296 pool: &AnyPool,
297 query_str: &str,
298 row_json: &JsonValue,
299 ) -> Result<(), CamelError> {
300 let template = parse_query_template(query_str, self.config.placeholder)?;
302
303 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
306 if let Some(obj) = row_json.as_object() {
307 for (key, value) in obj {
308 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
309 }
310 }
311 let temp_exchange = Exchange::new(temp_msg);
312
313 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
315
316 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
318 let result = query.execute(pool).await.map_err(|e| {
319 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
320 })?;
321
322 if result.rows_affected() == 0 {
324 warn!(
325 query = query_str,
326 "Post-processing query affected 0 rows — the row may not have been marked correctly"
327 );
328 }
329
330 Ok(())
331 }
332
333 async fn handle_poll_result(
336 &self,
337 pool: &AnyPool,
338 context: &ConsumerContext,
339 template: &QueryTemplate,
340 ) {
341 if let Err(e) = self.poll_database(pool, context, template).await {
342 if self.config.bridge_error_handler {
343 warn!(error = %e, "SQL consumer poll failed (bridged)");
347 if let Err(route_err) = self.bridge_poll_error(context, e).await {
348 error!(error = %route_err, "Failed to bridge SQL consumer error to route");
351 }
352 } else {
353 record_post_process_failure(
354 self.runtime.as_ref(),
355 context.route_id(),
356 "b-prime:sql:poll-failed",
357 &e,
358 "SQL consumer poll failed",
359 );
360 }
361 }
362 }
363
364 async fn bridge_poll_error(
365 &self,
366 context: &ConsumerContext,
367 error: CamelError,
368 ) -> Result<(), CamelError> {
369 if !self.config.bridge_error_handler {
370 return Ok(());
371 }
372 let mut exchange = Exchange::new(Message::default());
373 exchange.set_error(error);
374 context.send_and_wait(exchange).await.map(|_| ())
375 }
376}
377
378#[async_trait]
379impl Consumer for SqlConsumer {
380 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
381 if self.stopped {
383 return Err(CamelError::Config(
384 "SQL consumer cannot be restarted after stop".into(),
385 ));
386 }
387
388 let route_id = context.route_id().to_string();
390 let pool = self
391 .pool
392 .get_or_try_init(|| async {
393 self.config.resolve_defaults();
395 self.config.resolve_file_query().await?;
397
398 sqlx::any::install_default_drivers();
401 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
402
403 let max_conn = self.config.max_connections.ok_or_else(|| {
404 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
405 })?;
406 let min_conn = self.config.min_connections.ok_or_else(|| {
407 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
408 })?;
409 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
410 CamelError::Config(
411 "idle_timeout_secs not resolved for SQL consumer pool".into(),
412 )
413 })?;
414 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
415 CamelError::Config(
416 "max_lifetime_secs not resolved for SQL consumer pool".into(),
417 )
418 })?;
419
420 info!(
421 db_url = %redact_db_url(&self.config.db_url),
422 "SQL consumer pool initializing"
423 );
424 let retry_policy = &self.config.retry;
425 retry_async::<_, _, _, _, sqlx::Error>(
426 retry_policy,
427 Some("sql-consumer"),
428 || {
429 AnyPoolOptions::new()
430 .max_connections(max_conn)
431 .min_connections(min_conn)
432 .idle_timeout(Duration::from_secs(idle_timeout))
433 .max_lifetime(Duration::from_secs(max_lifetime))
434 .connect(&db_url)
435 },
436 is_retryable_sqlx_error,
437 )
438 .await
439 .map_err(|e| {
440 self.runtime.health().force_unhealthy_for_route(
441 &route_id,
442 "g:sql:consumer-pool-init",
443 &e.to_string(),
444 );
445 error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up");
447 CamelError::EndpointCreationFailed(format!(
448 "Failed to connect to database: {}",
449 e
450 ))
451 })
452 })
453 .await?;
454
455 if self.config.transaction_mode == TransactionMode::Managed {
457 warn!("transactionManager not yet implemented; using Auto mode");
458 }
459
460 if self.config.processing_strategy == ProcessingStrategy::Scheduled {
462 debug!(
463 "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
464 );
465 }
466 if self.config.poll_strategy == PollStrategy::Burst {
467 debug!("Poll strategy: Burst (rapid successive polls)");
468 }
469
470 if self.config.output_type == SqlOutputType::StreamList
471 && (self.config.on_consume.is_some()
472 || self.config.on_consume_failed.is_some()
473 || self.config.on_consume_batch_complete.is_some())
474 {
475 warn!(
476 "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
477 (rows are consumed lazily downstream)"
478 );
479 }
480
481 if self.config.on_consume.is_none() {
483 warn!(
484 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
485 );
486 }
487
488 info!(
489 db_url = %redact_db_url(&self.config.db_url),
490 query_len = self.config.query.len(),
491 "SQL consumer started"
492 );
493
494 let template = parse_query_template(&self.config.query, self.config.placeholder)
496 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
497
498 if self.config.initial_delay_ms > 0 {
500 tokio::select! {
501 _ = context.cancelled() => {
502 info!("SQL consumer stopped during initial delay");
503 return Ok(());
504 }
505 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
506 }
507 }
508
509 let mut poll_count: u32 = 0;
525 loop {
526 if let Some(max_repeats) = self.config.repeat_count
528 && poll_count >= max_repeats
529 {
530 info!(
531 repeat_count = max_repeats,
532 "SQL consumer reached repeat_count limit, stopping"
533 );
534 break;
535 }
536
537 tokio::select! {
538 _ = context.cancelled() => {
539 info!("SQL consumer stopped");
540 break;
541 }
542 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
543 poll_count += 1;
544 self.handle_poll_result(pool, &context, &template).await;
545 }
546 }
547 }
548
549 Ok(())
550 }
551
552 async fn stop(&mut self) -> Result<(), CamelError> {
553 if self.stopped {
555 debug!("SQL consumer stop called on already-stopped consumer");
556 return Ok(());
557 }
558
559 if let Some(pool) = self.pool.get() {
561 debug!("SQL consumer closing connection pool");
562 pool.close().await;
563 debug!("SQL consumer pool closed");
564 }
565
566 self.stopped = true;
567 info!("SQL consumer stopped");
568 Ok(())
569 }
570
571 fn concurrency_model(&self) -> ConcurrencyModel {
572 ConcurrencyModel::Sequential
576 }
577}
578
579#[cfg(test)]
580mod tests {
581 use super::*;
582 use camel_api::MetricsCollector;
583 use camel_component_api::HealthCheckRegistry;
584 use camel_component_api::test_support::PanicRuntimeObservability;
585 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
586 std::sync::Arc::new(PanicRuntimeObservability)
587 }
588 use crate::config::SqlEndpointConfig;
589 use camel_component_api::ExchangeEnvelope;
590 use camel_component_api::UriConfig;
591 use sqlx::any::AnyPoolOptions;
592 use std::sync::Arc;
593 use std::sync::Mutex;
594 use std::time::Duration;
595 use tokio::sync::mpsc;
596 use tokio_util::sync::CancellationToken;
597
598 struct RecordingMetrics {
603 errors: Arc<Mutex<Vec<(String, String)>>>,
604 }
605
606 impl MetricsCollector for RecordingMetrics {
607 fn record_exchange_duration(&self, _: &str, _: Duration) {}
608 fn increment_errors(&self, route_id: &str, error_type: &str) {
609 self.errors
610 .lock()
611 .unwrap()
612 .push((route_id.to_string(), error_type.to_string()));
613 }
614 fn increment_exchanges(&self, _: &str) {}
615 fn set_queue_depth(&self, _: &str, _: usize) {}
616 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
617 }
618
619 struct RecordingRuntime {
620 metrics_collector: Arc<RecordingMetrics>,
621 }
622
623 impl RecordingRuntime {
624 fn new(errors: Arc<Mutex<Vec<(String, String)>>>) -> Self {
625 Self {
626 metrics_collector: Arc::new(RecordingMetrics { errors }),
627 }
628 }
629 }
630
631 impl RuntimeObservability for RecordingRuntime {
632 fn metrics(&self) -> Arc<dyn MetricsCollector> {
633 self.metrics_collector.clone() as Arc<dyn MetricsCollector>
634 }
635 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
636 panic!("RecordingRuntime::health not used in this test")
637 }
638 }
639
640 #[tracing_test::traced_test]
644 #[test]
645 fn record_post_process_failure_increments_errors_and_emits_error_log() {
646 let errors: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
647 let runtime = Arc::new(RecordingRuntime::new(Arc::clone(&errors)));
648 let error = CamelError::ProcessorError("test failure".to_string());
649
650 record_post_process_failure(
652 runtime.as_ref(),
653 "test-route",
654 "b-prime:sql:on-consume",
655 &error,
656 "Post-processing failed",
657 );
658
659 let recorded = errors.lock().unwrap();
661 assert_eq!(recorded.len(), 1, "expected 1 increment_errors call");
662 assert_eq!(recorded[0].0, "test-route");
663 assert_eq!(recorded[0].1, "b-prime:sql:on-consume");
664 drop(recorded);
665
666 assert!(logs_contain("ERROR"), "helper must emit error! log");
668 assert!(
669 logs_contain("Post-processing failed"),
670 "helper must include the message in the log"
671 );
672 }
673
674 async fn sqlite_pool() -> AnyPool {
675 sqlx::any::install_default_drivers();
676 AnyPoolOptions::new()
677 .max_connections(1)
678 .connect("sqlite::memory:")
679 .await
680 .expect("sqlite pool")
681 }
682
683 async fn seed_consumer_table(pool: &AnyPool) {
684 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
685 .execute(pool)
686 .await
687 .expect("create table");
688 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
689 .execute(pool)
690 .await
691 .expect("seed rows");
692 }
693
694 fn config() -> SqlEndpointConfig {
695 let mut c =
696 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
697 .unwrap();
698 c.resolve_defaults();
699 c
700 }
701
702 #[test]
703 fn consumer_concurrency_model() {
704 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()), test_rt());
705 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
706 }
707
708 #[test]
709 fn consumer_stores_config() {
710 let mut config = SqlEndpointConfig::from_uri(
711 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
712 ).unwrap();
713 config.resolve_defaults();
714 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
715 assert_eq!(c.config.delay_ms, 2000);
716 assert!(c.config.on_consume.is_some());
717 }
718
719 #[tokio::test]
720 async fn poll_database_runs_on_consume_for_successful_rows() {
721 let pool = sqlite_pool().await;
722 seed_consumer_table(&pool).await;
723
724 let mut config = SqlEndpointConfig::from_uri(
725 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
726 )
727 .unwrap();
728 config.resolve_defaults();
729
730 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
731 let template = parse_query_template(&config.query, config.placeholder).unwrap();
732
733 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
734 tokio::spawn(async move {
735 while let Some(env) = rx.recv().await {
736 if let Some(reply_tx) = env.reply_tx {
737 let _ = reply_tx.send(Ok(env.exchange));
738 }
739 }
740 });
741 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
742
743 consumer
744 .poll_database(&pool, &ctx, &template)
745 .await
746 .expect("poll must succeed");
747
748 let row = sqlx::query("select processed from jobs where id = 1")
749 .fetch_one(&pool)
750 .await
751 .expect("row 1");
752 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
753
754 let row = sqlx::query("select processed from jobs where id = 2")
755 .fetch_one(&pool)
756 .await
757 .expect("row 2");
758 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
759
760 assert_eq!(processed_1, 1);
761 assert_eq!(processed_2, 1);
762 }
763
764 #[tokio::test]
765 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
766 let pool = sqlite_pool().await;
767 seed_consumer_table(&pool).await;
768
769 let mut config = SqlEndpointConfig::from_uri(
770 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
771 )
772 .unwrap();
773 config.resolve_defaults();
774
775 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
776 let template = parse_query_template(&config.query, config.placeholder).unwrap();
777
778 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
779 tokio::spawn(async move {
780 while let Some(env) = rx.recv().await {
781 if let Some(reply_tx) = env.reply_tx {
782 let _ =
783 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
784 }
785 }
786 });
787 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
788
789 consumer
790 .poll_database(&pool, &ctx, &template)
791 .await
792 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
793
794 let row = sqlx::query("select failed from jobs where id = 1")
795 .fetch_one(&pool)
796 .await
797 .expect("row 1");
798 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
799
800 let row = sqlx::query("select failed from jobs where id = 2")
801 .fetch_one(&pool)
802 .await
803 .expect("row 2");
804 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
805
806 assert_eq!(failed_1, 1);
807 assert_eq!(failed_2, 1);
808 }
809
810 #[tokio::test]
811 async fn poll_database_breaks_batch_on_consume_fail() {
812 let pool = sqlite_pool().await;
813 seed_consumer_table(&pool).await;
814
815 let mut config = SqlEndpointConfig::from_uri(
816 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
817 )
818 .unwrap();
819 config.resolve_defaults();
820
821 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
822 let template = parse_query_template(&config.query, config.placeholder).unwrap();
823
824 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
825 tokio::spawn(async move {
826 while let Some(env) = rx.recv().await {
827 if let Some(reply_tx) = env.reply_tx {
828 let _ =
829 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
830 }
831 }
832 });
833 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
834
835 let err = consumer
836 .poll_database(&pool, &ctx, &template)
837 .await
838 .expect_err("must stop on first downstream failure");
839 assert!(err.to_string().contains("downstream boom"));
840
841 let row = sqlx::query("select failed from jobs where id = 1")
842 .fetch_one(&pool)
843 .await
844 .expect("row 1");
845 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
846
847 let row = sqlx::query("select failed from jobs where id = 2")
848 .fetch_one(&pool)
849 .await
850 .expect("row 2");
851 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
852
853 assert_eq!(failed_1, 1);
854 assert_eq!(failed_2, 0, "second row must not be processed");
855 }
856
857 #[tokio::test]
863 async fn consumer_no_panic_without_prior_resolve_defaults() {
864 let config = SqlEndpointConfig::from_uri(
865 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
866 )
867 .unwrap();
868 assert!(config.max_connections.is_none());
870
871 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
872 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
873 tokio::spawn(async move {
874 while let Some(env) = rx.recv().await {
875 if let Some(reply_tx) = env.reply_tx {
876 let _ = reply_tx.send(Ok(env.exchange));
877 }
878 }
879 });
880 let token = CancellationToken::new();
881 let ctx = ConsumerContext::new(tx, token.clone(), "sql-test-route".to_string());
882
883 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
885
886 tokio::time::sleep(Duration::from_millis(50)).await;
888 token.cancel();
889
890 let result = consumer_handle.await.expect("task should not panic");
891 let _ = result;
893 }
894
895 #[tokio::test]
897 async fn stop_closes_pool() {
898 let pool = sqlite_pool().await;
899 seed_consumer_table(&pool).await;
900
901 let mut config = SqlEndpointConfig::from_uri(
902 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
903 )
904 .unwrap();
905 config.resolve_defaults();
906
907 let pool_cell = Arc::new(OnceCell::new());
908 pool_cell.set(pool.clone()).unwrap();
909
910 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
911 consumer.stop().await.expect("stop should succeed");
912
913 assert!(
915 pool.is_closed(),
916 "Pool should be closed after consumer.stop()"
917 );
918 }
919
920 #[tokio::test]
922 async fn double_stop_is_safe() {
923 let pool = sqlite_pool().await;
924 let mut config = SqlEndpointConfig::from_uri(
925 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
926 )
927 .unwrap();
928 config.resolve_defaults();
929
930 let pool_cell = Arc::new(OnceCell::new());
931 pool_cell.set(pool.clone()).unwrap();
932
933 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
934 consumer.stop().await.expect("first stop should succeed");
935 consumer
936 .stop()
937 .await
938 .expect("second stop should also succeed");
939 }
940
941 #[tokio::test]
943 async fn start_after_stop_rejected() {
944 let pool = sqlite_pool().await;
945 let mut config = SqlEndpointConfig::from_uri(
946 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
947 )
948 .unwrap();
949 config.resolve_defaults();
950
951 let pool_cell = Arc::new(OnceCell::new());
952 pool_cell.set(pool.clone()).unwrap();
953
954 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
955 consumer.stop().await.expect("stop should succeed");
956
957 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
958 tokio::spawn(async move {
959 while let Some(env) = rx.recv().await {
960 if let Some(reply_tx) = env.reply_tx {
961 let _ = reply_tx.send(Ok(env.exchange));
962 }
963 }
964 });
965 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
966
967 let result = consumer.start(ctx).await;
968 assert!(result.is_err());
969 let err_msg = result.unwrap_err().to_string();
970 assert!(
971 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
972 "Expected restart error, got: {}",
973 err_msg
974 );
975 }
976
977 #[tokio::test]
979 async fn batch_mode_per_row_post_processing() {
980 let pool = sqlite_pool().await;
981 seed_consumer_table(&pool).await;
982
983 let mut config = SqlEndpointConfig::from_uri(
984 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
985 )
986 .unwrap();
987 config.resolve_defaults();
988
989 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
990 let template = parse_query_template(&config.query, config.placeholder).unwrap();
991
992 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
993 tokio::spawn(async move {
994 while let Some(env) = rx.recv().await {
995 if let Some(reply_tx) = env.reply_tx {
996 let _ = reply_tx.send(Ok(env.exchange));
997 }
998 }
999 });
1000 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1001
1002 consumer
1003 .poll_database(&pool, &ctx, &template)
1004 .await
1005 .expect("poll must succeed");
1006
1007 let row = sqlx::query("select processed from jobs where id = 1")
1009 .fetch_one(&pool)
1010 .await
1011 .expect("row 1");
1012 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1013
1014 let row = sqlx::query("select processed from jobs where id = 2")
1015 .fetch_one(&pool)
1016 .await
1017 .expect("row 2");
1018 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1019
1020 assert_eq!(
1021 processed_1, 1,
1022 "row 1 should be marked processed via per-row onConsume"
1023 );
1024 assert_eq!(
1025 processed_2, 1,
1026 "row 2 should be marked processed via per-row onConsume"
1027 );
1028 }
1029
1030 #[tokio::test]
1032 async fn batch_mode_per_row_post_processing_on_failure() {
1033 let pool = sqlite_pool().await;
1034 seed_consumer_table(&pool).await;
1035
1036 let mut config = SqlEndpointConfig::from_uri(
1037 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1038 )
1039 .unwrap();
1040 config.resolve_defaults();
1041
1042 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1043 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1044
1045 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1046 tokio::spawn(async move {
1047 while let Some(env) = rx.recv().await {
1048 if let Some(reply_tx) = env.reply_tx {
1049 let _ =
1050 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
1051 }
1052 }
1053 });
1054 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1055
1056 consumer
1057 .poll_database(&pool, &ctx, &template)
1058 .await
1059 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
1060
1061 let row = sqlx::query("select failed from jobs where id = 1")
1063 .fetch_one(&pool)
1064 .await
1065 .expect("row 1");
1066 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1067
1068 let row = sqlx::query("select failed from jobs where id = 2")
1069 .fetch_one(&pool)
1070 .await
1071 .expect("row 2");
1072 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1073
1074 assert_eq!(
1075 failed_1, 1,
1076 "row 1 should be marked failed via per-row onConsumeFailed"
1077 );
1078 assert_eq!(
1079 failed_2, 1,
1080 "row 2 should be marked failed via per-row onConsumeFailed"
1081 );
1082 }
1083
1084 #[tokio::test]
1085 async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
1086 let mut config = config();
1087 config.bridge_error_handler = true;
1088 let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
1089
1090 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1091 tokio::spawn(async move {
1092 while let Some(env) = rx.recv().await {
1093 assert!(env.exchange.error.is_some(), "exchange must carry error");
1094 if let Some(reply_tx) = env.reply_tx {
1095 let _ = reply_tx.send(Ok(env.exchange));
1096 }
1097 break;
1098 }
1099 });
1100
1101 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1102 consumer
1103 .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
1104 .await
1105 .expect("bridging should succeed");
1106 }
1107
1108 #[tracing_test::traced_test]
1112 #[tokio::test]
1113 async fn bridged_poll_failure_emits_warn_not_error() {
1114 let pool = sqlite_pool().await;
1115 let mut config = config();
1120 config.bridge_error_handler = true;
1121 config.query = "select * from nonexistent_table".to_string();
1123 config.resolve_defaults();
1124 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1125 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1126
1127 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1130 tokio::spawn(async move {
1131 while let Some(env) = rx.recv().await {
1132 if let Some(reply_tx) = env.reply_tx {
1133 let _ = reply_tx.send(Ok(env.exchange));
1134 }
1135 }
1136 });
1137 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1138
1139 consumer.handle_poll_result(&pool, &ctx, &template).await;
1141
1142 assert!(
1144 !logs_contain("ERROR"),
1145 "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1146 );
1147 assert!(
1149 logs_contain("WARN"),
1150 "bridged poll failure should emit warn! for operator visibility"
1151 );
1152 }
1153
1154 #[tracing_test::traced_test]
1164 #[tokio::test]
1165 async fn unbridged_send_and_wait_failure_emits_error_loud() {
1166 let pool = sqlite_pool().await;
1167 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1168 .execute(&pool)
1169 .await
1170 .expect("create table");
1171 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1172 .execute(&pool)
1173 .await
1174 .expect("seed rows");
1175
1176 let mut config = SqlEndpointConfig::from_uri(
1177 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1178 )
1179 .unwrap();
1180 config.resolve_defaults();
1181 config.bridge_error_handler = false;
1183 let consumer = SqlConsumer::new(
1184 config.clone(),
1185 Arc::new(OnceCell::new()),
1186 Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1187 );
1188 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1189
1190 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1192 tokio::spawn(async move {
1193 while let Some(env) = rx.recv().await {
1194 if let Some(reply_tx) = env.reply_tx {
1195 let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1196 }
1197 }
1198 });
1199 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1200
1201 let _ = consumer.poll_database(&pool, &ctx, &template).await;
1202
1203 assert!(
1205 logs_contain("ERROR"),
1206 "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1207 );
1208 }
1209
1210 #[tracing_test::traced_test]
1213 #[tokio::test]
1214 async fn unbridged_handle_poll_result_emits_error_loud() {
1215 let pool = sqlite_pool().await;
1216 let mut config = config();
1219 config.bridge_error_handler = false;
1220 config.query = "select * from nonexistent_table".to_string();
1221 config.resolve_defaults();
1222 let consumer = SqlConsumer::new(
1223 config.clone(),
1224 Arc::new(OnceCell::new()),
1225 Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1226 );
1227 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1228
1229 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1231 tokio::spawn(async move {
1232 while let Some(env) = rx.recv().await {
1233 if let Some(reply_tx) = env.reply_tx {
1234 let _ = reply_tx.send(Ok(env.exchange));
1235 }
1236 }
1237 });
1238 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1239
1240 consumer.handle_poll_result(&pool, &ctx, &template).await;
1241
1242 assert!(
1243 logs_contain("ERROR"),
1244 "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1245 );
1246 }
1247
1248 #[tokio::test]
1249 async fn stream_list_consumer_emits_ndjson_body() {
1250 let pool = sqlite_pool().await;
1251 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1252 .execute(&pool)
1253 .await
1254 .expect("create table");
1255 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1256 .execute(&pool)
1257 .await
1258 .expect("seed rows");
1259
1260 let mut config = SqlEndpointConfig::from_uri(
1261 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1262 )
1263 .unwrap();
1264 config.resolve_defaults();
1265
1266 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1267 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1268
1269 let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1270 let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1271 tokio::spawn(async move {
1272 let mut rx = rx;
1273 if let Some(env) = rx.recv().await {
1274 if let Some(reply_tx) = env.reply_tx {
1275 let _ = reply_tx.send(Ok(env.exchange.clone()));
1276 }
1277 let _ = result_tx.send(env.exchange);
1278 }
1279 });
1280 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1281
1282 consumer
1283 .poll_database(&pool, &ctx, &template)
1284 .await
1285 .expect("poll must succeed");
1286
1287 let exchange = result_rx.await.expect("should have received one exchange");
1288
1289 match exchange.input.body {
1290 Body::Stream(ref stream_body) => {
1291 let stream = stream_body.stream.clone();
1292 let mut guard = stream.lock().await;
1293 let stream_opt = guard.take();
1294 assert!(stream_opt.is_some(), "stream should be present");
1295
1296 use futures::StreamExt;
1297 let mut collected = Vec::new();
1298 let mut stream = stream_opt.unwrap();
1299 while let Some(chunk) = stream.next().await {
1300 let chunk = chunk.expect("stream chunk should not error");
1301 collected.extend_from_slice(&chunk);
1302 }
1303
1304 let ndjson = String::from_utf8(collected).expect("valid utf8");
1305 let lines: Vec<&str> = ndjson.trim().lines().collect();
1306 assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1307
1308 let row0: serde_json::Value =
1309 serde_json::from_str(lines[0]).expect("valid json line 0");
1310 assert_eq!(row0["id"], 1);
1311 assert_eq!(row0["name"], "alpha");
1312
1313 let row1: serde_json::Value =
1314 serde_json::from_str(lines[1]).expect("valid json line 1");
1315 assert_eq!(row1["id"], 2);
1316 assert_eq!(row1["name"], "beta");
1317
1318 let row2: serde_json::Value =
1319 serde_json::from_str(lines[2]).expect("valid json line 2");
1320 assert_eq!(row2["id"], 3);
1321 assert_eq!(row2["name"], "gamma");
1322 }
1323 ref other => panic!("expected Body::Stream, got {:?}", other),
1324 }
1325 }
1326
1327 #[tokio::test]
1328 async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1329 let pool = sqlite_pool().await;
1330 sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1331 .execute(&pool)
1332 .await
1333 .expect("create table");
1334
1335 let mut config = SqlEndpointConfig::from_uri(
1336 "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1337 )
1338 .unwrap();
1339 config.resolve_defaults();
1340
1341 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1342 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1343
1344 let (tx, rx) = tokio::sync::oneshot::channel();
1345 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1346 tokio::spawn(async move {
1347 while let Some(env) = mpsc_rx.recv().await {
1348 if let Some(reply_tx) = env.reply_tx {
1349 let _ = reply_tx.send(Ok(env.exchange.clone()));
1350 }
1351 let _ = tx.send(env.exchange);
1352 break;
1353 }
1354 });
1355 let ctx = ConsumerContext::new(
1356 mpsc_tx,
1357 CancellationToken::new(),
1358 "sql-test-route".to_string(),
1359 );
1360
1361 consumer
1362 .poll_database(&pool, &ctx, &template)
1363 .await
1364 .expect("poll must succeed");
1365
1366 let exchange = rx
1367 .await
1368 .expect("StreamList should emit exchange even for empty results");
1369
1370 match exchange.input.body {
1371 Body::Stream(ref stream_body) => {
1372 let stream = stream_body.stream.clone();
1373 let mut guard = stream.lock().await;
1374 let stream_opt = guard.take();
1375
1376 use futures::StreamExt;
1377 let mut count = 0;
1378 if let Some(mut stream) = stream_opt {
1379 while let Some(chunk) = stream.next().await {
1380 let chunk = chunk.expect("stream chunk should not error");
1381 count += chunk.len();
1382 }
1383 }
1384 assert_eq!(count, 0, "empty table should produce zero stream bytes");
1385 }
1386 ref other => panic!("expected Body::Stream, got {:?}", other),
1387 }
1388 }
1389
1390 #[derive(Debug, Default)]
1394 struct RecordingHealth {
1395 forced: Arc<Mutex<Vec<(String, String, String)>>>,
1396 }
1397
1398 impl HealthCheckRegistry for RecordingHealth {
1399 fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
1400 self.forced.lock().unwrap().push((
1401 route_id.to_string(),
1402 name.to_string(),
1403 reason.to_string(),
1404 ));
1405 }
1406 }
1407
1408 struct NoopMetricsForConsumer;
1409
1410 impl MetricsCollector for NoopMetricsForConsumer {
1411 fn record_exchange_duration(&self, _: &str, _: Duration) {}
1412 fn increment_errors(&self, _: &str, _: &str) {}
1413 fn increment_exchanges(&self, _: &str) {}
1414 fn set_queue_depth(&self, _: &str, _: usize) {}
1415 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
1416 }
1417
1418 struct RecordingRuntimeWithHealth {
1419 health: Arc<RecordingHealth>,
1420 }
1421
1422 impl RuntimeObservability for RecordingRuntimeWithHealth {
1423 fn metrics(&self) -> Arc<dyn MetricsCollector> {
1424 Arc::new(NoopMetricsForConsumer)
1425 }
1426 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
1427 self.health.clone()
1428 }
1429 }
1430
1431 #[tokio::test]
1434 async fn consumer_pool_init_failure_calls_force_unhealthy_for_route() {
1435 let health = Arc::new(RecordingHealth::default());
1436 let recorded_health = health.clone();
1437 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntimeWithHealth { health });
1438
1439 let mut config = SqlEndpointConfig::from_uri(
1440 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false&initialDelay=0&delay=1",
1441 )
1442 .unwrap();
1443 config.max_connections = Some(1);
1444 config.min_connections = Some(0);
1445 config.idle_timeout_secs = Some(300);
1446 config.max_lifetime_secs = Some(1800);
1447
1448 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), rt);
1449
1450 let (tx, _rx) = mpsc::channel(8);
1451 let ctx = ConsumerContext::new(
1452 tx,
1453 CancellationToken::new(),
1454 "sql-consumer-test-route".to_string(),
1455 );
1456
1457 let result = consumer.start(ctx).await;
1458 assert!(result.is_err(), "pool init should fail with bad db_url");
1459
1460 let forced = recorded_health.forced.lock().unwrap();
1461 assert_eq!(
1462 forced.len(),
1463 1,
1464 "expected one force_unhealthy_for_route call"
1465 );
1466 assert_eq!(forced[0].0, "sql-consumer-test-route");
1467 assert_eq!(forced[0].1, "g:sql:consumer-pool-init");
1468 assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1469 }
1470
1471 #[tokio::test]
1475 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1476 use camel_component_api::NetworkRetryPolicy;
1477 use std::sync::Arc;
1478 use std::sync::atomic::{AtomicU32, Ordering};
1479
1480 let policy = NetworkRetryPolicy {
1481 max_attempts: 3,
1482 initial_delay: Duration::from_millis(1),
1483 max_delay: Duration::from_millis(1),
1484 multiplier: 1.0,
1485 ..NetworkRetryPolicy::default()
1486 };
1487
1488 let calls = Arc::new(AtomicU32::new(0));
1489 let calls_clone = Arc::clone(&calls);
1490
1491 let mut attempt: u32 = 0;
1492 let _result: Result<(), ()> = loop {
1493 attempt += 1;
1494 calls_clone.fetch_add(1, Ordering::SeqCst);
1495 let op_result: Result<(), ()> = Err(());
1496 match op_result {
1497 Ok(v) => break Ok(v),
1498 Err(_) if policy.should_retry(attempt) => {
1499 let delay = policy.delay_for(attempt - 1);
1500 tokio::time::sleep(delay).await;
1501 continue;
1502 }
1503 Err(_) => break Err(()),
1504 }
1505 };
1506
1507 assert_eq!(
1508 calls.load(Ordering::SeqCst),
1509 3,
1510 "max_attempts=3 must yield exactly 3 invocations"
1511 );
1512 }
1513}