Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::retry_async;
15use camel_component_api::{
16    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
17};
18use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
19
20use crate::config::{
21    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
22    enrich_db_url_with_ssl, redact_db_url,
23};
24use crate::headers;
25use crate::query::{QueryTemplate, parse_query_template, resolve_params};
26use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
27
28/// Record a post-process (b′) failure for ADR-0012 outside-contract sites in this
29/// consumer. Increments the per-label error metric AND emits an `error!` log
30/// per ADR-0012 L57 + L70-72 (the metric is the operator signal; `error!`
31/// provides loud log visibility — b′ errors are NOT absorbed by route handlers).
32///
33/// Both the metric call and the `error!` live INSIDE this helper so that
34/// `lint-log-levels`'s `has_replacement_signal` (scripts/xtask/src/main.rs)
35/// sees both literals in the helper's function body. Call sites have NO
36/// `error!` of their own.
37///
38/// Regression-tested by:
39/// - `record_post_process_failure_increments_errors_and_emits_error_log` (helper unit)
40/// - `unbridged_send_and_wait_failure_emits_error_loud` (StreamList integration path)
41fn record_post_process_failure(
42    runtime: &dyn RuntimeObservability,
43    route_id: &str,
44    label: &str,
45    error: &CamelError,
46    message: &str,
47) {
48    runtime.metrics().increment_errors(route_id, label);
49    // log-policy: outside-contract
50    error!(error = %error, "{message}");
51}
52
53pub struct SqlConsumer {
54    pub(crate) config: SqlEndpointConfig,
55    pub(crate) pool: Arc<OnceCell<AnyPool>>,
56    stopped: bool,
57    /// Runtime observability for metrics and health — used by the
58    /// `record_post_process_failure` helper for ADR-0012 (b′) metric calls.
59    runtime: Arc<dyn RuntimeObservability>,
60}
61
62impl SqlConsumer {
63    pub fn new(
64        config: SqlEndpointConfig,
65        pool: Arc<OnceCell<AnyPool>>,
66        runtime: Arc<dyn RuntimeObservability>,
67    ) -> Self {
68        Self {
69            config,
70            pool,
71            stopped: false,
72            runtime,
73        }
74    }
75
76    /// Poll the database for new rows and process them.
77    async fn poll_database(
78        &self,
79        pool: &AnyPool,
80        context: &ConsumerContext,
81        template: &QueryTemplate,
82    ) -> Result<(), CamelError> {
83        // Capture route_id from ConsumerContext for ADR-0012 metrics
84        let route_id = context.route_id();
85
86        // Create an empty exchange for parameter resolution (consumer has no input)
87        let empty_exchange = Exchange::new(Message::default());
88
89        // Resolve parameters
90        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
91
92        debug!(query = %prepared.sql, "executing SQL consumer poll");
93
94        if self.config.output_type == SqlOutputType::StreamList {
95            return self.poll_database_stream(pool, context, &prepared).await;
96        }
97
98        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
99        let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
100            warn!(error = %e, "SQL consumer poll query failed");
101            CamelError::ProcessorError(format!("Query execution failed: {}", e))
102        })?;
103
104        debug!(rows = rows.len(), "SQL consumer poll completed");
105
106        if rows.is_empty() && !self.config.route_empty_result_set {
107            return Ok(());
108        }
109
110        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
111            if max > 0 {
112                rows.into_iter().take(max as usize).collect()
113            } else {
114                rows
115            }
116        } else {
117            rows
118        };
119
120        if self.config.use_iterator {
121            // Process each row individually
122            for row in rows_to_process {
123                let row_json = row_to_json(&row)?;
124
125                // Create exchange with the row as JSON body
126                let mut msg = Message::new(Body::Json(row_json.clone()));
127
128                // Set individual column headers with CamelSql. prefix per Apache Camel convention
129                if let Some(obj) = row_json.as_object() {
130                    for (key, value) in obj {
131                        msg.set_header(format!("CamelSql.{}", key), value.clone());
132                    }
133                }
134
135                let exchange = Exchange::new(msg);
136
137                // Send and wait for processing
138                let result = context.send_and_wait(exchange).await;
139
140                // Handle post-processing (onConsume/onConsumeFailed)
141                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
142                    record_post_process_failure(
143                        self.runtime.as_ref(),
144                        route_id,
145                        "b-prime:sql:on-consume",
146                        &e,
147                        "Post-processing failed",
148                    );
149                    if self.config.break_batch_on_consume_fail {
150                        return Err(e);
151                    }
152                }
153
154                // If downstream processing itself failed, honour break_batch_on_consume_fail
155                if let Err(ref consume_err) = result
156                    && self.config.break_batch_on_consume_fail
157                {
158                    return Err(consume_err.clone());
159                }
160            }
161        } else {
162            // Process all rows as a single batch
163            let rows_json: Vec<JsonValue> = rows_to_process
164                .iter()
165                .map(row_to_json)
166                .collect::<Result<Vec<_>, CamelError>>()?;
167
168            let row_count = rows_json.len();
169
170            // Create exchange with array of rows
171            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
172            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
173
174            let exchange = Exchange::new(msg);
175
176            // Send and wait for result
177            let result = context.send_and_wait(exchange).await;
178
179            // SQL-021: Run per-row post-processing even in batch mode so that
180            // onConsume/onConsumeFailed queries can reference row-specific parameters
181            // (e.g. `:#id`). Each row gets its own post-processing query execution.
182            for row_json in rows_json.iter() {
183                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
184                    record_post_process_failure(
185                        self.runtime.as_ref(),
186                        route_id,
187                        "b-prime:sql:on-consume-batch",
188                        &e,
189                        "Post-processing failed for batch row",
190                    );
191                    if self.config.break_batch_on_consume_fail {
192                        return Err(e);
193                    }
194                }
195            }
196
197            // If downstream processing itself failed, honour break_batch_on_consume_fail
198            if let Err(ref consume_err) = result
199                && self.config.break_batch_on_consume_fail
200            {
201                return Err(consume_err.clone());
202            }
203        }
204
205        // Execute on_consume_batch_complete if configured
206        if let Some(ref batch_query) = self.config.on_consume_batch_complete {
207            let _ = self
208                .execute_post_query(pool, batch_query, &JsonValue::Null)
209                .await;
210        }
211
212        Ok(())
213    }
214
215    async fn poll_database_stream(
216        &self,
217        pool: &AnyPool,
218        context: &ConsumerContext,
219        prepared: &crate::query::PreparedQuery,
220    ) -> Result<(), CamelError> {
221        let pool_clone = pool.clone();
222        let sql_str = prepared.sql.clone();
223        let bindings = prepared.bindings.clone();
224
225        let byte_stream = async_stream::try_stream! {
226            let mut q = sqlx::query(&sql_str);
227            q = bind_json_values(q, &bindings);
228            let mut rows = q.fetch(&pool_clone);
229            while let Some(row) = rows.try_next().await.map_err(|e| {
230                CamelError::ProcessorError(format!("Query execution failed: {}", e))
231            })? {
232                let json_val = row_to_json(&row).map_err(|e| {
233                    CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
234                })?;
235                let mut bytes = serde_json::to_vec(&json_val)
236                    .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
237                bytes.push(b'\n');
238                yield Bytes::from(bytes);
239            }
240        };
241
242        let msg = Message::new(Body::Stream(StreamBody {
243            stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
244            metadata: StreamMetadata {
245                content_type: Some("application/x-ndjson".to_string()),
246                size_hint: None,
247                origin: None,
248            },
249        }));
250
251        let exchange = Exchange::new(msg);
252        let result = context.send_and_wait(exchange).await;
253        if let Err(e) = result {
254            record_post_process_failure(
255                self.runtime.as_ref(),
256                context.route_id(),
257                "b-prime:sql:stream-list",
258                &e,
259                "StreamList consumer downstream processing failed",
260            );
261            return Err(e);
262        }
263
264        debug!("StreamList: consumer poll completed (lazy stream emitted)");
265        Ok(())
266    }
267
268    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
269    async fn handle_post_processing(
270        &self,
271        pool: &AnyPool,
272        result: &Result<Exchange, CamelError>,
273        row_json: &JsonValue,
274    ) -> Result<(), CamelError> {
275        match result {
276            Ok(_) => {
277                // Success - execute onConsume if configured
278                if let Some(ref on_consume) = self.config.on_consume {
279                    self.execute_post_query(pool, on_consume, row_json).await?;
280                }
281            }
282            Err(_) => {
283                // Failure - execute onConsumeFailed if configured
284                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
285                    self.execute_post_query(pool, on_consume_failed, row_json)
286                        .await?;
287                }
288            }
289        }
290        Ok(())
291    }
292
293    /// Execute a post-processing query with the row data as parameters.
294    async fn execute_post_query(
295        &self,
296        pool: &AnyPool,
297        query_str: &str,
298        row_json: &JsonValue,
299    ) -> Result<(), CamelError> {
300        // Parse the query template
301        let template = parse_query_template(query_str, self.config.placeholder)?;
302
303        // Create a temporary exchange with the row as body for parameter resolution
304        // Populate CamelSql.* headers so named params can reference them
305        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
306        if let Some(obj) = row_json.as_object() {
307            for (key, value) in obj {
308                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
309            }
310        }
311        let temp_exchange = Exchange::new(temp_msg);
312
313        // Resolve parameters
314        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
315
316        // Build and execute the query
317        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
318        let result = query.execute(pool).await.map_err(|e| {
319            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
320        })?;
321
322        // Warn if 0 rows affected (the row may not have been marked correctly)
323        if result.rows_affected() == 0 {
324            warn!(
325                query = query_str,
326                "Post-processing query affected 0 rows — the row may not have been marked correctly"
327            );
328        }
329
330        Ok(())
331    }
332
333    /// Handle the result of a single poll cycle, including bridging if configured.
334    /// Extracted from `run()` so tests can exercise the error-handling branch directly.
335    async fn handle_poll_result(
336        &self,
337        pool: &AnyPool,
338        context: &ConsumerContext,
339        template: &QueryTemplate,
340    ) {
341        if let Err(e) = self.poll_database(pool, context, template).await {
342            if self.config.bridge_error_handler {
343                // log-policy: handler-owned
344                // (category b-bridged: error will be wrapped as Exchange
345                // and flow into the route's error handler)
346                warn!(error = %e, "SQL consumer poll failed (bridged)");
347                if let Err(route_err) = self.bridge_poll_error(context, e).await {
348                    // (the bridge channel itself broke — route will CrashNotification per ADR-0007)
349                    // log-policy: system-broken
350                    error!(error = %route_err, "Failed to bridge SQL consumer error to route");
351                }
352            } else {
353                record_post_process_failure(
354                    self.runtime.as_ref(),
355                    context.route_id(),
356                    "b-prime:sql:poll-failed",
357                    &e,
358                    "SQL consumer poll failed",
359                );
360            }
361        }
362    }
363
364    async fn bridge_poll_error(
365        &self,
366        context: &ConsumerContext,
367        error: CamelError,
368    ) -> Result<(), CamelError> {
369        if !self.config.bridge_error_handler {
370            return Ok(());
371        }
372        let mut exchange = Exchange::new(Message::default());
373        exchange.set_error(error);
374        context.send_and_wait(exchange).await.map(|_| ())
375    }
376}
377
378#[async_trait]
379impl Consumer for SqlConsumer {
380    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
381        // Reject double-start
382        if self.stopped {
383            return Err(CamelError::Config(
384                "SQL consumer cannot be restarted after stop".into(),
385            ));
386        }
387
388        // Step 1: Initialize the connection pool
389        let route_id = context.route_id().to_string();
390        let pool = self
391            .pool
392            .get_or_try_init(|| async {
393                // Defensive: ensure config is resolved even if caller didn't use create_endpoint
394                self.config.resolve_defaults();
395                // SQL-014: resolve file-based query asynchronously (not blocking)
396                self.config.resolve_file_query().await?;
397
398                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
399                // This is idempotent; safe to call multiple times.
400                sqlx::any::install_default_drivers();
401                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
402
403                let max_conn = self.config.max_connections.ok_or_else(|| {
404                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
405                })?;
406                let min_conn = self.config.min_connections.ok_or_else(|| {
407                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
408                })?;
409                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
410                    CamelError::Config(
411                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
412                    )
413                })?;
414                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
415                    CamelError::Config(
416                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
417                    )
418                })?;
419
420                info!(
421                    db_url = %redact_db_url(&self.config.db_url),
422                    "SQL consumer pool initializing"
423                );
424                let retry_policy = &self.config.retry;
425                retry_async::<_, _, _, _, sqlx::Error>(
426                    retry_policy,
427                    Some("sql-consumer"),
428                    || {
429                        AnyPoolOptions::new()
430                            .max_connections(max_conn)
431                            .min_connections(min_conn)
432                            .idle_timeout(Duration::from_secs(idle_timeout))
433                            .max_lifetime(Duration::from_secs(max_lifetime))
434                            .connect(&db_url)
435                    },
436                    is_retryable_sqlx_error,
437                )
438                .await
439                .map_err(|e| {
440                    self.runtime.health().force_unhealthy_for_route(
441                        &route_id,
442                        "g:sql:consumer-pool-init",
443                        &e.to_string(),
444                    );
445                    // log-policy: outside-contract
446                    error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up");
447                    CamelError::EndpointCreationFailed(format!(
448                        "Failed to connect to database: {}",
449                        e
450                    ))
451                })
452            })
453            .await?;
454
455        // SQL-002: warn if Managed transaction mode requested
456        if self.config.transaction_mode == TransactionMode::Managed {
457            warn!("transactionManager not yet implemented; using Auto mode");
458        }
459
460        // SQL-017/SQL-018: log processing and poll strategies
461        if self.config.processing_strategy == ProcessingStrategy::Scheduled {
462            debug!(
463                "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
464            );
465        }
466        if self.config.poll_strategy == PollStrategy::Burst {
467            debug!("Poll strategy: Burst (rapid successive polls)");
468        }
469
470        if self.config.output_type == SqlOutputType::StreamList
471            && (self.config.on_consume.is_some()
472                || self.config.on_consume_failed.is_some()
473                || self.config.on_consume_batch_complete.is_some())
474        {
475            warn!(
476                "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
477                 (rows are consumed lazily downstream)"
478            );
479        }
480
481        // Warn if no onConsume configured
482        if self.config.on_consume.is_none() {
483            warn!(
484                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
485            );
486        }
487
488        info!(
489            db_url = %redact_db_url(&self.config.db_url),
490            query_len = self.config.query.len(),
491            "SQL consumer started"
492        );
493
494        // Step 2: Parse query template once (avoid re-parsing every poll)
495        let template = parse_query_template(&self.config.query, self.config.placeholder)
496            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
497
498        // Step 3: Initial delay before starting polling
499        if self.config.initial_delay_ms > 0 {
500            tokio::select! {
501                _ = context.cancelled() => {
502                    info!("SQL consumer stopped during initial delay");
503                    return Ok(());
504                }
505                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
506            }
507        }
508
509        // Step 4: Polling loop
510        //
511        // This is a POLLING LOOP with fixed cadence (delay_ms), NOT a
512        // retry loop. It polls the database until cancelled or repeat_count
513        // is reached — there is no "transient error → retry with backoff"
514        // contract at this level. retry_async / retry_async_cancelable do
515        // not apply because they are designed for bounded retry, not
516        // repeated polling with uniform delay.
517        //
518        // The pool-connect retry at startup (Step 1) was migrated to
519        // retry_async in rc-d2r. The per-poll error handling (poll_database
520        // failures) is an error-bridge pattern, not a retry loop.
521        //
522        // See camel-redis/src/consumer.rs:325 for a similar polling-loop
523        // justification.
524        let mut poll_count: u32 = 0;
525        loop {
526            // SQL-015: check repeat_count limit
527            if let Some(max_repeats) = self.config.repeat_count
528                && poll_count >= max_repeats
529            {
530                info!(
531                    repeat_count = max_repeats,
532                    "SQL consumer reached repeat_count limit, stopping"
533                );
534                break;
535            }
536
537            tokio::select! {
538                _ = context.cancelled() => {
539                    info!("SQL consumer stopped");
540                    break;
541                }
542                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
543                    poll_count += 1;
544                    self.handle_poll_result(pool, &context, &template).await;
545                }
546            }
547        }
548
549        Ok(())
550    }
551
552    async fn stop(&mut self) -> Result<(), CamelError> {
553        // Double-stop is safe — no-op after first stop
554        if self.stopped {
555            debug!("SQL consumer stop called on already-stopped consumer");
556            return Ok(());
557        }
558
559        // Close the connection pool if it was initialized
560        if let Some(pool) = self.pool.get() {
561            debug!("SQL consumer closing connection pool");
562            pool.close().await;
563            debug!("SQL consumer pool closed");
564        }
565
566        self.stopped = true;
567        info!("SQL consumer stopped");
568        Ok(())
569    }
570
571    fn concurrency_model(&self) -> ConcurrencyModel {
572        // Sequential is correct for SQL consumers: concurrent polls would fetch
573        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
574        // in this runtime) — Sequential is the correct equivalent.
575        ConcurrencyModel::Sequential
576    }
577}
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582    use camel_api::MetricsCollector;
583    use camel_component_api::HealthCheckRegistry;
584    use camel_component_api::test_support::PanicRuntimeObservability;
585    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
586        std::sync::Arc::new(PanicRuntimeObservability)
587    }
588    use crate::config::SqlEndpointConfig;
589    use camel_component_api::ExchangeEnvelope;
590    use camel_component_api::UriConfig;
591    use sqlx::any::AnyPoolOptions;
592    use std::sync::Arc;
593    use std::sync::Mutex;
594    use std::time::Duration;
595    use tokio::sync::mpsc;
596    use tokio_util::sync::CancellationToken;
597
598    // -----------------------------------------------------------------------
599    // Recording metrics collector for testing increment_errors calls
600    // -----------------------------------------------------------------------
601
602    struct RecordingMetrics {
603        errors: Arc<Mutex<Vec<(String, String)>>>,
604    }
605
606    impl MetricsCollector for RecordingMetrics {
607        fn record_exchange_duration(&self, _: &str, _: Duration) {}
608        fn increment_errors(&self, route_id: &str, error_type: &str) {
609            self.errors
610                .lock()
611                .unwrap()
612                .push((route_id.to_string(), error_type.to_string()));
613        }
614        fn increment_exchanges(&self, _: &str) {}
615        fn set_queue_depth(&self, _: &str, _: usize) {}
616        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
617    }
618
619    struct RecordingRuntime {
620        metrics_collector: Arc<RecordingMetrics>,
621    }
622
623    impl RecordingRuntime {
624        fn new(errors: Arc<Mutex<Vec<(String, String)>>>) -> Self {
625            Self {
626                metrics_collector: Arc::new(RecordingMetrics { errors }),
627            }
628        }
629    }
630
631    impl RuntimeObservability for RecordingRuntime {
632        fn metrics(&self) -> Arc<dyn MetricsCollector> {
633            self.metrics_collector.clone() as Arc<dyn MetricsCollector>
634        }
635        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
636            panic!("RecordingRuntime::health not used in this test")
637        }
638    }
639
640    /// Regression test for ADR-0012: the record_post_process_failure helper
641    /// must increment the error metric with the correct route_id and label,
642    /// AND emit error! via tracing.
643    #[tracing_test::traced_test]
644    #[test]
645    fn record_post_process_failure_increments_errors_and_emits_error_log() {
646        let errors: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
647        let runtime = Arc::new(RecordingRuntime::new(Arc::clone(&errors)));
648        let error = CamelError::ProcessorError("test failure".to_string());
649
650        // Directly invoke the helper
651        record_post_process_failure(
652            runtime.as_ref(),
653            "test-route",
654            "b-prime:sql:on-consume",
655            &error,
656            "Post-processing failed",
657        );
658
659        // Verify MetricsCollector::increment_errors was called
660        let recorded = errors.lock().unwrap();
661        assert_eq!(recorded.len(), 1, "expected 1 increment_errors call");
662        assert_eq!(recorded[0].0, "test-route");
663        assert_eq!(recorded[0].1, "b-prime:sql:on-consume");
664        drop(recorded);
665
666        // Verify error! was emitted
667        assert!(logs_contain("ERROR"), "helper must emit error! log");
668        assert!(
669            logs_contain("Post-processing failed"),
670            "helper must include the message in the log"
671        );
672    }
673
674    async fn sqlite_pool() -> AnyPool {
675        sqlx::any::install_default_drivers();
676        AnyPoolOptions::new()
677            .max_connections(1)
678            .connect("sqlite::memory:")
679            .await
680            .expect("sqlite pool")
681    }
682
683    async fn seed_consumer_table(pool: &AnyPool) {
684        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
685            .execute(pool)
686            .await
687            .expect("create table");
688        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
689            .execute(pool)
690            .await
691            .expect("seed rows");
692    }
693
694    fn config() -> SqlEndpointConfig {
695        let mut c =
696            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
697                .unwrap();
698        c.resolve_defaults();
699        c
700    }
701
702    #[test]
703    fn consumer_concurrency_model() {
704        let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()), test_rt());
705        assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
706    }
707
708    #[test]
709    fn consumer_stores_config() {
710        let mut config = SqlEndpointConfig::from_uri(
711            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
712        ).unwrap();
713        config.resolve_defaults();
714        let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
715        assert_eq!(c.config.delay_ms, 2000);
716        assert!(c.config.on_consume.is_some());
717    }
718
719    #[tokio::test]
720    async fn poll_database_runs_on_consume_for_successful_rows() {
721        let pool = sqlite_pool().await;
722        seed_consumer_table(&pool).await;
723
724        let mut config = SqlEndpointConfig::from_uri(
725            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
726        )
727        .unwrap();
728        config.resolve_defaults();
729
730        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
731        let template = parse_query_template(&config.query, config.placeholder).unwrap();
732
733        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
734        tokio::spawn(async move {
735            while let Some(env) = rx.recv().await {
736                if let Some(reply_tx) = env.reply_tx {
737                    let _ = reply_tx.send(Ok(env.exchange));
738                }
739            }
740        });
741        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
742
743        consumer
744            .poll_database(&pool, &ctx, &template)
745            .await
746            .expect("poll must succeed");
747
748        let row = sqlx::query("select processed from jobs where id = 1")
749            .fetch_one(&pool)
750            .await
751            .expect("row 1");
752        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
753
754        let row = sqlx::query("select processed from jobs where id = 2")
755            .fetch_one(&pool)
756            .await
757            .expect("row 2");
758        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
759
760        assert_eq!(processed_1, 1);
761        assert_eq!(processed_2, 1);
762    }
763
764    #[tokio::test]
765    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
766        let pool = sqlite_pool().await;
767        seed_consumer_table(&pool).await;
768
769        let mut config = SqlEndpointConfig::from_uri(
770            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
771        )
772        .unwrap();
773        config.resolve_defaults();
774
775        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
776        let template = parse_query_template(&config.query, config.placeholder).unwrap();
777
778        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
779        tokio::spawn(async move {
780            while let Some(env) = rx.recv().await {
781                if let Some(reply_tx) = env.reply_tx {
782                    let _ =
783                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
784                }
785            }
786        });
787        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
788
789        consumer
790            .poll_database(&pool, &ctx, &template)
791            .await
792            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
793
794        let row = sqlx::query("select failed from jobs where id = 1")
795            .fetch_one(&pool)
796            .await
797            .expect("row 1");
798        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
799
800        let row = sqlx::query("select failed from jobs where id = 2")
801            .fetch_one(&pool)
802            .await
803            .expect("row 2");
804        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
805
806        assert_eq!(failed_1, 1);
807        assert_eq!(failed_2, 1);
808    }
809
810    #[tokio::test]
811    async fn poll_database_breaks_batch_on_consume_fail() {
812        let pool = sqlite_pool().await;
813        seed_consumer_table(&pool).await;
814
815        let mut config = SqlEndpointConfig::from_uri(
816            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
817        )
818        .unwrap();
819        config.resolve_defaults();
820
821        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
822        let template = parse_query_template(&config.query, config.placeholder).unwrap();
823
824        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
825        tokio::spawn(async move {
826            while let Some(env) = rx.recv().await {
827                if let Some(reply_tx) = env.reply_tx {
828                    let _ =
829                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
830                }
831            }
832        });
833        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
834
835        let err = consumer
836            .poll_database(&pool, &ctx, &template)
837            .await
838            .expect_err("must stop on first downstream failure");
839        assert!(err.to_string().contains("downstream boom"));
840
841        let row = sqlx::query("select failed from jobs where id = 1")
842            .fetch_one(&pool)
843            .await
844            .expect("row 1");
845        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
846
847        let row = sqlx::query("select failed from jobs where id = 2")
848            .fetch_one(&pool)
849            .await
850            .expect("row 2");
851        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
852
853        assert_eq!(failed_1, 1);
854        assert_eq!(failed_2, 0, "second row must not be processed");
855    }
856
857    // --- Phase B hardening tests ---
858
859    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
860    // The consumer defensively calls resolve_defaults() during pool init, so the pool
861    // fields get resolved. This test verifies no panic occurs.
862    #[tokio::test]
863    async fn consumer_no_panic_without_prior_resolve_defaults() {
864        let config = SqlEndpointConfig::from_uri(
865            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
866        )
867        .unwrap();
868        // Deliberately NOT calling resolve_defaults() — pool fields remain None
869        assert!(config.max_connections.is_none());
870
871        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
872        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
873        tokio::spawn(async move {
874            while let Some(env) = rx.recv().await {
875                if let Some(reply_tx) = env.reply_tx {
876                    let _ = reply_tx.send(Ok(env.exchange));
877                }
878            }
879        });
880        let token = CancellationToken::new();
881        let ctx = ConsumerContext::new(tx, token.clone(), "sql-test-route".to_string());
882
883        // Spawn the consumer and cancel it quickly — it should not panic
884        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
885
886        // Cancel after a short delay
887        tokio::time::sleep(Duration::from_millis(50)).await;
888        token.cancel();
889
890        let result = consumer_handle.await.expect("task should not panic");
891        // Should complete without panic (may be Ok or Err depending on timing)
892        let _ = result;
893    }
894
895    // SQL-008: stop() closes the pool
896    #[tokio::test]
897    async fn stop_closes_pool() {
898        let pool = sqlite_pool().await;
899        seed_consumer_table(&pool).await;
900
901        let mut config = SqlEndpointConfig::from_uri(
902            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
903        )
904        .unwrap();
905        config.resolve_defaults();
906
907        let pool_cell = Arc::new(OnceCell::new());
908        pool_cell.set(pool.clone()).unwrap();
909
910        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
911        consumer.stop().await.expect("stop should succeed");
912
913        // After stop, the pool should be closed
914        assert!(
915            pool.is_closed(),
916            "Pool should be closed after consumer.stop()"
917        );
918    }
919
920    // SQL-008: double-stop is safe
921    #[tokio::test]
922    async fn double_stop_is_safe() {
923        let pool = sqlite_pool().await;
924        let mut config = SqlEndpointConfig::from_uri(
925            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
926        )
927        .unwrap();
928        config.resolve_defaults();
929
930        let pool_cell = Arc::new(OnceCell::new());
931        pool_cell.set(pool.clone()).unwrap();
932
933        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
934        consumer.stop().await.expect("first stop should succeed");
935        consumer
936            .stop()
937            .await
938            .expect("second stop should also succeed");
939    }
940
941    // SQL-008: start after stop is rejected
942    #[tokio::test]
943    async fn start_after_stop_rejected() {
944        let pool = sqlite_pool().await;
945        let mut config = SqlEndpointConfig::from_uri(
946            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
947        )
948        .unwrap();
949        config.resolve_defaults();
950
951        let pool_cell = Arc::new(OnceCell::new());
952        pool_cell.set(pool.clone()).unwrap();
953
954        let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
955        consumer.stop().await.expect("stop should succeed");
956
957        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
958        tokio::spawn(async move {
959            while let Some(env) = rx.recv().await {
960                if let Some(reply_tx) = env.reply_tx {
961                    let _ = reply_tx.send(Ok(env.exchange));
962                }
963            }
964        });
965        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
966
967        let result = consumer.start(ctx).await;
968        assert!(result.is_err());
969        let err_msg = result.unwrap_err().to_string();
970        assert!(
971            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
972            "Expected restart error, got: {}",
973            err_msg
974        );
975    }
976
977    // SQL-021: batch mode per-row post-processing
978    #[tokio::test]
979    async fn batch_mode_per_row_post_processing() {
980        let pool = sqlite_pool().await;
981        seed_consumer_table(&pool).await;
982
983        let mut config = SqlEndpointConfig::from_uri(
984            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
985        )
986        .unwrap();
987        config.resolve_defaults();
988
989        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
990        let template = parse_query_template(&config.query, config.placeholder).unwrap();
991
992        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
993        tokio::spawn(async move {
994            while let Some(env) = rx.recv().await {
995                if let Some(reply_tx) = env.reply_tx {
996                    let _ = reply_tx.send(Ok(env.exchange));
997                }
998            }
999        });
1000        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1001
1002        consumer
1003            .poll_database(&pool, &ctx, &template)
1004            .await
1005            .expect("poll must succeed");
1006
1007        // SQL-021: Each row should have been processed individually via onConsume
1008        let row = sqlx::query("select processed from jobs where id = 1")
1009            .fetch_one(&pool)
1010            .await
1011            .expect("row 1");
1012        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1013
1014        let row = sqlx::query("select processed from jobs where id = 2")
1015            .fetch_one(&pool)
1016            .await
1017            .expect("row 2");
1018        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
1019
1020        assert_eq!(
1021            processed_1, 1,
1022            "row 1 should be marked processed via per-row onConsume"
1023        );
1024        assert_eq!(
1025            processed_2, 1,
1026            "row 2 should be marked processed via per-row onConsume"
1027        );
1028    }
1029
1030    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
1031    #[tokio::test]
1032    async fn batch_mode_per_row_post_processing_on_failure() {
1033        let pool = sqlite_pool().await;
1034        seed_consumer_table(&pool).await;
1035
1036        let mut config = SqlEndpointConfig::from_uri(
1037            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
1038        )
1039        .unwrap();
1040        config.resolve_defaults();
1041
1042        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1043        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1044
1045        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1046        tokio::spawn(async move {
1047            while let Some(env) = rx.recv().await {
1048                if let Some(reply_tx) = env.reply_tx {
1049                    let _ =
1050                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
1051                }
1052            }
1053        });
1054        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1055
1056        consumer
1057            .poll_database(&pool, &ctx, &template)
1058            .await
1059            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
1060
1061        // SQL-021: Each row should have onConsumeFailed executed individually
1062        let row = sqlx::query("select failed from jobs where id = 1")
1063            .fetch_one(&pool)
1064            .await
1065            .expect("row 1");
1066        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1067
1068        let row = sqlx::query("select failed from jobs where id = 2")
1069            .fetch_one(&pool)
1070            .await
1071            .expect("row 2");
1072        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
1073
1074        assert_eq!(
1075            failed_1, 1,
1076            "row 1 should be marked failed via per-row onConsumeFailed"
1077        );
1078        assert_eq!(
1079            failed_2, 1,
1080            "row 2 should be marked failed via per-row onConsumeFailed"
1081        );
1082    }
1083
1084    #[tokio::test]
1085    async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
1086        let mut config = config();
1087        config.bridge_error_handler = true;
1088        let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
1089
1090        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1091        tokio::spawn(async move {
1092            while let Some(env) = rx.recv().await {
1093                assert!(env.exchange.error.is_some(), "exchange must carry error");
1094                if let Some(reply_tx) = env.reply_tx {
1095                    let _ = reply_tx.send(Ok(env.exchange));
1096                }
1097                break;
1098            }
1099        });
1100
1101        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1102        consumer
1103            .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
1104            .await
1105            .expect("bridging should succeed");
1106    }
1107
1108    /// Regression for ADR-0012: when bridge_error_handler=true, the poll
1109    /// failure must NOT emit error! (the route's error handler owns ERROR
1110    /// for bridged failures). Was previously duplicated at line 429 + 431.
1111    #[tracing_test::traced_test]
1112    #[tokio::test]
1113    async fn bridged_poll_failure_emits_warn_not_error() {
1114        let pool = sqlite_pool().await;
1115        // Do NOT create any table — the query against a non-existent
1116        // table will fail at fetch_all, returning Err BEFORE any
1117        // downstream send (so lines 103/205 are never reached).
1118
1119        let mut config = config();
1120        config.bridge_error_handler = true;
1121        // Query a non-existent table to trigger a query-failure poll error.
1122        config.query = "select * from nonexistent_table".to_string();
1123        config.resolve_defaults();
1124        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1125        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1126
1127        // Healthy downstream — replies Ok so bridge_poll_error succeeds
1128        // and does NOT emit its own error!.
1129        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1130        tokio::spawn(async move {
1131            while let Some(env) = rx.recv().await {
1132                if let Some(reply_tx) = env.reply_tx {
1133                    let _ = reply_tx.send(Ok(env.exchange));
1134                }
1135            }
1136        });
1137        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1138
1139        // Drive poll — fetch_all will fail because the table is missing.
1140        consumer.handle_poll_result(&pool, &ctx, &template).await;
1141
1142        // The bridged path must NOT emit ERROR (handler owns it).
1143        assert!(
1144            !logs_contain("ERROR"),
1145            "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1146        );
1147        // Sanity: warn! was emitted so the failure is still visible.
1148        assert!(
1149            logs_contain("WARN"),
1150            "bridged poll failure should emit warn! for operator visibility"
1151        );
1152    }
1153
1154    /// Regression for ADR-0012 "b-bridged discriminator": when
1155    /// send_and_wait returns Err on a NORMAL-DATA send (i.e., not a
1156    /// deliberate bridge_poll_error handoff), the route handler did NOT
1157    /// absorb the failure (consumer.rs:77-91 contract; error_handler.rs
1158    /// returns Ok in every branch). The consumer's error! is the only
1159    /// ERROR signal for the unhandled failure and MUST stay at error!.
1160    ///
1161    /// Protects consumer.rs:205 (StreamList downstream send) and any
1162    /// future site that uses send_and_wait on a non-bridge path.
1163    #[tracing_test::traced_test]
1164    #[tokio::test]
1165    async fn unbridged_send_and_wait_failure_emits_error_loud() {
1166        let pool = sqlite_pool().await;
1167        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1168            .execute(&pool)
1169            .await
1170            .expect("create table");
1171        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1172            .execute(&pool)
1173            .await
1174            .expect("seed rows");
1175
1176        let mut config = SqlEndpointConfig::from_uri(
1177            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1178        )
1179        .unwrap();
1180        config.resolve_defaults();
1181        // Explicitly non-bridged: normal-data send path.
1182        config.bridge_error_handler = false;
1183        let consumer = SqlConsumer::new(
1184            config.clone(),
1185            Arc::new(OnceCell::new()),
1186            Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1187        );
1188        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1189
1190        // Downstream that returns Err — simulates unhandled route failure.
1191        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1192        tokio::spawn(async move {
1193            while let Some(env) = rx.recv().await {
1194                if let Some(reply_tx) = env.reply_tx {
1195                    let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1196                }
1197            }
1198        });
1199        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1200
1201        let _ = consumer.poll_database(&pool, &ctx, &template).await;
1202
1203        // The unbridged path MUST emit ERROR — consumer owns the signal.
1204        assert!(
1205            logs_contain("ERROR"),
1206            "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1207        );
1208    }
1209
1210    /// Regression for ADR-0012: when bridge_error_handler=false, the unbridged
1211    /// branch of handle_poll_result MUST emit ERROR for unhandled poll failure.
1212    #[tracing_test::traced_test]
1213    #[tokio::test]
1214    async fn unbridged_handle_poll_result_emits_error_loud() {
1215        let pool = sqlite_pool().await;
1216        // Do NOT create any table — fetch_all will fail in poll_database.
1217
1218        let mut config = config();
1219        config.bridge_error_handler = false;
1220        config.query = "select * from nonexistent_table".to_string();
1221        config.resolve_defaults();
1222        let consumer = SqlConsumer::new(
1223            config.clone(),
1224            Arc::new(OnceCell::new()),
1225            Arc::new(RecordingRuntime::new(Arc::new(Mutex::new(Vec::new())))),
1226        );
1227        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1228
1229        // Healthy downstream task; should not be reached for this poll-failure path.
1230        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1231        tokio::spawn(async move {
1232            while let Some(env) = rx.recv().await {
1233                if let Some(reply_tx) = env.reply_tx {
1234                    let _ = reply_tx.send(Ok(env.exchange));
1235                }
1236            }
1237        });
1238        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1239
1240        consumer.handle_poll_result(&pool, &ctx, &template).await;
1241
1242        assert!(
1243            logs_contain("ERROR"),
1244            "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1245        );
1246    }
1247
1248    #[tokio::test]
1249    async fn stream_list_consumer_emits_ndjson_body() {
1250        let pool = sqlite_pool().await;
1251        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1252            .execute(&pool)
1253            .await
1254            .expect("create table");
1255        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1256            .execute(&pool)
1257            .await
1258            .expect("seed rows");
1259
1260        let mut config = SqlEndpointConfig::from_uri(
1261            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1262        )
1263        .unwrap();
1264        config.resolve_defaults();
1265
1266        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1267        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1268
1269        let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1270        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1271        tokio::spawn(async move {
1272            let mut rx = rx;
1273            if let Some(env) = rx.recv().await {
1274                if let Some(reply_tx) = env.reply_tx {
1275                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1276                }
1277                let _ = result_tx.send(env.exchange);
1278            }
1279        });
1280        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "sql-test-route".to_string());
1281
1282        consumer
1283            .poll_database(&pool, &ctx, &template)
1284            .await
1285            .expect("poll must succeed");
1286
1287        let exchange = result_rx.await.expect("should have received one exchange");
1288
1289        match exchange.input.body {
1290            Body::Stream(ref stream_body) => {
1291                let stream = stream_body.stream.clone();
1292                let mut guard = stream.lock().await;
1293                let stream_opt = guard.take();
1294                assert!(stream_opt.is_some(), "stream should be present");
1295
1296                use futures::StreamExt;
1297                let mut collected = Vec::new();
1298                let mut stream = stream_opt.unwrap();
1299                while let Some(chunk) = stream.next().await {
1300                    let chunk = chunk.expect("stream chunk should not error");
1301                    collected.extend_from_slice(&chunk);
1302                }
1303
1304                let ndjson = String::from_utf8(collected).expect("valid utf8");
1305                let lines: Vec<&str> = ndjson.trim().lines().collect();
1306                assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1307
1308                let row0: serde_json::Value =
1309                    serde_json::from_str(lines[0]).expect("valid json line 0");
1310                assert_eq!(row0["id"], 1);
1311                assert_eq!(row0["name"], "alpha");
1312
1313                let row1: serde_json::Value =
1314                    serde_json::from_str(lines[1]).expect("valid json line 1");
1315                assert_eq!(row1["id"], 2);
1316                assert_eq!(row1["name"], "beta");
1317
1318                let row2: serde_json::Value =
1319                    serde_json::from_str(lines[2]).expect("valid json line 2");
1320                assert_eq!(row2["id"], 3);
1321                assert_eq!(row2["name"], "gamma");
1322            }
1323            ref other => panic!("expected Body::Stream, got {:?}", other),
1324        }
1325    }
1326
1327    #[tokio::test]
1328    async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1329        let pool = sqlite_pool().await;
1330        sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1331            .execute(&pool)
1332            .await
1333            .expect("create table");
1334
1335        let mut config = SqlEndpointConfig::from_uri(
1336            "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1337        )
1338        .unwrap();
1339        config.resolve_defaults();
1340
1341        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1342        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1343
1344        let (tx, rx) = tokio::sync::oneshot::channel();
1345        let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1346        tokio::spawn(async move {
1347            while let Some(env) = mpsc_rx.recv().await {
1348                if let Some(reply_tx) = env.reply_tx {
1349                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1350                }
1351                let _ = tx.send(env.exchange);
1352                break;
1353            }
1354        });
1355        let ctx = ConsumerContext::new(
1356            mpsc_tx,
1357            CancellationToken::new(),
1358            "sql-test-route".to_string(),
1359        );
1360
1361        consumer
1362            .poll_database(&pool, &ctx, &template)
1363            .await
1364            .expect("poll must succeed");
1365
1366        let exchange = rx
1367            .await
1368            .expect("StreamList should emit exchange even for empty results");
1369
1370        match exchange.input.body {
1371            Body::Stream(ref stream_body) => {
1372                let stream = stream_body.stream.clone();
1373                let mut guard = stream.lock().await;
1374                let stream_opt = guard.take();
1375
1376                use futures::StreamExt;
1377                let mut count = 0;
1378                if let Some(mut stream) = stream_opt {
1379                    while let Some(chunk) = stream.next().await {
1380                        let chunk = chunk.expect("stream chunk should not error");
1381                        count += chunk.len();
1382                    }
1383                }
1384                assert_eq!(count, 0, "empty table should produce zero stream bytes");
1385            }
1386            ref other => panic!("expected Body::Stream, got {:?}", other),
1387        }
1388    }
1389
1390    // ── ADR-0012 (g) regression tests ──────────────────────────────────
1391
1392    /// Fixture: captures `force_unhealthy_for_route` calls.
1393    #[derive(Debug, Default)]
1394    struct RecordingHealth {
1395        forced: Arc<Mutex<Vec<(String, String, String)>>>,
1396    }
1397
1398    impl HealthCheckRegistry for RecordingHealth {
1399        fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
1400            self.forced.lock().unwrap().push((
1401                route_id.to_string(),
1402                name.to_string(),
1403                reason.to_string(),
1404            ));
1405        }
1406    }
1407
1408    struct NoopMetricsForConsumer;
1409
1410    impl MetricsCollector for NoopMetricsForConsumer {
1411        fn record_exchange_duration(&self, _: &str, _: Duration) {}
1412        fn increment_errors(&self, _: &str, _: &str) {}
1413        fn increment_exchanges(&self, _: &str) {}
1414        fn set_queue_depth(&self, _: &str, _: usize) {}
1415        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
1416    }
1417
1418    struct RecordingRuntimeWithHealth {
1419        health: Arc<RecordingHealth>,
1420    }
1421
1422    impl RuntimeObservability for RecordingRuntimeWithHealth {
1423        fn metrics(&self) -> Arc<dyn MetricsCollector> {
1424            Arc::new(NoopMetricsForConsumer)
1425        }
1426        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
1427            self.health.clone()
1428        }
1429    }
1430
1431    /// Regression: consumer pool init failure calls force_unhealthy_for_route
1432    /// with correct route_id + name "g:sql:consumer-pool-init" + non-empty reason.
1433    #[tokio::test]
1434    async fn consumer_pool_init_failure_calls_force_unhealthy_for_route() {
1435        let health = Arc::new(RecordingHealth::default());
1436        let recorded_health = health.clone();
1437        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntimeWithHealth { health });
1438
1439        let mut config = SqlEndpointConfig::from_uri(
1440            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false&initialDelay=0&delay=1",
1441        )
1442        .unwrap();
1443        config.max_connections = Some(1);
1444        config.min_connections = Some(0);
1445        config.idle_timeout_secs = Some(300);
1446        config.max_lifetime_secs = Some(1800);
1447
1448        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), rt);
1449
1450        let (tx, _rx) = mpsc::channel(8);
1451        let ctx = ConsumerContext::new(
1452            tx,
1453            CancellationToken::new(),
1454            "sql-consumer-test-route".to_string(),
1455        );
1456
1457        let result = consumer.start(ctx).await;
1458        assert!(result.is_err(), "pool init should fail with bad db_url");
1459
1460        let forced = recorded_health.forced.lock().unwrap();
1461        assert_eq!(
1462            forced.len(),
1463            1,
1464            "expected one force_unhealthy_for_route call"
1465        );
1466        assert_eq!(forced[0].0, "sql-consumer-test-route");
1467        assert_eq!(forced[0].1, "g:sql:consumer-pool-init");
1468        assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1469    }
1470
1471    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1472    /// Replicates the exact retry loop from SqlConsumer::start() (consumer.rs:343-367):
1473    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1474    #[tokio::test]
1475    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1476        use camel_component_api::NetworkRetryPolicy;
1477        use std::sync::Arc;
1478        use std::sync::atomic::{AtomicU32, Ordering};
1479
1480        let policy = NetworkRetryPolicy {
1481            max_attempts: 3,
1482            initial_delay: Duration::from_millis(1),
1483            max_delay: Duration::from_millis(1),
1484            multiplier: 1.0,
1485            ..NetworkRetryPolicy::default()
1486        };
1487
1488        let calls = Arc::new(AtomicU32::new(0));
1489        let calls_clone = Arc::clone(&calls);
1490
1491        let mut attempt: u32 = 0;
1492        let _result: Result<(), ()> = loop {
1493            attempt += 1;
1494            calls_clone.fetch_add(1, Ordering::SeqCst);
1495            let op_result: Result<(), ()> = Err(());
1496            match op_result {
1497                Ok(v) => break Ok(v),
1498                Err(_) if policy.should_retry(attempt) => {
1499                    let delay = policy.delay_for(attempt - 1);
1500                    tokio::time::sleep(delay).await;
1501                    continue;
1502                }
1503                Err(_) => break Err(()),
1504            }
1505        };
1506
1507        assert_eq!(
1508            calls.load(Ordering::SeqCst),
1509            3,
1510            "max_attempts=3 must yield exactly 3 invocations"
1511        );
1512    }
1513}