Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use camel_api::datasource::DatasourceCatalog;
7use futures::TryStreamExt;
8use serde_json::Value as JsonValue;
9use sqlx::AnyPool;
10use sqlx::any::AnyPoolOptions;
11use sqlx::any::AnyRow;
12use tokio::sync::OnceCell;
13use tracing::{debug, error, info, warn};
14
15use camel_component_api::retry_async;
16use camel_component_api::{
17    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
18};
19use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
20
21use crate::config::{
22    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
23    enrich_db_url_with_ssl, redact_db_url,
24};
25use crate::headers;
26use crate::query::{QueryTemplate, parse_query_template, resolve_params};
27use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
28
/// Record a post-process (b′) failure for ADR-0012 outside-contract sites in this
/// consumer. Increments the per-label error metric AND emits an `error!` log
/// per ADR-0012 L57 + L70-72 (the metric is the operator signal; `error!`
/// provides loud log visibility — b′ errors are NOT absorbed by route handlers).
///
/// Both the metric call and the `error!` live INSIDE this helper so that
/// `lint-log-levels`'s `has_replacement_signal` (scripts/xtask/src/main.rs)
/// sees both literals in the helper's function body. Call sites have NO
/// `error!` of their own.
///
/// # Parameters
/// - `runtime`: observability handle; only its `metrics()` collector is used here.
/// - `route_id`: route the failure is attributed to (first argument of `increment_errors`).
/// - `label`: error-type label passed to `increment_errors` (callers in this file
///   use `b-prime:sql:*` values).
/// - `error`: the failure itself, logged through its `Display` form as the `error` field.
/// - `message`: human-readable log message, emitted verbatim.
///
/// Regression-tested by:
/// - `record_post_process_failure_increments_errors_and_emits_error_log` (helper unit)
/// - `unbridged_send_and_wait_failure_emits_error_loud` (StreamList integration path)
fn record_post_process_failure(
    runtime: &dyn RuntimeObservability,
    route_id: &str,
    label: &str,
    error: &CamelError,
    message: &str,
) {
    // Metric first: it is the operator-facing signal.
    runtime.metrics().increment_errors(route_id, label);
    // log-policy: outside-contract
    error!(error = %error, "{message}");
}
53
/// Polling consumer that periodically runs a SQL query and forwards the
/// resulting rows to a route through a `ConsumerContext`.
pub struct SqlConsumer {
    /// Endpoint configuration (query, post-processing queries, polling and pool settings).
    pub(crate) config: SqlEndpointConfig,
    /// Lazily initialised connection pool; populated at most once by `start`
    /// (via `get_or_try_init`) and closed by `stop`.
    pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
    /// Optional datasource catalog. `start` resolves the pool from it when
    /// both the catalog and `config.datasource_name` are present.
    pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
    /// Set to `true` by `stop`; a stopped consumer refuses to `start` again.
    stopped: bool,
    /// Runtime observability for metrics and health — used by the
    /// `record_post_process_failure` helper for ADR-0012 (b′) metric calls.
    runtime: Arc<dyn RuntimeObservability>,
}
63
64impl SqlConsumer {
65    pub fn new(
66        config: SqlEndpointConfig,
67        pool: Arc<OnceCell<Arc<AnyPool>>>,
68        catalog: Option<Arc<dyn DatasourceCatalog>>,
69        runtime: Arc<dyn RuntimeObservability>,
70    ) -> Self {
71        Self {
72            config,
73            pool,
74            catalog,
75            stopped: false,
76            runtime,
77        }
78    }
79
80    /// Poll the database for new rows and process them.
81    async fn poll_database(
82        &self,
83        pool: &AnyPool,
84        context: &ConsumerContext,
85        template: &QueryTemplate,
86    ) -> Result<(), CamelError> {
87        // Capture route_id from ConsumerContext for ADR-0012 metrics
88        let route_id = context.route_id();
89
90        // Create an empty exchange for parameter resolution (consumer has no input)
91        let empty_exchange = Exchange::new(Message::default());
92
93        // Resolve parameters
94        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
95
96        debug!(query = %prepared.sql, "executing SQL consumer poll");
97
98        if self.config.output_type == SqlOutputType::StreamList {
99            return self.poll_database_stream(pool, context, &prepared).await;
100        }
101
102        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
103        let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
104            warn!(error = %e, "SQL consumer poll query failed");
105            CamelError::ProcessorError(format!("Query execution failed: {}", e))
106        })?;
107
108        debug!(rows = rows.len(), "SQL consumer poll completed");
109
110        if rows.is_empty() && !self.config.route_empty_result_set {
111            return Ok(());
112        }
113
114        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
115            if max > 0 {
116                rows.into_iter().take(max as usize).collect()
117            } else {
118                rows
119            }
120        } else {
121            rows
122        };
123
124        if self.config.use_iterator {
125            // Process each row individually
126            for row in rows_to_process {
127                let row_json = row_to_json(&row)?;
128
129                // Create exchange with the row as JSON body
130                let mut msg = Message::new(Body::Json(row_json.clone()));
131
132                // Set individual column headers with CamelSql. prefix per Apache Camel convention
133                if let Some(obj) = row_json.as_object() {
134                    for (key, value) in obj {
135                        msg.set_header(format!("CamelSql.{}", key), value.clone());
136                    }
137                }
138
139                let exchange = Exchange::new(msg);
140
141                // Send and wait for processing
142                let result = context.send_and_wait(exchange).await;
143
144                // Handle post-processing (onConsume/onConsumeFailed)
145                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
146                    record_post_process_failure(
147                        self.runtime.as_ref(),
148                        route_id,
149                        "b-prime:sql:on-consume",
150                        &e,
151                        "Post-processing failed",
152                    );
153                    if self.config.break_batch_on_consume_fail {
154                        return Err(e);
155                    }
156                }
157
158                // If downstream processing itself failed, honour break_batch_on_consume_fail
159                if let Err(ref consume_err) = result
160                    && self.config.break_batch_on_consume_fail
161                {
162                    return Err(consume_err.clone());
163                }
164            }
165        } else {
166            // Process all rows as a single batch
167            let rows_json: Vec<JsonValue> = rows_to_process
168                .iter()
169                .map(row_to_json)
170                .collect::<Result<Vec<_>, CamelError>>()?;
171
172            let row_count = rows_json.len();
173
174            // Create exchange with array of rows
175            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
176            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
177
178            let exchange = Exchange::new(msg);
179
180            // Send and wait for result
181            let result = context.send_and_wait(exchange).await;
182
183            // SQL-021: Run per-row post-processing even in batch mode so that
184            // onConsume/onConsumeFailed queries can reference row-specific parameters
185            // (e.g. `:#id`). Each row gets its own post-processing query execution.
186            for row_json in rows_json.iter() {
187                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
188                    record_post_process_failure(
189                        self.runtime.as_ref(),
190                        route_id,
191                        "b-prime:sql:on-consume-batch",
192                        &e,
193                        "Post-processing failed for batch row",
194                    );
195                    if self.config.break_batch_on_consume_fail {
196                        return Err(e);
197                    }
198                }
199            }
200
201            // If downstream processing itself failed, honour break_batch_on_consume_fail
202            if let Err(ref consume_err) = result
203                && self.config.break_batch_on_consume_fail
204            {
205                return Err(consume_err.clone());
206            }
207        }
208
209        // Execute on_consume_batch_complete if configured
210        if let Some(ref batch_query) = self.config.on_consume_batch_complete {
211            let _ = self
212                .execute_post_query(pool, batch_query, &JsonValue::Null)
213                .await;
214        }
215
216        Ok(())
217    }
218
219    async fn poll_database_stream(
220        &self,
221        pool: &AnyPool,
222        context: &ConsumerContext,
223        prepared: &crate::query::PreparedQuery,
224    ) -> Result<(), CamelError> {
225        let pool_clone = pool.clone();
226        let sql_str = prepared.sql.clone();
227        let bindings = prepared.bindings.clone();
228
229        let byte_stream = async_stream::try_stream! {
230            let mut q = sqlx::query(&sql_str);
231            q = bind_json_values(q, &bindings);
232            let mut rows = q.fetch(&pool_clone);
233            while let Some(row) = rows.try_next().await.map_err(|e| {
234                CamelError::ProcessorError(format!("Query execution failed: {}", e))
235            })? {
236                let json_val = row_to_json(&row).map_err(|e| {
237                    CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
238                })?;
239                let mut bytes = serde_json::to_vec(&json_val)
240                    .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
241                bytes.push(b'\n');
242                yield Bytes::from(bytes);
243            }
244        };
245
246        let msg = Message::new(Body::Stream(StreamBody {
247            stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
248            metadata: StreamMetadata {
249                content_type: Some("application/x-ndjson".to_string()),
250                size_hint: None,
251                origin: None,
252            },
253        }));
254
255        let exchange = Exchange::new(msg);
256        let result = context.send_and_wait(exchange).await;
257        if let Err(e) = result {
258            record_post_process_failure(
259                self.runtime.as_ref(),
260                context.route_id(),
261                "b-prime:sql:stream-list",
262                &e,
263                "StreamList consumer downstream processing failed",
264            );
265            return Err(e);
266        }
267
268        debug!("StreamList: consumer poll completed (lazy stream emitted)");
269        Ok(())
270    }
271
272    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
273    async fn handle_post_processing(
274        &self,
275        pool: &AnyPool,
276        result: &Result<Exchange, CamelError>,
277        row_json: &JsonValue,
278    ) -> Result<(), CamelError> {
279        match result {
280            Ok(_) => {
281                // Success - execute onConsume if configured
282                if let Some(ref on_consume) = self.config.on_consume {
283                    self.execute_post_query(pool, on_consume, row_json).await?;
284                }
285            }
286            Err(_) => {
287                // Failure - execute onConsumeFailed if configured
288                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
289                    self.execute_post_query(pool, on_consume_failed, row_json)
290                        .await?;
291                }
292            }
293        }
294        Ok(())
295    }
296
297    /// Execute a post-processing query with the row data as parameters.
298    async fn execute_post_query(
299        &self,
300        pool: &AnyPool,
301        query_str: &str,
302        row_json: &JsonValue,
303    ) -> Result<(), CamelError> {
304        // Parse the query template
305        let template = parse_query_template(query_str, self.config.placeholder)?;
306
307        // Create a temporary exchange with the row as body for parameter resolution
308        // Populate CamelSql.* headers so named params can reference them
309        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
310        if let Some(obj) = row_json.as_object() {
311            for (key, value) in obj {
312                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
313            }
314        }
315        let temp_exchange = Exchange::new(temp_msg);
316
317        // Resolve parameters
318        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
319
320        // Build and execute the query
321        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
322        let result = query.execute(pool).await.map_err(|e| {
323            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
324        })?;
325
326        // Warn if 0 rows affected (the row may not have been marked correctly)
327        if result.rows_affected() == 0 {
328            warn!(
329                query = query_str,
330                "Post-processing query affected 0 rows — the row may not have been marked correctly"
331            );
332        }
333
334        Ok(())
335    }
336
337    /// Handle the result of a single poll cycle, including bridging if configured.
338    /// Extracted from `run()` so tests can exercise the error-handling branch directly.
339    async fn handle_poll_result(
340        &self,
341        pool: &AnyPool,
342        context: &ConsumerContext,
343        template: &QueryTemplate,
344    ) {
345        if let Err(e) = self.poll_database(pool, context, template).await {
346            if self.config.bridge_error_handler {
347                // log-policy: handler-owned
348                // (category b-bridged: error will be wrapped as Exchange
349                // and flow into the route's error handler)
350                warn!(error = %e, "SQL consumer poll failed (bridged)");
351                if let Err(route_err) = self.bridge_poll_error(context, e).await {
352                    // (the bridge channel itself broke — route will CrashNotification per ADR-0007)
353                    // log-policy: system-broken
354                    error!(error = %route_err, "Failed to bridge SQL consumer error to route");
355                }
356            } else {
357                record_post_process_failure(
358                    self.runtime.as_ref(),
359                    context.route_id(),
360                    "b-prime:sql:poll-failed",
361                    &e,
362                    "SQL consumer poll failed",
363                );
364            }
365        }
366    }
367
368    async fn bridge_poll_error(
369        &self,
370        context: &ConsumerContext,
371        error: CamelError,
372    ) -> Result<(), CamelError> {
373        if !self.config.bridge_error_handler {
374            return Ok(());
375        }
376        let mut exchange = Exchange::new(Message::default());
377        exchange.set_error(error);
378        context.send_and_wait(exchange).await.map(|_| ())
379    }
380}
381
382#[async_trait]
383impl Consumer for SqlConsumer {
    /// Start the consumer: initialise the connection pool, parse the query
    /// template, wait for the initial delay, then poll every `delay_ms` until
    /// the context is cancelled or `repeat_count` polls have run.
    ///
    /// # Errors
    /// - `CamelError::Config` when the consumer was already stopped, when a pool
    ///   setting has not been resolved, or when the query template is invalid.
    /// - `CamelError::EndpointCreationFailed` when the database connection cannot
    ///   be established (the route is also flagged unhealthy in that case).
    /// - Any error from `resolve_file_query` or from the datasource catalog.
    ///
    /// Failures of individual polls are NOT returned; they are handled by
    /// `handle_poll_result`.
    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        // Reject restart after stop: `stopped` is only ever set by `stop()`,
        // so this guard does not detect two concurrent starts.
        if self.stopped {
            return Err(CamelError::Config(
                "SQL consumer cannot be restarted after stop".into(),
            ));
        }

        // Step 1: Initialize the connection pool
        let route_id = context.route_id().to_string();
        let catalog = self.catalog.clone();
        let ds_name = self.config.datasource_name.clone();

        // SQL-014: resolve file-based query before pool init, regardless of pool source
        self.config.resolve_defaults();
        self.config.resolve_file_query().await?;

        // `get_or_try_init` runs the closure only if the cell is still empty,
        // so an already-initialised pool is reused as-is.
        let pool = self
            .pool
            .get_or_try_init(|| async {
                // Catalog path: resolve shared pool from the datasource catalog
                if let (Some(ref cat), Some(ref name)) = (catalog, ds_name) {
                    let handle = cat.get_pool(name).await?;
                    return handle.downcast::<AnyPool>();
                }

                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
                // This is idempotent; safe to call multiple times.
                sqlx::any::install_default_drivers();
                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;

                // The pool settings are `Option`s; an unset one is reported as a
                // config error instead of falling back to a default here.
                let max_conn = self.config.max_connections.ok_or_else(|| {
                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
                })?;
                let min_conn = self.config.min_connections.ok_or_else(|| {
                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
                })?;
                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
                    CamelError::Config(
                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
                    )
                })?;
                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
                    CamelError::Config(
                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
                    )
                })?;

                info!(
                    db_url = %redact_db_url(&self.config.db_url),
                    "SQL consumer pool initializing"
                );
                // Connection attempts are retried per `config.retry`, but only for
                // errors that `is_retryable_sqlx_error` accepts.
                let retry_policy = &self.config.retry;
                let pool = retry_async::<_, _, _, _, sqlx::Error>(
                    retry_policy,
                    Some("sql-consumer"),
                    || {
                        async {
                            AnyPoolOptions::new()
                                .max_connections(max_conn)
                                .min_connections(min_conn)
                                .idle_timeout(Duration::from_secs(idle_timeout))
                                .max_lifetime(Duration::from_secs(max_lifetime))
                                .connect(&db_url)
                                .await
                        }
                    },
                    is_retryable_sqlx_error,
                )
                .await
                .map_err(|e| {
                    // Giving up: flag the route unhealthy before surfacing the error.
                    self.runtime.health().force_unhealthy_for_route(
                        &route_id,
                        "g:sql:consumer-pool-init",
                        &e.to_string(),
                    );
                    // log-policy: outside-contract
                    error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up");
                    CamelError::EndpointCreationFailed(format!(
                        "Failed to connect to database: {}",
                        e
                    ))
                })?;
                Ok(Arc::new(pool))
            })
            .await?;

        // SQL-002: warn if Managed transaction mode requested
        if self.config.transaction_mode == TransactionMode::Managed {
            warn!("transactionManager not yet implemented; using Auto mode");
        }

        // SQL-017/SQL-018: log processing and poll strategies
        if self.config.processing_strategy == ProcessingStrategy::Scheduled {
            debug!(
                "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
            );
        }
        if self.config.poll_strategy == PollStrategy::Burst {
            debug!("Poll strategy: Burst (rapid successive polls)");
        }

        // Post-processing queries are ignored by the StreamList path, so tell
        // the operator when any of them is configured alongside it.
        if self.config.output_type == SqlOutputType::StreamList
            && (self.config.on_consume.is_some()
                || self.config.on_consume_failed.is_some()
                || self.config.on_consume_batch_complete.is_some())
        {
            warn!(
                "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
                 (rows are consumed lazily downstream)"
            );
        }

        // Warn if no onConsume configured
        if self.config.on_consume.is_none() {
            warn!(
                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
            );
        }

        info!(
            db_url = %redact_db_url(&self.config.db_url),
            query_len = self.config.query.len(),
            "SQL consumer started"
        );

        // Step 2: Parse query template once (avoid re-parsing every poll)
        let template = parse_query_template(&self.config.query, self.config.placeholder)
            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;

        // Step 3: Initial delay before starting polling
        // (cancellation during the delay ends `start` successfully, without polling)
        if self.config.initial_delay_ms > 0 {
            tokio::select! {
                _ = context.cancelled() => {
                    info!("SQL consumer stopped during initial delay");
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
            }
        }

        // Step 4: Polling loop
        //
        // This is a POLLING LOOP with fixed cadence (delay_ms), NOT a
        // retry loop. It polls the database until cancelled or repeat_count
        // is reached — there is no "transient error → retry with backoff"
        // contract at this level. retry_async / retry_async_cancelable do
        // not apply because they are designed for bounded retry, not
        // repeated polling with uniform delay.
        //
        // The pool-connect retry at startup (Step 1) was migrated to
        // retry_async in rc-d2r. The per-poll error handling (poll_database
        // failures) is an error-bridge pattern, not a retry loop.
        //
        // See camel-redis/src/consumer.rs:325 for a similar polling-loop
        // justification.
        let mut poll_count: u32 = 0;
        loop {
            // SQL-015: check repeat_count limit
            if let Some(max_repeats) = self.config.repeat_count
                && poll_count >= max_repeats
            {
                info!(
                    repeat_count = max_repeats,
                    "SQL consumer reached repeat_count limit, stopping"
                );
                break;
            }

            // Cancellation is only raced against the sleep; once a poll has
            // started it runs to completion inside the second branch.
            tokio::select! {
                _ = context.cancelled() => {
                    info!("SQL consumer stopped");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
                    poll_count += 1;
                    self.handle_poll_result(pool.as_ref(), &context, &template).await;
                }
            }
        }

        Ok(())
    }
567
568    async fn stop(&mut self) -> Result<(), CamelError> {
569        // Double-stop is safe — no-op after first stop
570        if self.stopped {
571            debug!("SQL consumer stop called on already-stopped consumer");
572            return Ok(());
573        }
574
575        // Close the connection pool if it was initialized
576        if let Some(pool) = self.pool.get() {
577            debug!("SQL consumer closing connection pool");
578            pool.close().await;
579            debug!("SQL consumer pool closed");
580        }
581
582        self.stopped = true;
583        info!("SQL consumer stopped");
584        Ok(())
585    }
586
    /// Concurrency model requested by this consumer: always `Sequential`.
    fn concurrency_model(&self) -> ConcurrencyModel {
        // Sequential is correct for SQL consumers: concurrent polls would fetch
        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
        // in this runtime) — Sequential is the correct equivalent.
        ConcurrencyModel::Sequential
    }
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use camel_api::MetricsCollector;
599    use camel_component_api::HealthCheckRegistry;
600    use camel_component_api::test_support::PanicRuntimeObservability;
601    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
602        std::sync::Arc::new(PanicRuntimeObservability)
603    }
604    use crate::config::SqlEndpointConfig;
605    use camel_component_api::ExchangeEnvelope;
606    use camel_component_api::UriConfig;
607    use sqlx::any::AnyPoolOptions;
608    use std::sync::Arc;
609    use std::sync::Mutex;
610    use std::time::Duration;
611    use tokio::sync::mpsc;
612    use tokio_util::sync::CancellationToken;
613
614    // -----------------------------------------------------------------------
615    // Recording metrics collector for testing increment_errors calls
616    // -----------------------------------------------------------------------
617
618    struct RecordingMetrics {
619        errors: Arc<Mutex<Vec<(String, String)>>>,
620    }
621
622    impl MetricsCollector for RecordingMetrics {
623        fn record_exchange_duration(&self, _: &str, _: Duration) {}
624        fn increment_errors(&self, route_id: &str, error_type: &str) {
625            self.errors
626                .lock()
627                .unwrap()
628                .push((route_id.to_string(), error_type.to_string()));
629        }
630        fn increment_exchanges(&self, _: &str) {}
631        fn set_queue_depth(&self, _: &str, _: usize) {}
632        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
633    }
634
635    struct RecordingRuntime {
636        metrics_collector: Arc<RecordingMetrics>,
637    }
638
639    impl RecordingRuntime {
640        fn new(errors: Arc<Mutex<Vec<(String, String)>>>) -> Self {
641            Self {
642                metrics_collector: Arc::new(RecordingMetrics { errors }),
643            }
644        }
645    }
646
647    impl RuntimeObservability for RecordingRuntime {
648        fn metrics(&self) -> Arc<dyn MetricsCollector> {
649            self.metrics_collector.clone() as Arc<dyn MetricsCollector>
650        }
651        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
652            panic!("RecordingRuntime::health not used in this test")
653        }
654    }
655
656    /// Regression test for ADR-0012: the record_post_process_failure helper
657    /// must increment the error metric with the correct route_id and label,
658    /// AND emit error! via tracing.
659    #[tracing_test::traced_test]
660    #[test]
661    fn record_post_process_failure_increments_errors_and_emits_error_log() {
662        let errors: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
663        let runtime = Arc::new(RecordingRuntime::new(Arc::clone(&errors)));
664        let error = CamelError::ProcessorError("test failure".to_string());
665
666        // Directly invoke the helper
667        record_post_process_failure(
668            runtime.as_ref(),
669            "test-route",
670            "b-prime:sql:on-consume",
671            &error,
672            "Post-processing failed",
673        );
674
675        // Verify MetricsCollector::increment_errors was called
676        let recorded = errors.lock().unwrap();
677        assert_eq!(recorded.len(), 1, "expected 1 increment_errors call");
678        assert_eq!(recorded[0].0, "test-route");
679        assert_eq!(recorded[0].1, "b-prime:sql:on-consume");
680        drop(recorded);
681
682        // Verify error! was emitted
683        assert!(logs_contain("ERROR"), "helper must emit error! log");
684        assert!(
685            logs_contain("Post-processing failed"),
686            "helper must include the message in the log"
687        );
688    }
689
690    async fn sqlite_pool() -> AnyPool {
691        sqlx::any::install_default_drivers();
692        AnyPoolOptions::new()
693            .max_connections(1)
694            .connect("sqlite::memory:")
695            .await
696            .expect("sqlite pool")
697    }
698
699    async fn seed_consumer_table(pool: &AnyPool) {
700        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
701            .execute(pool)
702            .await
703            .expect("create table");
704        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
705            .execute(pool)
706            .await
707            .expect("seed rows");
708    }
709
710    fn config() -> SqlEndpointConfig {
711        let mut c =
712            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
713                .unwrap();
714        c.resolve_defaults();
715        c
716    }
717
718    #[test]
719    fn consumer_concurrency_model() {
720        let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()), None, test_rt());
721        assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
722    }
723
724    #[test]
725    fn consumer_stores_config() {
726        let mut config = SqlEndpointConfig::from_uri(
727            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
728        ).unwrap();
729        config.resolve_defaults();
730        let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
731        assert_eq!(c.config.delay_ms, 2000);
732        assert!(c.config.on_consume.is_some());
733    }
734
735    #[tokio::test]
736    async fn poll_database_runs_on_consume_for_successful_rows() {
737        let pool = sqlite_pool().await;
738        seed_consumer_table(&pool).await;
739
740        let mut config = SqlEndpointConfig::from_uri(
741            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
742        )
743        .unwrap();
744        config.resolve_defaults();
745
746        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
747        let template = parse_query_template(&config.query, config.placeholder).unwrap();
748
749        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
750        tokio::spawn(async move {
751            while let Some(env) = rx.recv().await {
752                if let Some(reply_tx) = env.reply_tx {
753                    let _ = reply_tx.send(Ok(env.exchange));
754                }
755            }
756        });
757        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
758
759        consumer
760            .poll_database(&pool, &ctx, &template)
761            .await
762            .expect("poll must succeed");
763
764        let row = sqlx::query("select processed from jobs where id = 1")
765            .fetch_one(&pool)
766            .await
767            .expect("row 1");
768        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
769
770        let row = sqlx::query("select processed from jobs where id = 2")
771            .fetch_one(&pool)
772            .await
773            .expect("row 2");
774        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
775
776        assert_eq!(processed_1, 1);
777        assert_eq!(processed_2, 1);
778    }
779
780    #[tokio::test]
781    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
782        let pool = sqlite_pool().await;
783        seed_consumer_table(&pool).await;
784
785        let mut config = SqlEndpointConfig::from_uri(
786            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
787        )
788        .unwrap();
789        config.resolve_defaults();
790
791        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
792        let template = parse_query_template(&config.query, config.placeholder).unwrap();
793
794        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
795        tokio::spawn(async move {
796            while let Some(env) = rx.recv().await {
797                if let Some(reply_tx) = env.reply_tx {
798                    let _ =
799                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
800                }
801            }
802        });
803        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
804
805        consumer
806            .poll_database(&pool, &ctx, &template)
807            .await
808            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
809
810        let row = sqlx::query("select failed from jobs where id = 1")
811            .fetch_one(&pool)
812            .await
813            .expect("row 1");
814        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
815
816        let row = sqlx::query("select failed from jobs where id = 2")
817            .fetch_one(&pool)
818            .await
819            .expect("row 2");
820        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
821
822        assert_eq!(failed_1, 1);
823        assert_eq!(failed_2, 1);
824    }
825
826    #[tokio::test]
827    async fn poll_database_breaks_batch_on_consume_fail() {
828        let pool = sqlite_pool().await;
829        seed_consumer_table(&pool).await;
830
831        let mut config = SqlEndpointConfig::from_uri(
832            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
833        )
834        .unwrap();
835        config.resolve_defaults();
836
837        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
838        let template = parse_query_template(&config.query, config.placeholder).unwrap();
839
840        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
841        tokio::spawn(async move {
842            while let Some(env) = rx.recv().await {
843                if let Some(reply_tx) = env.reply_tx {
844                    let _ =
845                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
846                }
847            }
848        });
849        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
850
851        let err = consumer
852            .poll_database(&pool, &ctx, &template)
853            .await
854            .expect_err("must stop on first downstream failure");
855        assert!(err.to_string().contains("downstream boom"));
856
857        let row = sqlx::query("select failed from jobs where id = 1")
858            .fetch_one(&pool)
859            .await
860            .expect("row 1");
861        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
862
863        let row = sqlx::query("select failed from jobs where id = 2")
864            .fetch_one(&pool)
865            .await
866            .expect("row 2");
867        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
868
869        assert_eq!(failed_1, 1);
870        assert_eq!(failed_2, 0, "second row must not be processed");
871    }
872
873    // --- Phase B hardening tests ---
874
875    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
876    // The consumer defensively calls resolve_defaults() during pool init, so the pool
877    // fields get resolved. This test verifies no panic occurs.
878    #[tokio::test]
879    async fn consumer_no_panic_without_prior_resolve_defaults() {
880        let config = SqlEndpointConfig::from_uri(
881            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
882        )
883        .unwrap();
884        // Deliberately NOT calling resolve_defaults() — pool fields remain None
885        assert!(config.max_connections.is_none());
886
887        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, test_rt());
888        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
889        tokio::spawn(async move {
890            while let Some(env) = rx.recv().await {
891                if let Some(reply_tx) = env.reply_tx {
892                    let _ = reply_tx.send(Ok(env.exchange));
893                }
894            }
895        });
896        let token = CancellationToken::new();
897        let ctx = ConsumerContext::new(tx, token.clone(), "sql-test-route".to_string());
898
899        // Spawn the consumer and cancel it quickly — it should not panic
900        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
901
902        // Cancel after a short delay
903        tokio::time::sleep(Duration::from_millis(50)).await;
904        token.cancel();
905
906        let result = consumer_handle.await.expect("task should not panic");
907        // Should complete without panic (may be Ok or Err depending on timing)
908        let _ = result;
909    }
910
911    // SQL-008: stop() closes the pool
912    #[tokio::test]
913    async fn stop_closes_pool() {
914        let pool = sqlite_pool().await;
915        seed_consumer_table(&pool).await;
916
917        let mut config = SqlEndpointConfig::from_uri(
918            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
919        )
920        .unwrap();
921        config.resolve_defaults();
922
923        let pool_cell = Arc::new(OnceCell::new());
924        pool_cell.set(Arc::new(pool.clone())).unwrap();
925
926        let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
927        consumer.stop().await.expect("stop should succeed");
928
929        // After stop, the pool should be closed
930        assert!(
931            pool.is_closed(),
932            "Pool should be closed after consumer.stop()"
933        );
934    }
935
936    // SQL-008: double-stop is safe
937    #[tokio::test]
938    async fn double_stop_is_safe() {
939        let pool = sqlite_pool().await;
940        let mut config = SqlEndpointConfig::from_uri(
941            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
942        )
943        .unwrap();
944        config.resolve_defaults();
945
946        let pool_cell = Arc::new(OnceCell::new());
947        pool_cell.set(Arc::new(pool.clone())).unwrap();
948
949        let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
950        consumer.stop().await.expect("first stop should succeed");
951        consumer
952            .stop()
953            .await
954            .expect("second stop should also succeed");
955    }
956
957    // SQL-008: start after stop is rejected
958    #[tokio::test]
959    async fn start_after_stop_rejected() {
960        let pool = sqlite_pool().await;
961        let mut config = SqlEndpointConfig::from_uri(
962            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
963        )
964        .unwrap();
965        config.resolve_defaults();
966
967        let pool_cell = Arc::new(OnceCell::new());
968        pool_cell.set(Arc::new(pool.clone())).unwrap();
969
970        let mut consumer = SqlConsumer::new(config, pool_cell, None, test_rt());
971        consumer.stop().await.expect("stop should succeed");
972
973        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
974        tokio::spawn(async move {
975            while let Some(env) = rx.recv().await {
976                if let Some(reply_tx) = env.reply_tx {
977                    let _ = reply_tx.send(Ok(env.exchange));
978                }
979            }
980        });
981        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
982
983        let result = consumer.start(ctx).await;
984        assert!(result.is_err());
985        let err_msg = result.unwrap_err().to_string();
986        assert!(
987            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
988            "Expected restart error, got: {}",
989            err_msg
990        );
991    }
992
993    // SQL-021: batch mode per-row post-processing
994    #[tokio::test]
995    async fn batch_mode_per_row_post_processing() {
996        let pool = sqlite_pool().await;
997        seed_consumer_table(&pool).await;
998
999        let mut config = SqlEndpointConfig::from_uri(
1000            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1001        )
1002        .unwrap();
1003        config.resolve_defaults();
1004
1005        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1006        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1007
1008        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1009        tokio::spawn(async move {
1010            while let Some(env) = rx.recv().await {
1011                if let Some(reply_tx) = env.reply_tx {
1012                    let _ = reply_tx.send(Ok(env.exchange));
1013                }
1014            }
1015        });
1016        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1017
1018        consumer
1019            .poll_database(&pool, &ctx, &template)
1020            .await
1021            .expect("poll must succeed");
1022
1023        // SQL-021: Each row should have been processed individually via onConsume
1024        let row = sqlx::query("select processed from jobs where id = 1")
1025            .fetch_one(&pool)
1026            .await
1027            .expect("row 1");
1028        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1029
1030        let row = sqlx::query("select processed from jobs where id = 2")
1031            .fetch_one(&pool)
1032            .await
1033            .expect("row 2");
1034        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1035
1036        assert_eq!(
1037            processed_1, 1,
1038            "row 1 should be marked processed via per-row onConsume"
1039        );
1040        assert_eq!(
1041            processed_2, 1,
1042            "row 2 should be marked processed via per-row onConsume"
1043        );
1044    }
1045
1046    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
1047    #[tokio::test]
1048    async fn batch_mode_per_row_post_processing_on_failure() {
1049        let pool = sqlite_pool().await;
1050        seed_consumer_table(&pool).await;
1051
1052        let mut config = SqlEndpointConfig::from_uri(
1053            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1054        )
1055        .unwrap();
1056        config.resolve_defaults();
1057
1058        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1059        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1060
1061        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1062        tokio::spawn(async move {
1063            while let Some(env) = rx.recv().await {
1064                if let Some(reply_tx) = env.reply_tx {
1065                    let _ =
1066                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
1067                }
1068            }
1069        });
1070        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1071
1072        consumer
1073            .poll_database(&pool, &ctx, &template)
1074            .await
1075            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
1076
1077        // SQL-021: Each row should have onConsumeFailed executed individually
1078        let row = sqlx::query("select failed from jobs where id = 1")
1079            .fetch_one(&pool)
1080            .await
1081            .expect("row 1");
1082        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1083
1084        let row = sqlx::query("select failed from jobs where id = 2")
1085            .fetch_one(&pool)
1086            .await
1087            .expect("row 2");
1088        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1089
1090        assert_eq!(
1091            failed_1, 1,
1092            "row 1 should be marked failed via per-row onConsumeFailed"
1093        );
1094        assert_eq!(
1095            failed_2, 1,
1096            "row 2 should be marked failed via per-row onConsumeFailed"
1097        );
1098    }
1099
1100    #[tokio::test]
1101    async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
1102        let mut config = config();
1103        config.bridge_error_handler = true;
1104        let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, test_rt());
1105
1106        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1107        tokio::spawn(async move {
1108            while let Some(env) = rx.recv().await {
1109                assert!(env.exchange.error.is_some(), "exchange must carry error");
1110                if let Some(reply_tx) = env.reply_tx {
1111                    let _ = reply_tx.send(Ok(env.exchange));
1112                }
1113                break;
1114            }
1115        });
1116
1117        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1118        consumer
1119            .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
1120            .await
1121            .expect("bridging should succeed");
1122    }
1123
1124    /// Regression for ADR-0012: when bridge_error_handler=true, the poll
1125    /// failure must NOT emit error! (the route's error handler owns ERROR
1126    /// for bridged failures). Was previously duplicated at line 429 + 431.
1127    #[tracing_test::traced_test]
1128    #[tokio::test]
1129    async fn bridged_poll_failure_emits_warn_not_error() {
1130        let pool = sqlite_pool().await;
1131        // Do NOT create any table — the query against a non-existent
1132        // table will fail at fetch_all, returning Err BEFORE any
1133        // downstream send (so lines 103/205 are never reached).
1134
1135        let mut config = config();
1136        config.bridge_error_handler = true;
1137        // Query a non-existent table to trigger a query-failure poll error.
1138        config.query = "select * from nonexistent_table".to_string();
1139        config.resolve_defaults();
1140        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1141        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1142
1143        // Healthy downstream — replies Ok so bridge_poll_error succeeds
1144        // and does NOT emit its own error!.
1145        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1146        tokio::spawn(async move {
1147            while let Some(env) = rx.recv().await {
1148                if let Some(reply_tx) = env.reply_tx {
1149                    let _ = reply_tx.send(Ok(env.exchange));
1150                }
1151            }
1152        });
1153        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1154
1155        // Drive poll — fetch_all will fail because the table is missing.
1156        consumer.handle_poll_result(&pool, &ctx, &template).await;
1157
1158        // The bridged path must NOT emit ERROR (handler owns it).
1159        assert!(
1160            !logs_contain("ERROR"),
1161            "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1162        );
1163        // Sanity: warn! was emitted so the failure is still visible.
1164        assert!(
1165            logs_contain("WARN"),
1166            "bridged poll failure should emit warn! for operator visibility"
1167        );
1168    }
1169
1170    /// Regression for ADR-0012 "b-bridged discriminator": when
1171    /// send_and_wait returns Err on a NORMAL-DATA send (i.e., not a
1172    /// deliberate bridge_poll_error handoff), the route handler did NOT
1173    /// absorb the failure (consumer.rs:77-91 contract; error_handler.rs
1174    /// returns Ok in every branch). The consumer's error! is the only
1175    /// ERROR signal for the unhandled failure and MUST stay at error!.
1176    ///
1177    /// Protects consumer.rs:205 (StreamList downstream send) and any
1178    /// future site that uses send_and_wait on a non-bridge path.
1179    #[tracing_test::traced_test]
1180    #[tokio::test]
1181    async fn unbridged_send_and_wait_failure_emits_error_loud() {
1182        let pool = sqlite_pool().await;
1183        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1184            .execute(&pool)
1185            .await
1186            .expect("create table");
1187        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1188            .execute(&pool)
1189            .await
1190            .expect("seed rows");
1191
1192        let mut config = SqlEndpointConfig::from_uri(
1193            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1194        )
1195        .unwrap();
1196        config.resolve_defaults();
1197        // Explicitly non-bridged: normal-data send path.
1198        config.bridge_error_handler = false;
1199        let consumer = SqlConsumer::new(
1200            config.clone(),
1201            Arc::new(OnceCell::new()),
1202            None,
1203            Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1204        );
1205        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1206
1207        // Downstream that returns Err — simulates unhandled route failure.
1208        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1209        tokio::spawn(async move {
1210            while let Some(env) = rx.recv().await {
1211                if let Some(reply_tx) = env.reply_tx {
1212                    let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1213                }
1214            }
1215        });
1216        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1217
1218        let _ = consumer.poll_database(&pool, &ctx, &template).await;
1219
1220        // The unbridged path MUST emit ERROR — consumer owns the signal.
1221        assert!(
1222            logs_contain("ERROR"),
1223            "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1224        );
1225    }
1226
1227    /// Regression for ADR-0012: when bridge_error_handler=false, the unbridged
1228    /// branch of handle_poll_result MUST emit ERROR for unhandled poll failure.
1229    #[tracing_test::traced_test]
1230    #[tokio::test]
1231    async fn unbridged_handle_poll_result_emits_error_loud() {
1232        let pool = sqlite_pool().await;
1233        // Do NOT create any table — fetch_all will fail in poll_database.
1234
1235        let mut config = config();
1236        config.bridge_error_handler = false;
1237        config.query = "select * from nonexistent_table".to_string();
1238        config.resolve_defaults();
1239        let consumer = SqlConsumer::new(
1240            config.clone(),
1241            Arc::new(OnceCell::new()),
1242            None,
1243            Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1244        );
1245        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1246
1247        // Healthy downstream task; should not be reached for this poll-failure path.
1248        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1249        tokio::spawn(async move {
1250            while let Some(env) = rx.recv().await {
1251                if let Some(reply_tx) = env.reply_tx {
1252                    let _ = reply_tx.send(Ok(env.exchange));
1253                }
1254            }
1255        });
1256        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1257
1258        consumer.handle_poll_result(&pool, &ctx, &template).await;
1259
1260        assert!(
1261            logs_contain("ERROR"),
1262            "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1263        );
1264    }
1265
1266    #[tokio::test]
1267    async fn stream_list_consumer_emits_ndjson_body() {
1268        let pool = sqlite_pool().await;
1269        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1270            .execute(&pool)
1271            .await
1272            .expect("create table");
1273        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1274            .execute(&pool)
1275            .await
1276            .expect("seed rows");
1277
1278        let mut config = SqlEndpointConfig::from_uri(
1279            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1280        )
1281        .unwrap();
1282        config.resolve_defaults();
1283
1284        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1285        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1286
1287        let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1288        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1289        tokio::spawn(async move {
1290            let mut rx = rx;
1291            if let Some(env) = rx.recv().await {
1292                if let Some(reply_tx) = env.reply_tx {
1293                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1294                }
1295                let _ = result_tx.send(env.exchange);
1296            }
1297        });
1298        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1299
1300        consumer
1301            .poll_database(&pool, &ctx, &template)
1302            .await
1303            .expect("poll must succeed");
1304
1305        let exchange = result_rx.await.expect("should have received one exchange");
1306
1307        match exchange.input.body {
1308            Body::Stream(ref stream_body) => {
1309                let stream = stream_body.stream.clone();
1310                let mut guard = stream.lock().await;
1311                let stream_opt = guard.take();
1312                assert!(stream_opt.is_some(), "stream should be present");
1313
1314                use futures::StreamExt;
1315                let mut collected = Vec::new();
1316                let mut stream = stream_opt.unwrap();
1317                while let Some(chunk) = stream.next().await {
1318                    let chunk = chunk.expect("stream chunk should not error");
1319                    collected.extend_from_slice(&chunk);
1320                }
1321
1322                let ndjson = String::from_utf8(collected).expect("valid utf8");
1323                let lines: Vec<&str> = ndjson.trim().lines().collect();
1324                assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1325
1326                let row0: serde_json::Value =
1327                    serde_json::from_str(lines[0]).expect("valid json line 0");
1328                assert_eq!(row0["id"], 1);
1329                assert_eq!(row0["name"], "alpha");
1330
1331                let row1: serde_json::Value =
1332                    serde_json::from_str(lines[1]).expect("valid json line 1");
1333                assert_eq!(row1["id"], 2);
1334                assert_eq!(row1["name"], "beta");
1335
1336                let row2: serde_json::Value =
1337                    serde_json::from_str(lines[2]).expect("valid json line 2");
1338                assert_eq!(row2["id"], 3);
1339                assert_eq!(row2["name"], "gamma");
1340            }
1341            ref other => panic!("expected Body::Stream, got {:?}", other),
1342        }
1343    }
1344
1345    #[tokio::test]
1346    async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1347        let pool = sqlite_pool().await;
1348        sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1349            .execute(&pool)
1350            .await
1351            .expect("create table");
1352
1353        let mut config = SqlEndpointConfig::from_uri(
1354            "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1355        )
1356        .unwrap();
1357        config.resolve_defaults();
1358
1359        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), None, test_rt());
1360        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1361
1362        let (tx, rx) = tokio::sync::oneshot::channel();
1363        let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1364        tokio::spawn(async move {
1365            while let Some(env) = mpsc_rx.recv().await {
1366                if let Some(reply_tx) = env.reply_tx {
1367                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1368                }
1369                let _ = tx.send(env.exchange);
1370                break;
1371            }
1372        });
1373        let ctx = ConsumerContext::new(
1374            mpsc_tx,
1375            CancellationToken::new(),
1376            "sql-test-route".to_string(),
1377        );
1378
1379        consumer
1380            .poll_database(&pool, &ctx, &template)
1381            .await
1382            .expect("poll must succeed");
1383
1384        let exchange = rx
1385            .await
1386            .expect("StreamList should emit exchange even for empty results");
1387
1388        match exchange.input.body {
1389            Body::Stream(ref stream_body) => {
1390                let stream = stream_body.stream.clone();
1391                let mut guard = stream.lock().await;
1392                let stream_opt = guard.take();
1393
1394                use futures::StreamExt;
1395                let mut count = 0;
1396                if let Some(mut stream) = stream_opt {
1397                    while let Some(chunk) = stream.next().await {
1398                        let chunk = chunk.expect("stream chunk should not error");
1399                        count += chunk.len();
1400                    }
1401                }
1402                assert_eq!(count, 0, "empty table should produce zero stream bytes");
1403            }
1404            ref other => panic!("expected Body::Stream, got {:?}", other),
1405        }
1406    }
1407
1408    // ── ADR-0012 (g) regression tests ──────────────────────────────────
1409
1410    /// Fixture: captures `force_unhealthy_for_route` calls.
1411    #[derive(Debug, Default)]
1412    struct RecordingHealth {
1413        forced: Arc<Mutex<Vec<(String, String, String)>>>,
1414    }
1415
1416    impl HealthCheckRegistry for RecordingHealth {
1417        fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
1418            self.forced.lock().unwrap().push((
1419                route_id.to_string(),
1420                name.to_string(),
1421                reason.to_string(),
1422            ));
1423        }
1424    }
1425
    /// Metrics collector stub for these tests: every method is a no-op, so
    /// nothing is recorded.
    struct NoopMetricsForConsumer;

    impl MetricsCollector for NoopMetricsForConsumer {
        // All arguments are ignored and every body is intentionally empty.
        fn record_exchange_duration(&self, _: &str, _: Duration) {}
        fn increment_errors(&self, _: &str, _: &str) {}
        fn increment_exchanges(&self, _: &str) {}
        fn set_queue_depth(&self, _: &str, _: usize) {}
        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
    }
1435
1436    struct RecordingRuntimeWithHealth {
1437        health: Arc<RecordingHealth>,
1438    }
1439
1440    impl RuntimeObservability for RecordingRuntimeWithHealth {
1441        fn metrics(&self) -> Arc<dyn MetricsCollector> {
1442            Arc::new(NoopMetricsForConsumer)
1443        }
1444        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
1445            self.health.clone()
1446        }
1447    }
1448
1449    /// Regression: consumer pool init failure calls force_unhealthy_for_route
1450    /// with correct route_id + name "g:sql:consumer-pool-init" + non-empty reason.
1451    #[tokio::test]
1452    async fn consumer_pool_init_failure_calls_force_unhealthy_for_route() {
1453        let health = Arc::new(RecordingHealth::default());
1454        let recorded_health = health.clone();
1455        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntimeWithHealth { health });
1456
1457        let mut config = SqlEndpointConfig::from_uri(
1458            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false&initialDelay=0&delay=1",
1459        )
1460        .unwrap();
1461        config.max_connections = Some(1);
1462        config.min_connections = Some(0);
1463        config.idle_timeout_secs = Some(300);
1464        config.max_lifetime_secs = Some(1800);
1465
1466        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), None, rt);
1467
1468        let (tx, _rx) = mpsc::channel(8);
1469        let ctx = ConsumerContext::new(
1470            tx,
1471            CancellationToken::new(),
1472            "sql-consumer-test-route".to_string(),
1473        );
1474
1475        let result = consumer.start(ctx).await;
1476        assert!(result.is_err(), "pool init should fail with bad db_url");
1477
1478        let forced = recorded_health.forced.lock().unwrap();
1479        assert_eq!(
1480            forced.len(),
1481            1,
1482            "expected one force_unhealthy_for_route call"
1483        );
1484        assert_eq!(forced[0].0, "sql-consumer-test-route");
1485        assert_eq!(forced[0].1, "g:sql:consumer-pool-init");
1486        assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1487    }
1488
1489    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1490    /// Replicates the exact retry loop from SqlConsumer::start() (consumer.rs:343-367):
1491    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1492    #[tokio::test]
1493    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1494        use camel_component_api::NetworkRetryPolicy;
1495        use std::sync::Arc;
1496        use std::sync::atomic::{AtomicU32, Ordering};
1497
1498        let policy = NetworkRetryPolicy {
1499            max_attempts: 3,
1500            initial_delay: Duration::from_millis(1),
1501            max_delay: Duration::from_millis(1),
1502            multiplier: 1.0,
1503            ..NetworkRetryPolicy::default()
1504        };
1505
1506        let calls = Arc::new(AtomicU32::new(0));
1507        let calls_clone = Arc::clone(&calls);
1508
1509        let mut attempt: u32 = 0;
1510        let _result: Result<(), ()> = loop {
1511            attempt += 1;
1512            calls_clone.fetch_add(1, Ordering::SeqCst);
1513            let op_result: Result<(), ()> = Err(());
1514            match op_result {
1515                Ok(v) => break Ok(v),
1516                Err(_) if policy.should_retry(attempt) => {
1517                    let delay = policy.delay_for(attempt - 1);
1518                    tokio::time::sleep(delay).await;
1519                    continue;
1520                }
1521                Err(_) => break Err(()),
1522            }
1523        };
1524
1525        assert_eq!(
1526            calls.load(Ordering::SeqCst),
1527            3,
1528            "max_attempts=3 must yield exactly 3 invocations"
1529        );
1530    }
1531}