Skip to main content

camel_component_sql/
producer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use camel_api::datasource::DatasourceCatalog;
10use serde_json::json;
11use sqlx::AnyPool;
12use sqlx::any::AnyRow;
13use sqlx::pool::PoolOptions;
14use tokio::sync::OnceCell;
15use tower::Service;
16use tracing::{debug, error, info, warn};
17
18use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
19use crate::headers;
20use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
21use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
22use camel_component_api::retry_async;
23use camel_component_api::{
24    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
25};
26
/// Producer side of the SQL component: executes the endpoint's SQL for each
/// incoming [`Exchange`] through the `tower::Service` implementation below.
///
/// Clones share the pool cell and the stop flag, because both live behind `Arc`s.
#[derive(Clone)]
pub struct SqlProducer {
    /// Endpoint configuration. Each `call` works on its own clone of this value.
    pub(crate) config: SqlEndpointConfig,
    /// Connection pool cell. It starts empty (unless the caller pre-filled it)
    /// and is populated by the first successful `call`.
    pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
    /// Optional datasource catalog. When it is set together with
    /// `config.datasource_name`, the pool is obtained from the catalog instead
    /// of being built from `config.db_url`.
    pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
    /// Set to `true` by `stop`; `poll_ready` rejects work once it is set.
    pub(crate) stopped: Arc<AtomicBool>,
    /// Observability handle; its health registry is notified when pool
    /// initialisation fails.
    pub(crate) runtime: Arc<dyn RuntimeObservability>,
    /// Route identifier for ADR-0012 (g) health/observability calls.
    pub(crate) route_id: String,
}
37
38impl SqlProducer {
39    pub fn new(
40        config: SqlEndpointConfig,
41        pool: Arc<OnceCell<Arc<AnyPool>>>,
42        catalog: Option<Arc<dyn DatasourceCatalog>>,
43        runtime: Arc<dyn RuntimeObservability>,
44        route_id: impl Into<String>,
45    ) -> Self {
46        Self {
47            config,
48            pool,
49            catalog,
50            stopped: Arc::new(AtomicBool::new(false)),
51            runtime,
52            route_id: route_id.into(),
53        }
54    }
55
56    pub fn stop(&self) {
57        self.stopped.store(true, Ordering::Relaxed);
58        // Close the pool asynchronously in the background with a timeout
59        if let Some(pool) = self.pool.get() {
60            let pool = pool.clone();
61            tokio::spawn(async move {
62                if tokio::time::timeout(Duration::from_secs(5), pool.close())
63                    .await
64                    .is_err()
65                {
66                    tracing::warn!("SQL producer pool did not close within 5s");
67                }
68            });
69        }
70    }
71
72    /// Resolves the query source based on priority:
73    /// 1. Header `CamelSql.Query`
74    /// 2. Body (if `use_message_body_for_sql` is true)
75    /// 3. Config query
76    pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
77        // Priority 1: Header
78        if let Some(query_value) = exchange.input.header(headers::QUERY)
79            && let Some(query_str) = query_value.as_str()
80        {
81            return query_str.to_string();
82        }
83
84        // Priority 2: Body (if use_message_body_for_sql)
85        if config.use_message_body_for_sql
86            && let Some(body_text) = exchange.input.body.as_text()
87        {
88            return body_text.to_string();
89        }
90
91        // Priority 3: Config query
92        config.query.clone()
93    }
94
95    /// Health check: runs a simple `SELECT 1` query against the connection pool
96    /// to verify database connectivity.
97    ///
98    /// Returns `Ok(())` if the database is reachable, or an error with details
99    /// if the connection fails.
100    pub async fn check_connection(&self) -> Result<(), CamelError> {
101        let pool =
102            if let (Some(catalog), Some(ds_name)) = (&self.catalog, &self.config.datasource_name) {
103                let handle = catalog.get_pool(ds_name).await?;
104                let arc_pool: Arc<AnyPool> = handle.downcast()?;
105                // We don't cache this here — check_connection is a one-shot call
106                arc_pool
107            } else {
108                self.pool
109                    .get()
110                    .ok_or_else(|| {
111                        CamelError::ProcessorError("SQL connection pool not initialized".into())
112                    })?
113                    .clone()
114            };
115
116        debug!("Running health check: SELECT 1");
117        sqlx::query("SELECT 1").execute(&*pool).await.map_err(|e| {
118            warn!(error = %e, "SQL health check failed");
119            CamelError::ProcessorError(format!("SQL health check failed: {}", e))
120        })?;
121
122        debug!("SQL health check passed");
123        Ok(())
124    }
125}
126
127impl Service<Exchange> for SqlProducer {
128    type Response = Exchange;
129    type Error = CamelError;
130    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
131
132    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        if self.stopped.load(Ordering::Relaxed) {
134            return Poll::Ready(Err(CamelError::ProcessorError(
135                "SQL producer stopped".into(),
136            )));
137        }
138        if let Some(pool) = self.pool.get()
139            && pool.is_closed()
140        {
141            return Poll::Ready(Err(CamelError::ProcessorError(
142                "SQL connection pool is closed".into(),
143            )));
144        }
145        Poll::Ready(Ok(()))
146    }
147
148    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
149        let mut config = self.config.clone();
150        let pool_cell = Arc::clone(&self.pool);
151        let catalog = self.catalog.clone();
152        let runtime = Arc::clone(&self.runtime);
153        let route_id = self.route_id.clone();
154
155        Box::pin(async move {
156            // SQL-014: resolve file-based query before pool init, regardless of pool source
157            config.resolve_defaults();
158            config.resolve_file_query().await?;
159
160            // Get or initialize the connection pool
161            let pool: &Arc<AnyPool> = pool_cell
162                .get_or_try_init(|| async {
163                    // Catalog path: resolve shared pool from the datasource catalog
164                    let ds_name = config.datasource_name.clone();
165                    if let (Some(ref cat), Some(ref name)) = (catalog, ds_name) {
166                        let handle = cat.get_pool(name).await?;
167                        return handle.downcast::<AnyPool>();
168                    }
169
170                    let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
171
172                    // Install all compiled-in sqlx drivers so AnyPool can resolve them.
173                    // This is idempotent; safe to call multiple times.
174                    sqlx::any::install_default_drivers();
175
176                    let max_conn = config.max_connections.ok_or_else(|| {
177                        CamelError::Config("max_connections not resolved for SQL pool".into())
178                    })?;
179                    let min_conn = config.min_connections.ok_or_else(|| {
180                        CamelError::Config("min_connections not resolved for SQL pool".into())
181                    })?;
182                    let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
183                        CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
184                    })?;
185                    let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
186                        CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
187                    })?;
188
189                    info!(
190                        db_url = %redact_db_url(&config.db_url),
191                        "SQL producer pool initializing"
192                    );
193                    let retry_policy = &config.retry;
194                    let pool = retry_async::<_, _, _, _, sqlx::Error>(
195                        retry_policy,
196                        Some("sql-producer"),
197                        || {
198                            async {
199                                PoolOptions::new()
200                                    .max_connections(max_conn)
201                                    .min_connections(min_conn)
202                                    .idle_timeout(Duration::from_secs(idle_timeout))
203                                    .max_lifetime(Duration::from_secs(max_lifetime))
204                                    .connect(&db_url)
205                                    .await
206                            }
207                        },
208                        is_retryable_sqlx_error,
209                    )
210                    .await
211                    .map_err(|e| {
212                        runtime.health().force_unhealthy_for_route(
213                            &route_id,
214                            "g:sql:producer-pool-init",
215                            &e.to_string(),
216                        );
217                        // log-policy: outside-contract
218                        error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
219                        CamelError::EndpointCreationFailed(format!(
220                            "Failed to connect to database: {}",
221                            e
222                        ))
223                    })?;
224                    Ok(Arc::new(pool))
225                })
226                .await
227                .map_err(|e: CamelError| e.clone())?;
228
229            // Resolve query string
230            let query_str = Self::resolve_query_source(&exchange, &config);
231
232            // SQL-002: warn if Managed transaction mode requested
233            if config.transaction_mode == crate::config::TransactionMode::Managed {
234                warn!("transactionManager not yet implemented; using Auto mode");
235            }
236
237            debug!(
238                query = %query_str,
239                "executing SQL query"
240            );
241
242            // Execute based on mode
243            if config.batch {
244                // Batch mode: execute_batch handles its own template parsing per item
245                execute_batch(pool.as_ref(), &config, &mut exchange).await?;
246            } else if config.use_placeholder {
247                // Placeholder processing enabled (default): parse template, resolve params, apply header override
248                let template = parse_query_template(&query_str, config.placeholder)?;
249                let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
250
251                // CamelSql.Parameters header override
252                if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
253                    if let Some(arr) = params_value.as_array() {
254                        if arr.len() != prepared.bindings.len() {
255                            warn!(
256                                expected = prepared.bindings.len(),
257                                got = arr.len(),
258                                header = headers::PARAMETERS,
259                                "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
260                                prepared.bindings.len(),
261                                arr.len()
262                            );
263                        }
264                        debug!(
265                            "Overriding bindings from {} header with {} parameters",
266                            headers::PARAMETERS,
267                            arr.len()
268                        );
269                        prepared.bindings = arr.clone();
270                    } else {
271                        warn!(
272                            header = headers::PARAMETERS,
273                            "Header is present but not a JSON array — ignoring parameter override"
274                        );
275                    }
276                }
277
278                debug!(
279                    "Executing prepared SQL ({} bindings)",
280                    prepared.bindings.len()
281                );
282
283                if is_select_query(&prepared.sql) {
284                    execute_select(pool.as_ref(), &prepared, &config, &mut exchange).await?;
285                } else {
286                    execute_modify(pool.as_ref(), &prepared, &config, &mut exchange).await?;
287                }
288            } else {
289                // use_placeholder=false: execute query as-is without template parsing
290                debug!("Executing raw SQL (placeholder processing disabled)");
291                let prepared = PreparedQuery {
292                    sql: query_str,
293                    bindings: vec![],
294                };
295
296                if is_select_query(&prepared.sql) {
297                    execute_select(pool.as_ref(), &prepared, &config, &mut exchange).await?;
298                } else {
299                    execute_modify(pool.as_ref(), &prepared, &config, &mut exchange).await?;
300                }
301            }
302
303            Ok(exchange)
304        })
305    }
306}
307
308/// Executes a SELECT query and populates the exchange body with results.
309async fn execute_select(
310    pool: &AnyPool,
311    prepared: &PreparedQuery,
312    config: &SqlEndpointConfig,
313    exchange: &mut Exchange,
314) -> Result<(), CamelError> {
315    match config.output_type {
316        SqlOutputType::SelectOne => {
317            // fetch_all and take first row
318            let mut query = sqlx::query(&prepared.sql);
319            query = bind_json_values(query, &prepared.bindings);
320
321            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
322                warn!(error = %e, "SQL query failed");
323                CamelError::ProcessorError(format!("Query execution failed: {}", e))
324            })?;
325
326            let count = rows.len();
327            debug!(rows = count, "SQL query completed");
328            let json_rows: Vec<serde_json::Value> = rows
329                .iter()
330                .map(row_to_json)
331                .collect::<Result<Vec<_>, _>>()?;
332
333            if let Some(first_row) = json_rows.into_iter().next() {
334                exchange.input.body = Body::Json(first_row);
335            } else {
336                exchange.input.body = Body::Empty;
337            }
338            debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
339            exchange
340                .input
341                .set_header(headers::ROW_COUNT, serde_json::json!(count));
342        }
343        SqlOutputType::SelectList => {
344            // fetch_all for list output
345            let mut query = sqlx::query(&prepared.sql);
346            query = bind_json_values(query, &prepared.bindings);
347
348            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
349                warn!(error = %e, "SQL query failed");
350                CamelError::ProcessorError(format!("Query execution failed: {}", e))
351            })?;
352
353            let count = rows.len();
354            debug!(rows = count, "SQL query completed");
355            let json_rows: Vec<serde_json::Value> = rows
356                .iter()
357                .map(row_to_json)
358                .collect::<Result<Vec<_>, _>>()?;
359
360            exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
361            debug!("SelectList returned {} rows", count);
362            exchange
363                .input
364                .set_header(headers::ROW_COUNT, serde_json::json!(count));
365        }
366        SqlOutputType::StreamList => {
367            // Use fetch() for true streaming - avoids loading all rows into memory
368            use futures::TryStreamExt;
369
370            let pool_clone = pool.clone();
371            let sql_str = prepared.sql.clone();
372            let bindings = prepared.bindings.clone();
373
374            // Build the stream that reads rows on demand and serializes to NDJSON bytes
375            let byte_stream = async_stream::try_stream! {
376                let mut q = sqlx::query(&sql_str);
377                q = bind_json_values(q, &bindings);
378                let mut rows = q.fetch(&pool_clone);
379                while let Some(row) = rows.try_next().await.map_err(|e| {
380                    CamelError::ProcessorError(format!("Query execution failed: {}", e))
381                })? {
382                    let json_val = row_to_json(&row).map_err(|e| {
383                        CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
384                    })?;
385                    let mut bytes = serde_json::to_vec(&json_val)
386                        .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
387                    bytes.push(b'\n');
388                    yield Bytes::from(bytes);
389                }
390            };
391
392            exchange.input.body = Body::Stream(StreamBody {
393                stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
394                metadata: StreamMetadata {
395                    content_type: Some("application/x-ndjson".to_string()),
396                    size_hint: None,
397                    origin: None,
398                },
399            });
400            debug!("StreamList: created lazy stream (rows fetched on demand)");
401            // Note: ROW_COUNT not set for StreamList since row count is unknown until exhausted
402        }
403    }
404
405    Ok(())
406}
407
408/// Executes a modification query (INSERT/UPDATE/DELETE).
409async fn execute_modify(
410    pool: &AnyPool,
411    prepared: &PreparedQuery,
412    config: &SqlEndpointConfig,
413    exchange: &mut Exchange,
414) -> Result<(), CamelError> {
415    let mut query = sqlx::query(&prepared.sql);
416    query = bind_json_values(query, &prepared.bindings);
417
418    let result = query.execute(pool).await.map_err(|e| {
419        warn!(error = %e, "SQL query failed");
420        CamelError::ProcessorError(format!("Query execution failed: {}", e))
421    })?;
422
423    let rows_affected = result.rows_affected();
424
425    // Fix 4: Implement expected_update_count validation
426    if let Some(expected) = config.expected_update_count
427        && rows_affected as i64 != expected
428    {
429        warn!(expected, actual = rows_affected, "Row count mismatch");
430        return Err(CamelError::ProcessorError(format!(
431            "Expected {} rows affected, got {}",
432            expected, rows_affected
433        )));
434    }
435
436    exchange
437        .input
438        .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
439
440    if config.noop {
441        // Preserve original body
442    } else {
443        exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
444    }
445
446    debug!(rows = rows_affected, "SQL modify query completed");
447
448    Ok(())
449}
450
451/// Executes a batch of queries from a JSON array body.
452async fn execute_batch(
453    pool: &AnyPool,
454    config: &SqlEndpointConfig,
455    exchange: &mut Exchange,
456) -> Result<(), CamelError> {
457    // Body must be JSON array of arrays
458    let body_json = match &exchange.input.body {
459        Body::Json(val) => val,
460        _ => {
461            return Err(CamelError::ProcessorError(
462                "Batch mode requires body to be a JSON array of arrays".to_string(),
463            ));
464        }
465    };
466
467    let batch_data = body_json
468        .as_array()
469        .ok_or_else(|| {
470            CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
471        })?
472        .clone();
473
474    // Parse template from config query
475    let template = parse_query_template(&config.query, config.placeholder)?;
476
477    // Fix 2: Batch operations must be wrapped in a transaction
478    let mut tx = pool.begin().await.map_err(|e| {
479        // log-policy: handler-owned
480        // (category a: tx begin failure inside producer pipeline)
481        warn!(error = %e, "Failed to begin transaction");
482        CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
483    })?;
484
485    let mut total_rows_affected: u64 = 0;
486
487    for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
488        // Each item must be an array of parameters
489        params_array.as_array().ok_or_else(|| {
490            CamelError::ProcessorError(format!(
491                "Batch item at index {} must be a JSON array of parameters",
492                batch_idx
493            ))
494        })?;
495
496        // Create a temporary exchange with the params as body for resolution
497        let temp_msg = Message::new(Body::Json(params_array.clone()));
498        let temp_exchange = Exchange::new(temp_msg);
499
500        // Resolve parameters for this batch item
501        let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
502
503        // Execute against transaction
504        let mut query = sqlx::query(&prepared.sql);
505        query = bind_json_values(query, &prepared.bindings);
506
507        let result = query.execute(&mut *tx).await.map_err(|e| {
508            // log-policy: handler-owned
509            // (category a: batch query execution failure inside producer pipeline)
510            warn!("Batch query execution failed at index {}: {}", batch_idx, e);
511            CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
512        })?;
513
514        // Validate expected_update_count per batch item
515        if let Some(expected) = config.expected_update_count
516            && result.rows_affected() as i64 != expected
517        {
518            // log-policy: handler-owned
519            // (category a: expected_update_count mismatch inside producer pipeline)
520            warn!(
521                "Batch item {}: expected {} rows affected, got {}",
522                batch_idx,
523                expected,
524                result.rows_affected()
525            );
526            return Err(CamelError::ProcessorError(format!(
527                "Batch item {}: expected {} rows affected, got {}",
528                batch_idx,
529                expected,
530                result.rows_affected()
531            )));
532        }
533
534        total_rows_affected += result.rows_affected();
535    }
536
537    // Commit the transaction
538    tx.commit().await.map_err(|e| {
539        // log-policy: handler-owned
540        // (category a: tx commit failure inside producer pipeline)
541        warn!(error = %e, "Failed to commit transaction");
542        CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
543    })?;
544
545    exchange.input.set_header(
546        headers::UPDATE_COUNT,
547        serde_json::json!(total_rows_affected),
548    );
549
550    debug!(
551        "Batch execution completed, total rows affected: {}",
552        total_rows_affected
553    );
554
555    Ok(())
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561    use camel_api::MetricsCollector;
562    use camel_component_api::HealthCheckRegistry;
563    use camel_component_api::test_support::PanicRuntimeObservability;
564    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
565        std::sync::Arc::new(PanicRuntimeObservability)
566    }
567    use camel_component_api::Message;
568    use camel_component_api::UriConfig;
569    use sqlx::any::AnyPoolOptions;
570    use std::sync::Arc;
571    use std::sync::Mutex;
572    use tokio::sync::OnceCell;
573
574    // ── Recording health fixture for ADR-0012 (g) tests ───────────────────
575
576    #[derive(Debug, Default)]
577    struct RecordingHealth {
578        forced: Mutex<Vec<(String, String, String)>>,
579    }
580
581    impl HealthCheckRegistry for RecordingHealth {
582        fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
583            self.forced.lock().unwrap().push((
584                route_id.to_string(),
585                name.to_string(),
586                reason.to_string(),
587            ));
588        }
589    }
590
591    struct NoopMetrics;
592
593    impl MetricsCollector for NoopMetrics {
594        fn record_exchange_duration(&self, _: &str, _: Duration) {}
595        fn increment_errors(&self, _: &str, _: &str) {}
596        fn increment_exchanges(&self, _: &str) {}
597        fn set_queue_depth(&self, _: &str, _: usize) {}
598        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
599    }
600
601    struct RecordingRuntime {
602        health: Arc<RecordingHealth>,
603    }
604
605    impl RuntimeObservability for RecordingRuntime {
606        fn metrics(&self) -> Arc<dyn MetricsCollector> {
607            Arc::new(NoopMetrics)
608        }
609        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
610            self.health.clone()
611        }
612    }
613
614    async fn sqlite_pool() -> AnyPool {
615        sqlx::any::install_default_drivers();
616        AnyPoolOptions::new()
617            .max_connections(1)
618            .connect("sqlite::memory:")
619            .await
620            .expect("sqlite pool")
621    }
622
623    async fn seed_items_table(pool: &AnyPool) {
624        sqlx::query(
625            "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
626        )
627        .execute(pool)
628        .await
629        .expect("create table");
630        sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
631            .execute(pool)
632            .await
633            .expect("seed rows");
634    }
635
636    fn config() -> SqlEndpointConfig {
637        let mut c =
638            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
639        c.resolve_defaults();
640        c
641    }
642
643    #[test]
644    fn producer_clone_shares_pool() {
645        let p1 = SqlProducer::new(
646            config(),
647            Arc::new(OnceCell::new()),
648            None,
649            test_rt(),
650            "sql-producer-test-route",
651        );
652        let p2 = p1.clone();
653        assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
654        assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
655    }
656
657    #[test]
658    fn resolve_query_from_config() {
659        let config = config();
660        let ex = Exchange::new(Message::default());
661        let q = SqlProducer::resolve_query_source(&ex, &config);
662        assert_eq!(q, "select 1");
663    }
664
665    #[test]
666    fn resolve_query_from_header() {
667        let config = config();
668        let mut msg = Message::default();
669        msg.set_header(headers::QUERY, serde_json::json!("select 2"));
670        let ex = Exchange::new(msg);
671        let q = SqlProducer::resolve_query_source(&ex, &config);
672        assert_eq!(q, "select 2");
673    }
674
675    #[test]
676    fn resolve_query_from_body() {
677        let mut config = config();
678        config.use_message_body_for_sql = true;
679        let msg = Message::new(Body::Text("select 3".to_string()));
680        let ex = Exchange::new(msg);
681        let q = SqlProducer::resolve_query_source(&ex, &config);
682        assert_eq!(q, "select 3");
683    }
684
685    #[test]
686    fn resolve_query_header_priority_over_body() {
687        let mut config = config();
688        config.use_message_body_for_sql = true;
689        let mut msg = Message::new(Body::Text("select from body".to_string()));
690        msg.set_header(headers::QUERY, serde_json::json!("select from header"));
691        let ex = Exchange::new(msg);
692        let q = SqlProducer::resolve_query_source(&ex, &config);
693        assert_eq!(q, "select from header");
694    }
695
696    #[test]
697    fn resolve_query_body_priority_over_config() {
698        let mut config = config();
699        config.use_message_body_for_sql = true;
700        let msg = Message::new(Body::Text("select from body".to_string()));
701        let ex = Exchange::new(msg);
702        let q = SqlProducer::resolve_query_source(&ex, &config);
703        assert_eq!(q, "select from body");
704    }
705
706    #[test]
707    fn bind_json_null() {
708        let query = sqlx::query("SELECT ?");
709        let values = vec![serde_json::Value::Null];
710        let _bound = bind_json_values(query, &values);
711        // Compilation test - ensure it binds
712    }
713
714    #[test]
715    fn bind_json_bool() {
716        let query = sqlx::query("SELECT ?");
717        let values = vec![serde_json::Value::Bool(true)];
718        let _bound = bind_json_values(query, &values);
719    }
720
721    #[test]
722    fn bind_json_number_i64() {
723        let query = sqlx::query("SELECT ?");
724        let values = vec![serde_json::json!(42)];
725        let _bound = bind_json_values(query, &values);
726    }
727
728    #[test]
729    fn bind_json_number_f64() {
730        let query = sqlx::query("SELECT ?");
731        let values = vec![serde_json::json!(std::f64::consts::PI)];
732        let _bound = bind_json_values(query, &values);
733    }
734
735    #[test]
736    fn bind_json_string() {
737        let query = sqlx::query("SELECT ?");
738        let values = vec![serde_json::json!("hello world")];
739        let _bound = bind_json_values(query, &values);
740    }
741
742    #[test]
743    fn bind_json_array() {
744        let query = sqlx::query("SELECT ?");
745        let values = vec![serde_json::json!([1, 2, 3])];
746        let _bound = bind_json_values(query, &values);
747    }
748
749    #[test]
750    fn bind_json_object() {
751        let query = sqlx::query("SELECT ?");
752        let values = vec![serde_json::json!({"key": "value"})];
753        let _bound = bind_json_values(query, &values);
754    }
755
756    #[test]
757    fn bind_multiple_values() {
758        let query = sqlx::query("SELECT ?, ?, ?");
759        let values = vec![
760            serde_json::json!(1),
761            serde_json::json!("test"),
762            serde_json::Value::Null,
763        ];
764        let _bound = bind_json_values(query, &values);
765    }
766
767    // Test for Fix 4: expected_update_count config field presence
768    #[test]
769    fn expected_update_count_validation() {
770        // Test that expected_update_count is parsed from URI
771        let config = SqlEndpointConfig::from_uri(
772            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
773        )
774        .unwrap();
775        assert_eq!(config.expected_update_count, Some(5));
776
777        // Test default (no expected_update_count)
778        let config_default = self::config();
779        assert_eq!(config_default.expected_update_count, None);
780
781        // Test negative value (should parse)
782        let config_neg = SqlEndpointConfig::from_uri(
783            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
784        )
785        .unwrap();
786        assert_eq!(config_neg.expected_update_count, Some(-1));
787    }
788
/// Fix 3: models the parameters-header override rule on a hand-built
/// `PreparedQuery` — a JSON array replaces the bindings, any other JSON value
/// leaves them untouched.
#[test]
fn parameters_header_override_logic() {
    // Each scenario starts from the same query with a single binding of 42.
    let fresh_query = || PreparedQuery {
        sql: "SELECT * FROM t WHERE id = $1".to_string(),
        bindings: vec![serde_json::json!(42)],
    };

    // Array header: the bindings are swapped for the array's elements.
    let mut overridden = fresh_query();
    let array_header = serde_json::json!([99, "extra"]);
    if let Some(items) = array_header.as_array().cloned() {
        overridden.bindings = items;
    }
    assert_eq!(
        overridden.bindings,
        vec![serde_json::json!(99), serde_json::json!("extra")]
    );

    // Object header: `as_array()` yields nothing, so the binding survives.
    let mut untouched = fresh_query();
    let object_header = serde_json::json!({"not": "an array"});
    if let Some(items) = object_header.as_array().cloned() {
        untouched.bindings = items;
    }
    assert_eq!(untouched.bindings, vec![serde_json::json!(42)]);
}
822
823    #[tokio::test]
824    async fn execute_select_one_sets_body_and_row_count() {
825        let pool = sqlite_pool().await;
826        seed_items_table(&pool).await;
827
828        let mut config = SqlEndpointConfig::from_uri(
829            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
830        )
831        .unwrap();
832        config.resolve_defaults();
833
834        let prepared = PreparedQuery {
835            sql: "select id, name from items order by id".to_string(),
836            bindings: vec![],
837        };
838        let mut exchange = Exchange::new(Message::default());
839
840        execute_select(&pool, &prepared, &config, &mut exchange)
841            .await
842            .expect("select one");
843
844        assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
845        assert_eq!(
846            exchange.input.body,
847            Body::Json(json!({"id": 1, "name": "a"}))
848        );
849    }
850
851    #[tokio::test]
852    async fn execute_stream_list_materializes_ndjson() {
853        let pool = sqlite_pool().await;
854        seed_items_table(&pool).await;
855
856        let mut config = SqlEndpointConfig::from_uri(
857            "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
858        )
859        .unwrap();
860        config.resolve_defaults();
861
862        let prepared = PreparedQuery {
863            sql: "select id from items order by id".to_string(),
864            bindings: vec![],
865        };
866        let mut exchange = Exchange::new(Message::default());
867
868        execute_select(&pool, &prepared, &config, &mut exchange)
869            .await
870            .expect("stream list");
871
872        let bytes = exchange
873            .input
874            .body
875            .clone()
876            .into_bytes(1024)
877            .await
878            .expect("stream bytes");
879        let text = String::from_utf8(bytes.to_vec()).expect("utf8");
880        assert!(text.contains("{\"id\":1}"));
881        assert!(text.contains("{\"id\":2}"));
882        assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
883    }
884
885    #[tokio::test]
886    async fn execute_modify_expected_update_count_mismatch_returns_error() {
887        let pool = sqlite_pool().await;
888        seed_items_table(&pool).await;
889
890        let mut config = SqlEndpointConfig::from_uri(
891            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
892        )
893        .unwrap();
894        config.resolve_defaults();
895
896        let prepared = PreparedQuery {
897            sql: "update items set done=1 where id = $1".to_string(),
898            bindings: vec![json!(1)],
899        };
900        let mut exchange = Exchange::new(Message::default());
901
902        let err = execute_modify(&pool, &prepared, &config, &mut exchange)
903            .await
904            .expect_err("must fail due expected row count mismatch");
905        assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
906    }
907
908    #[tokio::test]
909    async fn execute_batch_rollback_when_any_item_fails_expected_count() {
910        let pool = sqlite_pool().await;
911        seed_items_table(&pool).await;
912
913        let mut config = SqlEndpointConfig::from_uri(
914            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
915        )
916        .unwrap();
917        config.resolve_defaults();
918
919        let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
920
921        let err = execute_batch(&pool, &config, &mut exchange)
922            .await
923            .expect_err("second batch item should fail expectedUpdateCount");
924        assert!(
925            err.to_string()
926                .contains("Batch item 1: expected 1 rows affected, got 0")
927        );
928
929        let row = sqlx::query("select done from items where id = 1")
930            .fetch_one(&pool)
931            .await
932            .expect("query row");
933        let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
934        assert_eq!(done, 0, "transaction must rollback first update");
935    }
936
937    // --- Phase B hardening tests ---
938
939    // SQL-001: Direct producer construction without resolve_defaults does not panic.
940    // The producer defensively calls resolve_defaults() during pool init, so the pool
941    // fields get resolved. This test verifies no panic occurs and the pool initializes.
942    #[tokio::test]
943    async fn producer_no_panic_without_prior_resolve_defaults() {
944        // Create config without calling resolve_defaults() — pool fields are None
945        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
946        assert!(config.max_connections.is_none());
947
948        let mut producer = SqlProducer::new(
949            config,
950            Arc::new(OnceCell::new()),
951            None,
952            test_rt(),
953            "sql-producer-test-route",
954        );
955        let exchange = Exchange::new(Message::default());
956
957        // Should NOT panic — producer calls resolve_defaults() defensively
958        let result = producer.call(exchange).await;
959        assert!(
960            result.is_ok(),
961            "Producer should initialize pool without panic, got: {:?}",
962            result
963        );
964    }
965
966    // SQL-001: Pool init returns CamelError::Config when pool params cannot be resolved
967    #[tokio::test]
968    async fn producer_pool_init_returns_config_error_for_invalid_db() {
969        // Create config with an invalid db_url that will fail to connect.
970        // Disable retry for this test — the purpose is to verify error handling
971        // on a known-bad connect, not to exercise the retry loop.
972        let mut config = SqlEndpointConfig::from_uri(
973            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
974        )
975        .unwrap();
976        // Set pool params explicitly so resolve_defaults doesn't help with connection
977        config.max_connections = Some(1);
978        config.min_connections = Some(0);
979        config.idle_timeout_secs = Some(300);
980        config.max_lifetime_secs = Some(1800);
981
982        let health = Arc::new(RecordingHealth::default());
983        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
984            health: health.clone(),
985        });
986        let mut producer = SqlProducer::new(
987            config,
988            Arc::new(OnceCell::new()),
989            None,
990            rt,
991            "sql-producer-test-route",
992        );
993        let exchange = Exchange::new(Message::default());
994
995        let result = producer.call(exchange).await;
996        assert!(result.is_err());
997        // Error should be EndpointCreationFailed (connection error), not a panic
998        let err_msg = result.unwrap_err().to_string();
999        assert!(
1000            err_msg.contains("Failed to connect") || err_msg.contains("database"),
1001            "Expected connection error, got: {}",
1002            err_msg
1003        );
1004        // Verify force_unhealthy_for_route was also called
1005        let forced = health.forced.lock().unwrap();
1006        assert_eq!(
1007            forced.len(),
1008            1,
1009            "expected one force_unhealthy_for_route call"
1010        );
1011        assert_eq!(forced[0].0, "sql-producer-test-route");
1012        assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1013    }
1014
/// SQL-007: with an empty pool cell (nothing initialised yet), `poll_ready`
/// reports `Ready(Ok(()))`.
#[test]
fn poll_ready_returns_ready_for_uninitialized_pool() {
    let mut config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
    config.resolve_defaults();

    let empty_cell = Arc::new(OnceCell::new());
    let mut producer = SqlProducer::new(
        config,
        empty_cell,
        None,
        test_rt(),
        "sql-producer-test-route",
    );

    let waker = futures::task::noop_waker_ref();
    let mut cx = Context::from_waker(waker);
    assert!(matches!(producer.poll_ready(&mut cx), Poll::Ready(Ok(()))));
}
1034
/// SQL-007: once `stop()` has been called, `poll_ready` yields an error whose
/// text contains "SQL producer stopped".
#[test]
fn poll_ready_returns_error_when_stopped() {
    let mut config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
    config.resolve_defaults();

    let empty_cell = Arc::new(OnceCell::new());
    let mut producer = SqlProducer::new(
        config,
        empty_cell,
        None,
        test_rt(),
        "sql-producer-test-route",
    );
    producer.stop();

    let waker = futures::task::noop_waker_ref();
    let mut cx = Context::from_waker(waker);
    let outcome = producer.poll_ready(&mut cx);
    assert!(matches!(outcome, Poll::Ready(Err(_))));

    // The assertion above guarantees this shape; extract the error text.
    let Poll::Ready(Err(error)) = outcome else {
        unreachable!()
    };
    assert!(error.to_string().contains("SQL producer stopped"));
}
1060
1061    // SQL-007: poll_ready returns error when pool is closed
1062    #[tokio::test]
1063    async fn poll_ready_returns_error_when_pool_closed() {
1064        let pool = sqlite_pool().await;
1065        pool.close().await;
1066
1067        let config = {
1068            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1069            c.resolve_defaults();
1070            c
1071        };
1072        let pool_cell = Arc::new(OnceCell::new());
1073        pool_cell.set(Arc::new(pool)).unwrap();
1074
1075        let mut producer = SqlProducer::new(
1076            config,
1077            pool_cell,
1078            None,
1079            test_rt(),
1080            "sql-producer-test-route",
1081        );
1082        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1083        let result = producer.poll_ready(&mut cx);
1084        assert!(matches!(result, Poll::Ready(Err(_))));
1085        let err_msg = match result {
1086            Poll::Ready(Err(e)) => e.to_string(),
1087            _ => unreachable!(),
1088        };
1089        assert!(err_msg.contains("SQL connection pool is closed"));
1090    }
1091
1092    // SQL-007: poll_ready returns Ok for healthy initialized pool
1093    #[tokio::test]
1094    async fn poll_ready_returns_ok_for_healthy_pool() {
1095        let pool = sqlite_pool().await;
1096
1097        let config = {
1098            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1099            c.resolve_defaults();
1100            c
1101        };
1102        let pool_cell = Arc::new(OnceCell::new());
1103        pool_cell.set(Arc::new(pool)).unwrap();
1104
1105        let mut producer = SqlProducer::new(
1106            config,
1107            pool_cell,
1108            None,
1109            test_rt(),
1110            "sql-producer-test-route",
1111        );
1112        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1113        let result = producer.poll_ready(&mut cx);
1114        assert!(matches!(result, Poll::Ready(Ok(()))));
1115    }
1116
1117    // SQL-008: stop() closes the pool
1118    #[tokio::test]
1119    async fn test_sql_stop_closes_pool() {
1120        let pool = sqlite_pool().await;
1121
1122        let config = {
1123            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1124            c.resolve_defaults();
1125            c
1126        };
1127        let pool_cell = Arc::new(OnceCell::new());
1128        pool_cell.set(Arc::new(pool.clone())).unwrap();
1129
1130        let producer = SqlProducer::new(
1131            config,
1132            pool_cell.clone(),
1133            None,
1134            test_rt(),
1135            "sql-producer-test-route",
1136        );
1137        assert!(!pool.is_closed(), "Pool should be open before stop");
1138
1139        producer.stop();
1140
1141        // Allow background close task to complete
1142        tokio::time::sleep(Duration::from_millis(100)).await;
1143
1144        assert!(
1145            pool.is_closed(),
1146            "Pool should be closed after producer.stop()"
1147        );
1148
1149        // Subsequent poll_ready should fail
1150        let mut producer2 = SqlProducer::new(
1151            {
1152                let mut c =
1153                    SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1154                c.resolve_defaults();
1155                c
1156            },
1157            pool_cell.clone(),
1158            None,
1159            test_rt(),
1160            "sql-producer-test-route",
1161        );
1162        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1163        let result = producer2.poll_ready(&mut cx);
1164        assert!(
1165            matches!(result, Poll::Ready(Err(_))),
1166            "poll_ready should fail after pool closed"
1167        );
1168    }
1169
1170    // SQL-004: use_placeholder=false skips template parsing
1171    #[tokio::test]
1172    async fn use_placeholder_false_executes_raw_sql() {
1173        let pool = sqlite_pool().await;
1174        seed_items_table(&pool).await;
1175
1176        let mut config = SqlEndpointConfig::from_uri(
1177            "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1178        )
1179        .unwrap();
1180        config.resolve_defaults();
1181        assert!(!config.use_placeholder);
1182
1183        let mut producer = SqlProducer::new(
1184            config,
1185            Arc::new(OnceCell::new()),
1186            None,
1187            test_rt(),
1188            "sql-producer-test-route",
1189        );
1190        // Pre-initialize the pool so we don't hit the pool init path
1191        producer.pool.set(Arc::new(pool.clone())).unwrap();
1192
1193        let exchange = Exchange::new(Message::default());
1194        let result = producer.call(exchange).await;
1195        assert!(result.is_ok());
1196        let exchange = result.unwrap();
1197        // Should return results, not rowsAffected
1198        assert!(matches!(exchange.input.body, Body::Json(_)));
1199    }
1200
1201    // SQL-004: use_placeholder=true (default) processes placeholders normally
1202    #[tokio::test]
1203    async fn use_placeholder_true_processes_placeholders() {
1204        let pool = sqlite_pool().await;
1205        seed_items_table(&pool).await;
1206
1207        let mut config = SqlEndpointConfig::from_uri(
1208            "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1209        )
1210        .unwrap();
1211        config.resolve_defaults();
1212        assert!(config.use_placeholder);
1213
1214        let mut producer = SqlProducer::new(
1215            config,
1216            Arc::new(OnceCell::new()),
1217            None,
1218            test_rt(),
1219            "sql-producer-test-route",
1220        );
1221        producer.pool.set(Arc::new(pool.clone())).unwrap();
1222
1223        let msg = Message::new(Body::Json(json!([1])));
1224        let exchange = Exchange::new(msg);
1225        let result = producer.call(exchange).await;
1226        assert!(result.is_ok());
1227    }
1228
1229    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1230    /// Replicates the exact retry loop from SqlProducer::start() (producer.rs:161-185):
1231    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1232    #[tokio::test]
1233    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1234        use camel_component_api::NetworkRetryPolicy;
1235        use std::sync::Arc;
1236        use std::sync::atomic::{AtomicU32, Ordering};
1237
1238        let policy = NetworkRetryPolicy {
1239            max_attempts: 3,
1240            initial_delay: Duration::from_millis(1),
1241            max_delay: Duration::from_millis(1),
1242            multiplier: 1.0,
1243            ..NetworkRetryPolicy::default()
1244        };
1245
1246        let calls = Arc::new(AtomicU32::new(0));
1247        let calls_clone = Arc::clone(&calls);
1248
1249        let mut attempt: u32 = 0;
1250        let _result: Result<(), ()> = loop {
1251            attempt += 1;
1252            calls_clone.fetch_add(1, Ordering::SeqCst);
1253            let op_result: Result<(), ()> = Err(());
1254            match op_result {
1255                Ok(v) => break Ok(v),
1256                Err(_) if policy.should_retry(attempt) => {
1257                    let delay = policy.delay_for(attempt - 1);
1258                    tokio::time::sleep(delay).await;
1259                    continue;
1260                }
1261                Err(_) => break Err(()),
1262            }
1263        };
1264
1265        assert_eq!(
1266            calls.load(Ordering::SeqCst),
1267            3,
1268            "max_attempts=3 must yield exactly 3 invocations"
1269        );
1270    }
1271
1272    // ── ADR-0012 (g) regression tests ──────────────────────────────────
1273
1274    /// Regression: producer pool init failure calls force_unhealthy_for_route
1275    /// with correct route_id + name "g:sql:producer-pool-init" + non-empty reason.
1276    #[tokio::test]
1277    async fn producer_pool_init_failure_calls_force_unhealthy_for_route() {
1278        let health = Arc::new(RecordingHealth::default());
1279        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
1280            health: health.clone(),
1281        });
1282
1283        let mut config = SqlEndpointConfig::from_uri(
1284            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
1285        )
1286        .unwrap();
1287        config.max_connections = Some(1);
1288        config.min_connections = Some(0);
1289        config.idle_timeout_secs = Some(300);
1290        config.max_lifetime_secs = Some(1800);
1291
1292        let mut producer = SqlProducer::new(
1293            config,
1294            Arc::new(OnceCell::new()),
1295            None,
1296            rt,
1297            "sql-producer-test-route",
1298        );
1299        let exchange = Exchange::new(Message::default());
1300
1301        let result = producer.call(exchange).await;
1302        assert!(result.is_err());
1303
1304        let forced = health.forced.lock().unwrap();
1305        assert_eq!(
1306            forced.len(),
1307            1,
1308            "expected one force_unhealthy_for_route call"
1309        );
1310        assert_eq!(forced[0].0, "sql-producer-test-route");
1311        assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1312        assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1313    }
1314}