Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::retry_async;
15use camel_component_api::{
16    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
17};
18use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
19
20use crate::config::{
21    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
22    enrich_db_url_with_ssl, redact_db_url,
23};
24use crate::headers;
25use crate::query::{QueryTemplate, parse_query_template, resolve_params};
26use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
27
28pub struct SqlConsumer {
29    pub(crate) config: SqlEndpointConfig,
30    pub(crate) pool: Arc<OnceCell<AnyPool>>,
31    stopped: bool,
32    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
33    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
34    #[allow(dead_code)]
35    runtime: Arc<dyn RuntimeObservability>,
36}
37
38impl SqlConsumer {
39    pub fn new(
40        config: SqlEndpointConfig,
41        pool: Arc<OnceCell<AnyPool>>,
42        runtime: Arc<dyn RuntimeObservability>,
43    ) -> Self {
44        Self {
45            config,
46            pool,
47            stopped: false,
48            runtime,
49        }
50    }
51
52    /// Poll the database for new rows and process them.
53    async fn poll_database(
54        &self,
55        pool: &AnyPool,
56        context: &ConsumerContext,
57        template: &QueryTemplate,
58    ) -> Result<(), CamelError> {
59        // Create an empty exchange for parameter resolution (consumer has no input)
60        let empty_exchange = Exchange::new(Message::default());
61
62        // Resolve parameters
63        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
64
65        debug!(query = %prepared.sql, "executing SQL consumer poll");
66
67        if self.config.output_type == SqlOutputType::StreamList {
68            return self.poll_database_stream(pool, context, &prepared).await;
69        }
70
71        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
72        let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
73            warn!(error = %e, "SQL consumer poll query failed");
74            CamelError::ProcessorError(format!("Query execution failed: {}", e))
75        })?;
76
77        debug!(rows = rows.len(), "SQL consumer poll completed");
78
79        if rows.is_empty() && !self.config.route_empty_result_set {
80            return Ok(());
81        }
82
83        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
84            if max > 0 {
85                rows.into_iter().take(max as usize).collect()
86            } else {
87                rows
88            }
89        } else {
90            rows
91        };
92
93        if self.config.use_iterator {
94            // Process each row individually
95            for row in rows_to_process {
96                let row_json = row_to_json(&row)?;
97
98                // Create exchange with the row as JSON body
99                let mut msg = Message::new(Body::Json(row_json.clone()));
100
101                // Set individual column headers with CamelSql. prefix per Apache Camel convention
102                if let Some(obj) = row_json.as_object() {
103                    for (key, value) in obj {
104                        msg.set_header(format!("CamelSql.{}", key), value.clone());
105                    }
106                }
107
108                let exchange = Exchange::new(msg);
109
110                // Send and wait for processing
111                let result = context.send_and_wait(exchange).await;
112
113                // Handle post-processing (onConsume/onConsumeFailed)
114                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
115                    // TODO(ADR-0012-e-metrics): wire increment_errors("b-prime:sql:on-consume") via bd rc-mf3
116                    error!(error = %e, "Post-processing failed"); // allow-log-levels
117                    if self.config.break_batch_on_consume_fail {
118                        return Err(e);
119                    }
120                }
121
122                // If downstream processing itself failed, honour break_batch_on_consume_fail
123                if let Err(ref consume_err) = result
124                    && self.config.break_batch_on_consume_fail
125                {
126                    return Err(consume_err.clone());
127                }
128            }
129        } else {
130            // Process all rows as a single batch
131            let rows_json: Vec<JsonValue> = rows_to_process
132                .iter()
133                .map(row_to_json)
134                .collect::<Result<Vec<_>, CamelError>>()?;
135
136            let row_count = rows_json.len();
137
138            // Create exchange with array of rows
139            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
140            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
141
142            let exchange = Exchange::new(msg);
143
144            // Send and wait for result
145            let result = context.send_and_wait(exchange).await;
146
147            // SQL-021: Run per-row post-processing even in batch mode so that
148            // onConsume/onConsumeFailed queries can reference row-specific parameters
149            // (e.g. `:#id`). Each row gets its own post-processing query execution.
150            for row_json in rows_json.iter() {
151                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
152                    // TODO(ADR-0012-e-metrics): wire increment_errors("b-prime:sql:on-consume-batch") via bd rc-mf3
153                    error!(error = %e, "Post-processing failed for batch row"); // allow-log-levels
154                    if self.config.break_batch_on_consume_fail {
155                        return Err(e);
156                    }
157                }
158            }
159
160            // If downstream processing itself failed, honour break_batch_on_consume_fail
161            if let Err(ref consume_err) = result
162                && self.config.break_batch_on_consume_fail
163            {
164                return Err(consume_err.clone());
165            }
166        }
167
168        // Execute on_consume_batch_complete if configured
169        if let Some(ref batch_query) = self.config.on_consume_batch_complete {
170            let _ = self
171                .execute_post_query(pool, batch_query, &JsonValue::Null)
172                .await;
173        }
174
175        Ok(())
176    }
177
178    async fn poll_database_stream(
179        &self,
180        pool: &AnyPool,
181        context: &ConsumerContext,
182        prepared: &crate::query::PreparedQuery,
183    ) -> Result<(), CamelError> {
184        let pool_clone = pool.clone();
185        let sql_str = prepared.sql.clone();
186        let bindings = prepared.bindings.clone();
187
188        let byte_stream = async_stream::try_stream! {
189            let mut q = sqlx::query(&sql_str);
190            q = bind_json_values(q, &bindings);
191            let mut rows = q.fetch(&pool_clone);
192            while let Some(row) = rows.try_next().await.map_err(|e| {
193                CamelError::ProcessorError(format!("Query execution failed: {}", e))
194            })? {
195                let json_val = row_to_json(&row).map_err(|e| {
196                    CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
197                })?;
198                let mut bytes = serde_json::to_vec(&json_val)
199                    .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
200                bytes.push(b'\n');
201                yield Bytes::from(bytes);
202            }
203        };
204
205        let msg = Message::new(Body::Stream(StreamBody {
206            stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
207            metadata: StreamMetadata {
208                content_type: Some("application/x-ndjson".to_string()),
209                size_hint: None,
210                origin: None,
211            },
212        }));
213
214        let exchange = Exchange::new(msg);
215        let result = context.send_and_wait(exchange).await;
216        if let Err(e) = result {
217            // log-policy: outside-contract
218            // (category b′: send_and_wait returned Err on a normal-data send,
219            // meaning the route handler did NOT absorb the failure —
220            // see ADR-0012 "b-bridged discriminator". This emitter is the
221            // only ERROR signal for the unhandled failure; must stay loud.
222            // Regression-tested by unbridged_send_and_wait_failure_emits_error_loud.)
223            // TODO(ADR-0012-e-metrics): wire increment_errors("b-prime:sql:stream-list") via bd rc-mf3
224            error!(error = %e, "StreamList consumer downstream processing failed"); // allow-log-levels
225            return Err(e);
226        }
227
228        debug!("StreamList: consumer poll completed (lazy stream emitted)");
229        Ok(())
230    }
231
232    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
233    async fn handle_post_processing(
234        &self,
235        pool: &AnyPool,
236        result: &Result<Exchange, CamelError>,
237        row_json: &JsonValue,
238    ) -> Result<(), CamelError> {
239        match result {
240            Ok(_) => {
241                // Success - execute onConsume if configured
242                if let Some(ref on_consume) = self.config.on_consume {
243                    self.execute_post_query(pool, on_consume, row_json).await?;
244                }
245            }
246            Err(_) => {
247                // Failure - execute onConsumeFailed if configured
248                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
249                    self.execute_post_query(pool, on_consume_failed, row_json)
250                        .await?;
251                }
252            }
253        }
254        Ok(())
255    }
256
257    /// Execute a post-processing query with the row data as parameters.
258    async fn execute_post_query(
259        &self,
260        pool: &AnyPool,
261        query_str: &str,
262        row_json: &JsonValue,
263    ) -> Result<(), CamelError> {
264        // Parse the query template
265        let template = parse_query_template(query_str, self.config.placeholder)?;
266
267        // Create a temporary exchange with the row as body for parameter resolution
268        // Populate CamelSql.* headers so named params can reference them
269        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
270        if let Some(obj) = row_json.as_object() {
271            for (key, value) in obj {
272                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
273            }
274        }
275        let temp_exchange = Exchange::new(temp_msg);
276
277        // Resolve parameters
278        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
279
280        // Build and execute the query
281        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
282        let result = query.execute(pool).await.map_err(|e| {
283            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
284        })?;
285
286        // Warn if 0 rows affected (the row may not have been marked correctly)
287        if result.rows_affected() == 0 {
288            warn!(
289                query = query_str,
290                "Post-processing query affected 0 rows — the row may not have been marked correctly"
291            );
292        }
293
294        Ok(())
295    }
296
297    /// Handle the result of a single poll cycle, including bridging if configured.
298    /// Extracted from `run()` so tests can exercise the error-handling branch directly.
299    async fn handle_poll_result(
300        &self,
301        pool: &AnyPool,
302        context: &ConsumerContext,
303        template: &QueryTemplate,
304    ) {
305        if let Err(e) = self.poll_database(pool, context, template).await {
306            if self.config.bridge_error_handler {
307                // log-policy: handler-owned
308                // (category b-bridged: error will be wrapped as Exchange
309                // and flow into the route's error handler)
310                warn!(error = %e, "SQL consumer poll failed (bridged)");
311                if let Err(route_err) = self.bridge_poll_error(context, e).await {
312                    // (the bridge channel itself broke — route will CrashNotification per ADR-0007)
313                    // log-policy: system-broken
314                    error!(error = %route_err, "Failed to bridge SQL consumer error to route");
315                }
316            } else {
317                // TODO(ADR-0012-e-metrics): wire increment_errors("b-prime:sql:poll-failed")
318                //   via bd rc-mf3 (Endpoint trait change to expose ComponentContext::metrics())
319                error!(error = %e, "SQL consumer poll failed"); // allow-log-levels
320            }
321        }
322    }
323
324    async fn bridge_poll_error(
325        &self,
326        context: &ConsumerContext,
327        error: CamelError,
328    ) -> Result<(), CamelError> {
329        if !self.config.bridge_error_handler {
330            return Ok(());
331        }
332        let mut exchange = Exchange::new(Message::default());
333        exchange.set_error(error);
334        context.send_and_wait(exchange).await.map(|_| ())
335    }
336}
337
338#[async_trait]
339impl Consumer for SqlConsumer {
340    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
341        // Reject double-start
342        if self.stopped {
343            return Err(CamelError::Config(
344                "SQL consumer cannot be restarted after stop".into(),
345            ));
346        }
347
348        // Step 1: Initialize the connection pool
349        let pool = self
350            .pool
351            .get_or_try_init(|| async {
352                // Defensive: ensure config is resolved even if caller didn't use create_endpoint
353                self.config.resolve_defaults();
354                // SQL-014: resolve file-based query asynchronously (not blocking)
355                self.config.resolve_file_query().await?;
356
357                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
358                // This is idempotent; safe to call multiple times.
359                sqlx::any::install_default_drivers();
360                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
361
362                let max_conn = self.config.max_connections.ok_or_else(|| {
363                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
364                })?;
365                let min_conn = self.config.min_connections.ok_or_else(|| {
366                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
367                })?;
368                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
369                    CamelError::Config(
370                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
371                    )
372                })?;
373                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
374                    CamelError::Config(
375                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
376                    )
377                })?;
378
379                info!(
380                    db_url = %redact_db_url(&self.config.db_url),
381                    "SQL consumer pool initializing"
382                );
383                let retry_policy = &self.config.retry;
384                retry_async::<_, _, _, _, sqlx::Error>(
385                    retry_policy,
386                    Some("sql-consumer"),
387                    || {
388                        AnyPoolOptions::new()
389                            .max_connections(max_conn)
390                            .min_connections(min_conn)
391                            .idle_timeout(Duration::from_secs(idle_timeout))
392                            .max_lifetime(Duration::from_secs(max_lifetime))
393                            .connect(&db_url)
394                    },
395                    is_retryable_sqlx_error,
396                )
397                .await
398                .map_err(|e| {
399                    // TODO(ADR-0012-g): replace with force_unhealthy_for_route once bd rc-1mo lands
400                    error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up"); // allow-log-levels
401                    CamelError::EndpointCreationFailed(format!(
402                        "Failed to connect to database: {}",
403                        e
404                    ))
405                })
406            })
407            .await?;
408
409        // SQL-002: warn if Managed transaction mode requested
410        if self.config.transaction_mode == TransactionMode::Managed {
411            warn!("transactionManager not yet implemented; using Auto mode");
412        }
413
414        // SQL-017/SQL-018: log processing and poll strategies
415        if self.config.processing_strategy == ProcessingStrategy::Scheduled {
416            debug!(
417                "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
418            );
419        }
420        if self.config.poll_strategy == PollStrategy::Burst {
421            debug!("Poll strategy: Burst (rapid successive polls)");
422        }
423
424        if self.config.output_type == SqlOutputType::StreamList
425            && (self.config.on_consume.is_some()
426                || self.config.on_consume_failed.is_some()
427                || self.config.on_consume_batch_complete.is_some())
428        {
429            warn!(
430                "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
431                 (rows are consumed lazily downstream)"
432            );
433        }
434
435        // Warn if no onConsume configured
436        if self.config.on_consume.is_none() {
437            warn!(
438                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
439            );
440        }
441
442        info!(
443            db_url = %redact_db_url(&self.config.db_url),
444            query_len = self.config.query.len(),
445            "SQL consumer started"
446        );
447
448        // Step 2: Parse query template once (avoid re-parsing every poll)
449        let template = parse_query_template(&self.config.query, self.config.placeholder)
450            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
451
452        // Step 3: Initial delay before starting polling
453        if self.config.initial_delay_ms > 0 {
454            tokio::select! {
455                _ = context.cancelled() => {
456                    info!("SQL consumer stopped during initial delay");
457                    return Ok(());
458                }
459                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
460            }
461        }
462
463        // Step 4: Polling loop
464        //
465        // This is a POLLING LOOP with fixed cadence (delay_ms), NOT a
466        // retry loop. It polls the database until cancelled or repeat_count
467        // is reached — there is no "transient error → retry with backoff"
468        // contract at this level. retry_async / retry_async_cancelable do
469        // not apply because they are designed for bounded retry, not
470        // repeated polling with uniform delay.
471        //
472        // The pool-connect retry at startup (Step 1) was migrated to
473        // retry_async in rc-d2r. The per-poll error handling (poll_database
474        // failures) is an error-bridge pattern, not a retry loop.
475        //
476        // See camel-redis/src/consumer.rs:325 for a similar polling-loop
477        // justification.
478        let mut poll_count: u32 = 0;
479        loop {
480            // SQL-015: check repeat_count limit
481            if let Some(max_repeats) = self.config.repeat_count
482                && poll_count >= max_repeats
483            {
484                info!(
485                    repeat_count = max_repeats,
486                    "SQL consumer reached repeat_count limit, stopping"
487                );
488                break;
489            }
490
491            tokio::select! {
492                _ = context.cancelled() => {
493                    info!("SQL consumer stopped");
494                    break;
495                }
496                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
497                    poll_count += 1;
498                    self.handle_poll_result(pool, &context, &template).await;
499                }
500            }
501        }
502
503        Ok(())
504    }
505
506    async fn stop(&mut self) -> Result<(), CamelError> {
507        // Double-stop is safe — no-op after first stop
508        if self.stopped {
509            debug!("SQL consumer stop called on already-stopped consumer");
510            return Ok(());
511        }
512
513        // Close the connection pool if it was initialized
514        if let Some(pool) = self.pool.get() {
515            debug!("SQL consumer closing connection pool");
516            pool.close().await;
517            debug!("SQL consumer pool closed");
518        }
519
520        self.stopped = true;
521        info!("SQL consumer stopped");
522        Ok(())
523    }
524
525    fn concurrency_model(&self) -> ConcurrencyModel {
526        // Sequential is correct for SQL consumers: concurrent polls would fetch
527        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
528        // in this runtime) — Sequential is the correct equivalent.
529        ConcurrencyModel::Sequential
530    }
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536    use camel_component_api::test_support::PanicRuntimeObservability;
537    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
538        std::sync::Arc::new(PanicRuntimeObservability)
539    }
540    use crate::config::SqlEndpointConfig;
541    use camel_component_api::ExchangeEnvelope;
542    use camel_component_api::UriConfig;
543    use sqlx::any::AnyPoolOptions;
544    use std::sync::Arc;
545    use std::time::Duration;
546    use tokio::sync::mpsc;
547    use tokio_util::sync::CancellationToken;
548
549    async fn sqlite_pool() -> AnyPool {
550        sqlx::any::install_default_drivers();
551        AnyPoolOptions::new()
552            .max_connections(1)
553            .connect("sqlite::memory:")
554            .await
555            .expect("sqlite pool")
556    }
557
558    async fn seed_consumer_table(pool: &AnyPool) {
559        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
560            .execute(pool)
561            .await
562            .expect("create table");
563        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
564            .execute(pool)
565            .await
566            .expect("seed rows");
567    }
568
569    fn config() -> SqlEndpointConfig {
570        let mut c =
571            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
572                .unwrap();
573        c.resolve_defaults();
574        c
575    }
576
#[test]
fn consumer_concurrency_model() {
    let pool_cell = Arc::new(OnceCell::new());
    let consumer = SqlConsumer::new(config(), pool_cell, test_rt());
    assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
}
582
#[test]
fn consumer_stores_config() {
    let uri = "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true";
    let mut parsed = SqlEndpointConfig::from_uri(uri).unwrap();
    parsed.resolve_defaults();

    let consumer = SqlConsumer::new(parsed, Arc::new(OnceCell::new()), test_rt());

    // The URI options must be visible on the stored configuration.
    assert_eq!(consumer.config.delay_ms, 2000);
    assert!(consumer.config.on_consume.is_some());
}
593
594    #[tokio::test]
595    async fn poll_database_runs_on_consume_for_successful_rows() {
596        let pool = sqlite_pool().await;
597        seed_consumer_table(&pool).await;
598
599        let mut config = SqlEndpointConfig::from_uri(
600            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
601        )
602        .unwrap();
603        config.resolve_defaults();
604
605        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
606        let template = parse_query_template(&config.query, config.placeholder).unwrap();
607
608        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
609        tokio::spawn(async move {
610            while let Some(env) = rx.recv().await {
611                if let Some(reply_tx) = env.reply_tx {
612                    let _ = reply_tx.send(Ok(env.exchange));
613                }
614            }
615        });
616        let ctx = ConsumerContext::new(tx, CancellationToken::new());
617
618        consumer
619            .poll_database(&pool, &ctx, &template)
620            .await
621            .expect("poll must succeed");
622
623        let row = sqlx::query("select processed from jobs where id = 1")
624            .fetch_one(&pool)
625            .await
626            .expect("row 1");
627        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
628
629        let row = sqlx::query("select processed from jobs where id = 2")
630            .fetch_one(&pool)
631            .await
632            .expect("row 2");
633        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
634
635        assert_eq!(processed_1, 1);
636        assert_eq!(processed_2, 1);
637    }
638
639    #[tokio::test]
640    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
641        let pool = sqlite_pool().await;
642        seed_consumer_table(&pool).await;
643
644        let mut config = SqlEndpointConfig::from_uri(
645            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
646        )
647        .unwrap();
648        config.resolve_defaults();
649
650        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
651        let template = parse_query_template(&config.query, config.placeholder).unwrap();
652
653        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
654        tokio::spawn(async move {
655            while let Some(env) = rx.recv().await {
656                if let Some(reply_tx) = env.reply_tx {
657                    let _ =
658                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
659                }
660            }
661        });
662        let ctx = ConsumerContext::new(tx, CancellationToken::new());
663
664        consumer
665            .poll_database(&pool, &ctx, &template)
666            .await
667            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
668
669        let row = sqlx::query("select failed from jobs where id = 1")
670            .fetch_one(&pool)
671            .await
672            .expect("row 1");
673        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
674
675        let row = sqlx::query("select failed from jobs where id = 2")
676            .fetch_one(&pool)
677            .await
678            .expect("row 2");
679        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
680
681        assert_eq!(failed_1, 1);
682        assert_eq!(failed_2, 1);
683    }
684
685    #[tokio::test]
686    async fn poll_database_breaks_batch_on_consume_fail() {
687        let pool = sqlite_pool().await;
688        seed_consumer_table(&pool).await;
689
690        let mut config = SqlEndpointConfig::from_uri(
691            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
692        )
693        .unwrap();
694        config.resolve_defaults();
695
696        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
697        let template = parse_query_template(&config.query, config.placeholder).unwrap();
698
699        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
700        tokio::spawn(async move {
701            while let Some(env) = rx.recv().await {
702                if let Some(reply_tx) = env.reply_tx {
703                    let _ =
704                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
705                }
706            }
707        });
708        let ctx = ConsumerContext::new(tx, CancellationToken::new());
709
710        let err = consumer
711            .poll_database(&pool, &ctx, &template)
712            .await
713            .expect_err("must stop on first downstream failure");
714        assert!(err.to_string().contains("downstream boom"));
715
716        let row = sqlx::query("select failed from jobs where id = 1")
717            .fetch_one(&pool)
718            .await
719            .expect("row 1");
720        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
721
722        let row = sqlx::query("select failed from jobs where id = 2")
723            .fetch_one(&pool)
724            .await
725            .expect("row 2");
726        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
727
728        assert_eq!(failed_1, 1);
729        assert_eq!(failed_2, 0, "second row must not be processed");
730    }
731
732    // --- Phase B hardening tests ---
733
734    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
735    // The consumer defensively calls resolve_defaults() during pool init, so the pool
736    // fields get resolved. This test verifies no panic occurs.
737    #[tokio::test]
738    async fn consumer_no_panic_without_prior_resolve_defaults() {
739        let config = SqlEndpointConfig::from_uri(
740            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
741        )
742        .unwrap();
743        // Deliberately NOT calling resolve_defaults() — pool fields remain None
744        assert!(config.max_connections.is_none());
745
746        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
747        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
748        tokio::spawn(async move {
749            while let Some(env) = rx.recv().await {
750                if let Some(reply_tx) = env.reply_tx {
751                    let _ = reply_tx.send(Ok(env.exchange));
752                }
753            }
754        });
755        let token = CancellationToken::new();
756        let ctx = ConsumerContext::new(tx, token.clone());
757
758        // Spawn the consumer and cancel it quickly — it should not panic
759        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
760
761        // Cancel after a short delay
762        tokio::time::sleep(Duration::from_millis(50)).await;
763        token.cancel();
764
765        let result = consumer_handle.await.expect("task should not panic");
766        // Should complete without panic (may be Ok or Err depending on timing)
767        let _ = result;
768    }
769
770    // SQL-008: stop() closes the pool
771    #[tokio::test]
772    async fn stop_closes_pool() {
773        let pool = sqlite_pool().await;
774        seed_consumer_table(&pool).await;
775
776        let mut config = SqlEndpointConfig::from_uri(
777            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
778        )
779        .unwrap();
780        config.resolve_defaults();
781
782        let pool_cell = Arc::new(OnceCell::new());
783        pool_cell.set(pool.clone()).unwrap();
784
785        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
786        consumer.stop().await.expect("stop should succeed");
787
788        // After stop, the pool should be closed
789        assert!(
790            pool.is_closed(),
791            "Pool should be closed after consumer.stop()"
792        );
793    }
794
795    // SQL-008: double-stop is safe
796    #[tokio::test]
797    async fn double_stop_is_safe() {
798        let pool = sqlite_pool().await;
799        let mut config = SqlEndpointConfig::from_uri(
800            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
801        )
802        .unwrap();
803        config.resolve_defaults();
804
805        let pool_cell = Arc::new(OnceCell::new());
806        pool_cell.set(pool.clone()).unwrap();
807
808        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
809        consumer.stop().await.expect("first stop should succeed");
810        consumer
811            .stop()
812            .await
813            .expect("second stop should also succeed");
814    }
815
816    // SQL-008: start after stop is rejected
817    #[tokio::test]
818    async fn start_after_stop_rejected() {
819        let pool = sqlite_pool().await;
820        let mut config = SqlEndpointConfig::from_uri(
821            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
822        )
823        .unwrap();
824        config.resolve_defaults();
825
826        let pool_cell = Arc::new(OnceCell::new());
827        pool_cell.set(pool.clone()).unwrap();
828
829        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
830        consumer.stop().await.expect("stop should succeed");
831
832        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
833        tokio::spawn(async move {
834            while let Some(env) = rx.recv().await {
835                if let Some(reply_tx) = env.reply_tx {
836                    let _ = reply_tx.send(Ok(env.exchange));
837                }
838            }
839        });
840        let ctx = ConsumerContext::new(tx, CancellationToken::new());
841
842        let result = consumer.start(ctx).await;
843        assert!(result.is_err());
844        let err_msg = result.unwrap_err().to_string();
845        assert!(
846            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
847            "Expected restart error, got: {}",
848            err_msg
849        );
850    }
851
852    // SQL-021: batch mode per-row post-processing
853    #[tokio::test]
854    async fn batch_mode_per_row_post_processing() {
855        let pool = sqlite_pool().await;
856        seed_consumer_table(&pool).await;
857
858        let mut config = SqlEndpointConfig::from_uri(
859            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
860        )
861        .unwrap();
862        config.resolve_defaults();
863
864        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
865        let template = parse_query_template(&config.query, config.placeholder).unwrap();
866
867        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
868        tokio::spawn(async move {
869            while let Some(env) = rx.recv().await {
870                if let Some(reply_tx) = env.reply_tx {
871                    let _ = reply_tx.send(Ok(env.exchange));
872                }
873            }
874        });
875        let ctx = ConsumerContext::new(tx, CancellationToken::new());
876
877        consumer
878            .poll_database(&pool, &ctx, &template)
879            .await
880            .expect("poll must succeed");
881
882        // SQL-021: Each row should have been processed individually via onConsume
883        let row = sqlx::query("select processed from jobs where id = 1")
884            .fetch_one(&pool)
885            .await
886            .expect("row 1");
887        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
888
889        let row = sqlx::query("select processed from jobs where id = 2")
890            .fetch_one(&pool)
891            .await
892            .expect("row 2");
893        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
894
895        assert_eq!(
896            processed_1, 1,
897            "row 1 should be marked processed via per-row onConsume"
898        );
899        assert_eq!(
900            processed_2, 1,
901            "row 2 should be marked processed via per-row onConsume"
902        );
903    }
904
905    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
906    #[tokio::test]
907    async fn batch_mode_per_row_post_processing_on_failure() {
908        let pool = sqlite_pool().await;
909        seed_consumer_table(&pool).await;
910
911        let mut config = SqlEndpointConfig::from_uri(
912            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
913        )
914        .unwrap();
915        config.resolve_defaults();
916
917        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
918        let template = parse_query_template(&config.query, config.placeholder).unwrap();
919
920        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
921        tokio::spawn(async move {
922            while let Some(env) = rx.recv().await {
923                if let Some(reply_tx) = env.reply_tx {
924                    let _ =
925                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
926                }
927            }
928        });
929        let ctx = ConsumerContext::new(tx, CancellationToken::new());
930
931        consumer
932            .poll_database(&pool, &ctx, &template)
933            .await
934            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
935
936        // SQL-021: Each row should have onConsumeFailed executed individually
937        let row = sqlx::query("select failed from jobs where id = 1")
938            .fetch_one(&pool)
939            .await
940            .expect("row 1");
941        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
942
943        let row = sqlx::query("select failed from jobs where id = 2")
944            .fetch_one(&pool)
945            .await
946            .expect("row 2");
947        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
948
949        assert_eq!(
950            failed_1, 1,
951            "row 1 should be marked failed via per-row onConsumeFailed"
952        );
953        assert_eq!(
954            failed_2, 1,
955            "row 2 should be marked failed via per-row onConsumeFailed"
956        );
957    }
958
959    #[tokio::test]
960    async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
961        let mut config = config();
962        config.bridge_error_handler = true;
963        let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
964
965        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
966        tokio::spawn(async move {
967            while let Some(env) = rx.recv().await {
968                assert!(env.exchange.error.is_some(), "exchange must carry error");
969                if let Some(reply_tx) = env.reply_tx {
970                    let _ = reply_tx.send(Ok(env.exchange));
971                }
972                break;
973            }
974        });
975
976        let ctx = ConsumerContext::new(tx, CancellationToken::new());
977        consumer
978            .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
979            .await
980            .expect("bridging should succeed");
981    }
982
983    /// Regression for ADR-0012: when bridge_error_handler=true, the poll
984    /// failure must NOT emit error! (the route's error handler owns ERROR
985    /// for bridged failures). Was previously duplicated at line 429 + 431.
986    #[tracing_test::traced_test]
987    #[tokio::test]
988    async fn bridged_poll_failure_emits_warn_not_error() {
989        let pool = sqlite_pool().await;
990        // Do NOT create any table — the query against a non-existent
991        // table will fail at fetch_all, returning Err BEFORE any
992        // downstream send (so lines 103/205 are never reached).
993
994        let mut config = config();
995        config.bridge_error_handler = true;
996        // Query a non-existent table to trigger a query-failure poll error.
997        config.query = "select * from nonexistent_table".to_string();
998        config.resolve_defaults();
999        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1000        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1001
1002        // Healthy downstream — replies Ok so bridge_poll_error succeeds
1003        // and does NOT emit its own error!.
1004        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1005        tokio::spawn(async move {
1006            while let Some(env) = rx.recv().await {
1007                if let Some(reply_tx) = env.reply_tx {
1008                    let _ = reply_tx.send(Ok(env.exchange));
1009                }
1010            }
1011        });
1012        let ctx = ConsumerContext::new(tx, CancellationToken::new());
1013
1014        // Drive poll — fetch_all will fail because the table is missing.
1015        consumer.handle_poll_result(&pool, &ctx, &template).await;
1016
1017        // The bridged path must NOT emit ERROR (handler owns it).
1018        assert!(
1019            !logs_contain("ERROR"),
1020            "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1021        );
1022        // Sanity: warn! was emitted so the failure is still visible.
1023        assert!(
1024            logs_contain("WARN"),
1025            "bridged poll failure should emit warn! for operator visibility"
1026        );
1027    }
1028
1029    /// Regression for ADR-0012 "b-bridged discriminator": when
1030    /// send_and_wait returns Err on a NORMAL-DATA send (i.e., not a
1031    /// deliberate bridge_poll_error handoff), the route handler did NOT
1032    /// absorb the failure (consumer.rs:77-91 contract; error_handler.rs
1033    /// returns Ok in every branch). The consumer's error! is the only
1034    /// ERROR signal for the unhandled failure and MUST stay at error!.
1035    ///
1036    /// Protects consumer.rs:205 (StreamList downstream send) and any
1037    /// future site that uses send_and_wait on a non-bridge path.
1038    #[tracing_test::traced_test]
1039    #[tokio::test]
1040    async fn unbridged_send_and_wait_failure_emits_error_loud() {
1041        let pool = sqlite_pool().await;
1042        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1043            .execute(&pool)
1044            .await
1045            .expect("create table");
1046        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1047            .execute(&pool)
1048            .await
1049            .expect("seed rows");
1050
1051        let mut config = SqlEndpointConfig::from_uri(
1052            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1053        )
1054        .unwrap();
1055        config.resolve_defaults();
1056        // Explicitly non-bridged: normal-data send path.
1057        config.bridge_error_handler = false;
1058        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1059        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1060
1061        // Downstream that returns Err — simulates unhandled route failure.
1062        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1063        tokio::spawn(async move {
1064            while let Some(env) = rx.recv().await {
1065                if let Some(reply_tx) = env.reply_tx {
1066                    let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1067                }
1068            }
1069        });
1070        let ctx = ConsumerContext::new(tx, CancellationToken::new());
1071
1072        let _ = consumer.poll_database(&pool, &ctx, &template).await;
1073
1074        // The unbridged path MUST emit ERROR — consumer owns the signal.
1075        assert!(
1076            logs_contain("ERROR"),
1077            "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1078        );
1079    }
1080
1081    /// Regression for ADR-0012: when bridge_error_handler=false, the unbridged
1082    /// branch of handle_poll_result MUST emit ERROR for unhandled poll failure.
1083    #[tracing_test::traced_test]
1084    #[tokio::test]
1085    async fn unbridged_handle_poll_result_emits_error_loud() {
1086        let pool = sqlite_pool().await;
1087        // Do NOT create any table — fetch_all will fail in poll_database.
1088
1089        let mut config = config();
1090        config.bridge_error_handler = false;
1091        config.query = "select * from nonexistent_table".to_string();
1092        config.resolve_defaults();
1093        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1094        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1095
1096        // Healthy downstream task; should not be reached for this poll-failure path.
1097        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1098        tokio::spawn(async move {
1099            while let Some(env) = rx.recv().await {
1100                if let Some(reply_tx) = env.reply_tx {
1101                    let _ = reply_tx.send(Ok(env.exchange));
1102                }
1103            }
1104        });
1105        let ctx = ConsumerContext::new(tx, CancellationToken::new());
1106
1107        consumer.handle_poll_result(&pool, &ctx, &template).await;
1108
1109        assert!(
1110            logs_contain("ERROR"),
1111            "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1112        );
1113    }
1114
1115    #[tokio::test]
1116    async fn stream_list_consumer_emits_ndjson_body() {
1117        let pool = sqlite_pool().await;
1118        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1119            .execute(&pool)
1120            .await
1121            .expect("create table");
1122        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1123            .execute(&pool)
1124            .await
1125            .expect("seed rows");
1126
1127        let mut config = SqlEndpointConfig::from_uri(
1128            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1129        )
1130        .unwrap();
1131        config.resolve_defaults();
1132
1133        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1134        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1135
1136        let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1137        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1138        tokio::spawn(async move {
1139            let mut rx = rx;
1140            if let Some(env) = rx.recv().await {
1141                if let Some(reply_tx) = env.reply_tx {
1142                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1143                }
1144                let _ = result_tx.send(env.exchange);
1145            }
1146        });
1147        let ctx = ConsumerContext::new(tx, CancellationToken::new());
1148
1149        consumer
1150            .poll_database(&pool, &ctx, &template)
1151            .await
1152            .expect("poll must succeed");
1153
1154        let exchange = result_rx.await.expect("should have received one exchange");
1155
1156        match exchange.input.body {
1157            Body::Stream(ref stream_body) => {
1158                let stream = stream_body.stream.clone();
1159                let mut guard = stream.lock().await;
1160                let stream_opt = guard.take();
1161                assert!(stream_opt.is_some(), "stream should be present");
1162
1163                use futures::StreamExt;
1164                let mut collected = Vec::new();
1165                let mut stream = stream_opt.unwrap();
1166                while let Some(chunk) = stream.next().await {
1167                    let chunk = chunk.expect("stream chunk should not error");
1168                    collected.extend_from_slice(&chunk);
1169                }
1170
1171                let ndjson = String::from_utf8(collected).expect("valid utf8");
1172                let lines: Vec<&str> = ndjson.trim().lines().collect();
1173                assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1174
1175                let row0: serde_json::Value =
1176                    serde_json::from_str(lines[0]).expect("valid json line 0");
1177                assert_eq!(row0["id"], 1);
1178                assert_eq!(row0["name"], "alpha");
1179
1180                let row1: serde_json::Value =
1181                    serde_json::from_str(lines[1]).expect("valid json line 1");
1182                assert_eq!(row1["id"], 2);
1183                assert_eq!(row1["name"], "beta");
1184
1185                let row2: serde_json::Value =
1186                    serde_json::from_str(lines[2]).expect("valid json line 2");
1187                assert_eq!(row2["id"], 3);
1188                assert_eq!(row2["name"], "gamma");
1189            }
1190            ref other => panic!("expected Body::Stream, got {:?}", other),
1191        }
1192    }
1193
1194    #[tokio::test]
1195    async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1196        let pool = sqlite_pool().await;
1197        sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1198            .execute(&pool)
1199            .await
1200            .expect("create table");
1201
1202        let mut config = SqlEndpointConfig::from_uri(
1203            "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1204        )
1205        .unwrap();
1206        config.resolve_defaults();
1207
1208        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1209        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1210
1211        let (tx, rx) = tokio::sync::oneshot::channel();
1212        let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1213        tokio::spawn(async move {
1214            while let Some(env) = mpsc_rx.recv().await {
1215                if let Some(reply_tx) = env.reply_tx {
1216                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1217                }
1218                let _ = tx.send(env.exchange);
1219                break;
1220            }
1221        });
1222        let ctx = ConsumerContext::new(mpsc_tx, CancellationToken::new());
1223
1224        consumer
1225            .poll_database(&pool, &ctx, &template)
1226            .await
1227            .expect("poll must succeed");
1228
1229        let exchange = rx
1230            .await
1231            .expect("StreamList should emit exchange even for empty results");
1232
1233        match exchange.input.body {
1234            Body::Stream(ref stream_body) => {
1235                let stream = stream_body.stream.clone();
1236                let mut guard = stream.lock().await;
1237                let stream_opt = guard.take();
1238
1239                use futures::StreamExt;
1240                let mut count = 0;
1241                if let Some(mut stream) = stream_opt {
1242                    while let Some(chunk) = stream.next().await {
1243                        let chunk = chunk.expect("stream chunk should not error");
1244                        count += chunk.len();
1245                    }
1246                }
1247                assert_eq!(count, 0, "empty table should produce zero stream bytes");
1248            }
1249            ref other => panic!("expected Body::Stream, got {:?}", other),
1250        }
1251    }
1252
1253    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1254    /// Replicates the exact retry loop from SqlConsumer::start() (consumer.rs:343-367):
1255    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1256    #[tokio::test]
1257    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1258        use camel_component_api::NetworkRetryPolicy;
1259        use std::sync::Arc;
1260        use std::sync::atomic::{AtomicU32, Ordering};
1261
1262        let policy = NetworkRetryPolicy {
1263            max_attempts: 3,
1264            initial_delay: Duration::from_millis(1),
1265            max_delay: Duration::from_millis(1),
1266            multiplier: 1.0,
1267            ..NetworkRetryPolicy::default()
1268        };
1269
1270        let calls = Arc::new(AtomicU32::new(0));
1271        let calls_clone = Arc::clone(&calls);
1272
1273        let mut attempt: u32 = 0;
1274        let _result: Result<(), ()> = loop {
1275            attempt += 1;
1276            calls_clone.fetch_add(1, Ordering::SeqCst);
1277            let op_result: Result<(), ()> = Err(());
1278            match op_result {
1279                Ok(v) => break Ok(v),
1280                Err(_) if policy.should_retry(attempt) => {
1281                    let delay = policy.delay_for(attempt - 1);
1282                    tokio::time::sleep(delay).await;
1283                    continue;
1284                }
1285                Err(_) => break Err(()),
1286            }
1287        };
1288
1289        assert_eq!(
1290            calls.load(Ordering::SeqCst),
1291            3,
1292            "max_attempts=3 must yield exactly 3 invocations"
1293        );
1294    }
1295}