Skip to main content

camel_component_sql/
producer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
21use camel_component_api::retry_async;
22use camel_component_api::{
23    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
24};
25
26#[derive(Clone)]
27pub struct SqlProducer {
28    pub(crate) config: SqlEndpointConfig,
29    pub(crate) pool: Arc<OnceCell<AnyPool>>,
30    pub(crate) stopped: Arc<AtomicBool>,
31    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
32    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
33    #[allow(dead_code)]
34    pub(crate) runtime: Arc<dyn RuntimeObservability>,
35}
36
37impl SqlProducer {
38    pub fn new(
39        config: SqlEndpointConfig,
40        pool: Arc<OnceCell<AnyPool>>,
41        runtime: Arc<dyn RuntimeObservability>,
42    ) -> Self {
43        Self {
44            config,
45            pool,
46            stopped: Arc::new(AtomicBool::new(false)),
47            runtime,
48        }
49    }
50
51    pub fn stop(&self) {
52        self.stopped.store(true, Ordering::Relaxed);
53        // Close the pool asynchronously in the background with a timeout
54        if let Some(pool) = self.pool.get() {
55            let pool = pool.clone();
56            tokio::spawn(async move {
57                if tokio::time::timeout(Duration::from_secs(5), pool.close())
58                    .await
59                    .is_err()
60                {
61                    tracing::warn!("SQL producer pool did not close within 5s");
62                }
63            });
64        }
65    }
66
67    /// Resolves the query source based on priority:
68    /// 1. Header `CamelSql.Query`
69    /// 2. Body (if `use_message_body_for_sql` is true)
70    /// 3. Config query
71    pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
72        // Priority 1: Header
73        if let Some(query_value) = exchange.input.header(headers::QUERY)
74            && let Some(query_str) = query_value.as_str()
75        {
76            return query_str.to_string();
77        }
78
79        // Priority 2: Body (if use_message_body_for_sql)
80        if config.use_message_body_for_sql
81            && let Some(body_text) = exchange.input.body.as_text()
82        {
83            return body_text.to_string();
84        }
85
86        // Priority 3: Config query
87        config.query.clone()
88    }
89
90    /// Health check: runs a simple `SELECT 1` query against the connection pool
91    /// to verify database connectivity.
92    ///
93    /// Returns `Ok(())` if the database is reachable, or an error with details
94    /// if the connection fails.
95    pub async fn check_connection(&self) -> Result<(), CamelError> {
96        let pool = self.pool.get().ok_or_else(|| {
97            CamelError::ProcessorError("SQL connection pool not initialized".into())
98        })?;
99
100        debug!("Running health check: SELECT 1");
101        sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
102            warn!(error = %e, "SQL health check failed");
103            CamelError::ProcessorError(format!("SQL health check failed: {}", e))
104        })?;
105
106        debug!("SQL health check passed");
107        Ok(())
108    }
109}
110
111impl Service<Exchange> for SqlProducer {
112    type Response = Exchange;
113    type Error = CamelError;
114    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
115
116    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
117        if self.stopped.load(Ordering::Relaxed) {
118            return Poll::Ready(Err(CamelError::ProcessorError(
119                "SQL producer stopped".into(),
120            )));
121        }
122        if let Some(pool) = self.pool.get()
123            && pool.is_closed()
124        {
125            return Poll::Ready(Err(CamelError::ProcessorError(
126                "SQL connection pool is closed".into(),
127            )));
128        }
129        Poll::Ready(Ok(()))
130    }
131
132    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
133        let mut config = self.config.clone();
134        let pool_cell = Arc::clone(&self.pool);
135
136        Box::pin(async move {
137            // Get or initialize the connection pool
138            let pool: &AnyPool = pool_cell
139                .get_or_try_init(|| async {
140                    // Defensive: ensure config is resolved even if caller didn't use create_endpoint
141                    config.resolve_defaults();
142                    // SQL-014: resolve file-based query asynchronously (not blocking)
143                    config.resolve_file_query().await?;
144                    let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
145
146                    // Install all compiled-in sqlx drivers so AnyPool can resolve them.
147                    // This is idempotent; safe to call multiple times.
148                    sqlx::any::install_default_drivers();
149
150                    let max_conn = config.max_connections.ok_or_else(|| {
151                        CamelError::Config("max_connections not resolved for SQL pool".into())
152                    })?;
153                    let min_conn = config.min_connections.ok_or_else(|| {
154                        CamelError::Config("min_connections not resolved for SQL pool".into())
155                    })?;
156                    let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
157                        CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
158                    })?;
159                    let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
160                        CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
161                    })?;
162
163                    info!(
164                        db_url = %redact_db_url(&config.db_url),
165                        "SQL producer pool initializing"
166                    );
167                    let retry_policy = &config.retry;
168                    retry_async::<_, _, _, _, sqlx::Error>(
169                        retry_policy,
170                        Some("sql-producer"),
171                        || {
172                            PoolOptions::new()
173                                .max_connections(max_conn)
174                                .min_connections(min_conn)
175                                .idle_timeout(Duration::from_secs(idle_timeout))
176                                .max_lifetime(Duration::from_secs(max_lifetime))
177                                .connect(&db_url)
178                        },
179                        is_retryable_sqlx_error,
180                    )
181                    .await
182                    .map_err(|e| {
183                        // TODO(ADR-0012-g): replace with force_unhealthy_for_route once bd rc-1mo lands
184                        error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database"); // allow-log-levels
185                        CamelError::EndpointCreationFailed(format!(
186                            "Failed to connect to database: {}",
187                            e
188                        ))
189                    })
190                })
191                .await
192                .map_err(|e: CamelError| e.clone())?;
193
194            // Resolve query string
195            let query_str = Self::resolve_query_source(&exchange, &config);
196
197            // SQL-002: warn if Managed transaction mode requested
198            if config.transaction_mode == crate::config::TransactionMode::Managed {
199                warn!("transactionManager not yet implemented; using Auto mode");
200            }
201
202            debug!(
203                query = %query_str,
204                "executing SQL query"
205            );
206
207            // Execute based on mode
208            if config.batch {
209                // Batch mode: execute_batch handles its own template parsing per item
210                execute_batch(pool, &config, &mut exchange).await?;
211            } else if config.use_placeholder {
212                // Placeholder processing enabled (default): parse template, resolve params, apply header override
213                let template = parse_query_template(&query_str, config.placeholder)?;
214                let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
215
216                // CamelSql.Parameters header override
217                if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
218                    if let Some(arr) = params_value.as_array() {
219                        if arr.len() != prepared.bindings.len() {
220                            warn!(
221                                expected = prepared.bindings.len(),
222                                got = arr.len(),
223                                header = headers::PARAMETERS,
224                                "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
225                                prepared.bindings.len(),
226                                arr.len()
227                            );
228                        }
229                        debug!(
230                            "Overriding bindings from {} header with {} parameters",
231                            headers::PARAMETERS,
232                            arr.len()
233                        );
234                        prepared.bindings = arr.clone();
235                    } else {
236                        warn!(
237                            header = headers::PARAMETERS,
238                            "Header is present but not a JSON array — ignoring parameter override"
239                        );
240                    }
241                }
242
243                debug!(
244                    "Executing prepared SQL ({} bindings)",
245                    prepared.bindings.len()
246                );
247
248                if is_select_query(&prepared.sql) {
249                    execute_select(pool, &prepared, &config, &mut exchange).await?;
250                } else {
251                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
252                }
253            } else {
254                // use_placeholder=false: execute query as-is without template parsing
255                debug!("Executing raw SQL (placeholder processing disabled)");
256                let prepared = PreparedQuery {
257                    sql: query_str,
258                    bindings: vec![],
259                };
260
261                if is_select_query(&prepared.sql) {
262                    execute_select(pool, &prepared, &config, &mut exchange).await?;
263                } else {
264                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
265                }
266            }
267
268            Ok(exchange)
269        })
270    }
271}
272
273/// Executes a SELECT query and populates the exchange body with results.
274async fn execute_select(
275    pool: &AnyPool,
276    prepared: &PreparedQuery,
277    config: &SqlEndpointConfig,
278    exchange: &mut Exchange,
279) -> Result<(), CamelError> {
280    match config.output_type {
281        SqlOutputType::SelectOne => {
282            // fetch_all and take first row
283            let mut query = sqlx::query(&prepared.sql);
284            query = bind_json_values(query, &prepared.bindings);
285
286            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
287                warn!(error = %e, "SQL query failed");
288                CamelError::ProcessorError(format!("Query execution failed: {}", e))
289            })?;
290
291            let count = rows.len();
292            debug!(rows = count, "SQL query completed");
293            let json_rows: Vec<serde_json::Value> = rows
294                .iter()
295                .map(row_to_json)
296                .collect::<Result<Vec<_>, _>>()?;
297
298            if let Some(first_row) = json_rows.into_iter().next() {
299                exchange.input.body = Body::Json(first_row);
300            } else {
301                exchange.input.body = Body::Empty;
302            }
303            debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
304            exchange
305                .input
306                .set_header(headers::ROW_COUNT, serde_json::json!(count));
307        }
308        SqlOutputType::SelectList => {
309            // fetch_all for list output
310            let mut query = sqlx::query(&prepared.sql);
311            query = bind_json_values(query, &prepared.bindings);
312
313            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
314                warn!(error = %e, "SQL query failed");
315                CamelError::ProcessorError(format!("Query execution failed: {}", e))
316            })?;
317
318            let count = rows.len();
319            debug!(rows = count, "SQL query completed");
320            let json_rows: Vec<serde_json::Value> = rows
321                .iter()
322                .map(row_to_json)
323                .collect::<Result<Vec<_>, _>>()?;
324
325            exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
326            debug!("SelectList returned {} rows", count);
327            exchange
328                .input
329                .set_header(headers::ROW_COUNT, serde_json::json!(count));
330        }
331        SqlOutputType::StreamList => {
332            // Use fetch() for true streaming - avoids loading all rows into memory
333            use futures::TryStreamExt;
334
335            let pool_clone = pool.clone();
336            let sql_str = prepared.sql.clone();
337            let bindings = prepared.bindings.clone();
338
339            // Build the stream that reads rows on demand and serializes to NDJSON bytes
340            let byte_stream = async_stream::try_stream! {
341                let mut q = sqlx::query(&sql_str);
342                q = bind_json_values(q, &bindings);
343                let mut rows = q.fetch(&pool_clone);
344                while let Some(row) = rows.try_next().await.map_err(|e| {
345                    CamelError::ProcessorError(format!("Query execution failed: {}", e))
346                })? {
347                    let json_val = row_to_json(&row).map_err(|e| {
348                        CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
349                    })?;
350                    let mut bytes = serde_json::to_vec(&json_val)
351                        .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
352                    bytes.push(b'\n');
353                    yield Bytes::from(bytes);
354                }
355            };
356
357            exchange.input.body = Body::Stream(StreamBody {
358                stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
359                metadata: StreamMetadata {
360                    content_type: Some("application/x-ndjson".to_string()),
361                    size_hint: None,
362                    origin: None,
363                },
364            });
365            debug!("StreamList: created lazy stream (rows fetched on demand)");
366            // Note: ROW_COUNT not set for StreamList since row count is unknown until exhausted
367        }
368    }
369
370    Ok(())
371}
372
373/// Executes a modification query (INSERT/UPDATE/DELETE).
374async fn execute_modify(
375    pool: &AnyPool,
376    prepared: &PreparedQuery,
377    config: &SqlEndpointConfig,
378    exchange: &mut Exchange,
379) -> Result<(), CamelError> {
380    let mut query = sqlx::query(&prepared.sql);
381    query = bind_json_values(query, &prepared.bindings);
382
383    let result = query.execute(pool).await.map_err(|e| {
384        warn!(error = %e, "SQL query failed");
385        CamelError::ProcessorError(format!("Query execution failed: {}", e))
386    })?;
387
388    let rows_affected = result.rows_affected();
389
390    // Fix 4: Implement expected_update_count validation
391    if let Some(expected) = config.expected_update_count
392        && rows_affected as i64 != expected
393    {
394        warn!(expected, actual = rows_affected, "Row count mismatch");
395        return Err(CamelError::ProcessorError(format!(
396            "Expected {} rows affected, got {}",
397            expected, rows_affected
398        )));
399    }
400
401    exchange
402        .input
403        .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
404
405    if config.noop {
406        // Preserve original body
407    } else {
408        exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
409    }
410
411    debug!(rows = rows_affected, "SQL modify query completed");
412
413    Ok(())
414}
415
416/// Executes a batch of queries from a JSON array body.
417async fn execute_batch(
418    pool: &AnyPool,
419    config: &SqlEndpointConfig,
420    exchange: &mut Exchange,
421) -> Result<(), CamelError> {
422    // Body must be JSON array of arrays
423    let body_json = match &exchange.input.body {
424        Body::Json(val) => val,
425        _ => {
426            return Err(CamelError::ProcessorError(
427                "Batch mode requires body to be a JSON array of arrays".to_string(),
428            ));
429        }
430    };
431
432    let batch_data = body_json
433        .as_array()
434        .ok_or_else(|| {
435            CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
436        })?
437        .clone();
438
439    // Parse template from config query
440    let template = parse_query_template(&config.query, config.placeholder)?;
441
442    // Fix 2: Batch operations must be wrapped in a transaction
443    let mut tx = pool.begin().await.map_err(|e| {
444        // log-policy: handler-owned
445        // (category a: tx begin failure inside producer pipeline)
446        warn!(error = %e, "Failed to begin transaction");
447        CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
448    })?;
449
450    let mut total_rows_affected: u64 = 0;
451
452    for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
453        // Each item must be an array of parameters
454        params_array.as_array().ok_or_else(|| {
455            CamelError::ProcessorError(format!(
456                "Batch item at index {} must be a JSON array of parameters",
457                batch_idx
458            ))
459        })?;
460
461        // Create a temporary exchange with the params as body for resolution
462        let temp_msg = Message::new(Body::Json(params_array.clone()));
463        let temp_exchange = Exchange::new(temp_msg);
464
465        // Resolve parameters for this batch item
466        let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
467
468        // Execute against transaction
469        let mut query = sqlx::query(&prepared.sql);
470        query = bind_json_values(query, &prepared.bindings);
471
472        let result = query.execute(&mut *tx).await.map_err(|e| {
473            // log-policy: handler-owned
474            // (category a: batch query execution failure inside producer pipeline)
475            warn!("Batch query execution failed at index {}: {}", batch_idx, e);
476            CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
477        })?;
478
479        // Validate expected_update_count per batch item
480        if let Some(expected) = config.expected_update_count
481            && result.rows_affected() as i64 != expected
482        {
483            // log-policy: handler-owned
484            // (category a: expected_update_count mismatch inside producer pipeline)
485            warn!(
486                "Batch item {}: expected {} rows affected, got {}",
487                batch_idx,
488                expected,
489                result.rows_affected()
490            );
491            return Err(CamelError::ProcessorError(format!(
492                "Batch item {}: expected {} rows affected, got {}",
493                batch_idx,
494                expected,
495                result.rows_affected()
496            )));
497        }
498
499        total_rows_affected += result.rows_affected();
500    }
501
502    // Commit the transaction
503    tx.commit().await.map_err(|e| {
504        // log-policy: handler-owned
505        // (category a: tx commit failure inside producer pipeline)
506        warn!(error = %e, "Failed to commit transaction");
507        CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
508    })?;
509
510    exchange.input.set_header(
511        headers::UPDATE_COUNT,
512        serde_json::json!(total_rows_affected),
513    );
514
515    debug!(
516        "Batch execution completed, total rows affected: {}",
517        total_rows_affected
518    );
519
520    Ok(())
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use camel_component_api::test_support::PanicRuntimeObservability;
527    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
528        std::sync::Arc::new(PanicRuntimeObservability)
529    }
530    use camel_component_api::Message;
531    use camel_component_api::UriConfig;
532    use sqlx::any::AnyPoolOptions;
533    use std::sync::Arc;
534    use tokio::sync::OnceCell;
535
536    async fn sqlite_pool() -> AnyPool {
537        sqlx::any::install_default_drivers();
538        AnyPoolOptions::new()
539            .max_connections(1)
540            .connect("sqlite::memory:")
541            .await
542            .expect("sqlite pool")
543    }
544
545    async fn seed_items_table(pool: &AnyPool) {
546        sqlx::query(
547            "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
548        )
549        .execute(pool)
550        .await
551        .expect("create table");
552        sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
553            .execute(pool)
554            .await
555            .expect("seed rows");
556    }
557
558    fn config() -> SqlEndpointConfig {
559        let mut c =
560            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
561        c.resolve_defaults();
562        c
563    }
564
565    #[test]
566    fn producer_clone_shares_pool() {
567        let p1 = SqlProducer::new(config(), Arc::new(OnceCell::new()), test_rt());
568        let p2 = p1.clone();
569        assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
570        assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
571    }
572
573    #[test]
574    fn resolve_query_from_config() {
575        let config = config();
576        let ex = Exchange::new(Message::default());
577        let q = SqlProducer::resolve_query_source(&ex, &config);
578        assert_eq!(q, "select 1");
579    }
580
581    #[test]
582    fn resolve_query_from_header() {
583        let config = config();
584        let mut msg = Message::default();
585        msg.set_header(headers::QUERY, serde_json::json!("select 2"));
586        let ex = Exchange::new(msg);
587        let q = SqlProducer::resolve_query_source(&ex, &config);
588        assert_eq!(q, "select 2");
589    }
590
591    #[test]
592    fn resolve_query_from_body() {
593        let mut config = config();
594        config.use_message_body_for_sql = true;
595        let msg = Message::new(Body::Text("select 3".to_string()));
596        let ex = Exchange::new(msg);
597        let q = SqlProducer::resolve_query_source(&ex, &config);
598        assert_eq!(q, "select 3");
599    }
600
601    #[test]
602    fn resolve_query_header_priority_over_body() {
603        let mut config = config();
604        config.use_message_body_for_sql = true;
605        let mut msg = Message::new(Body::Text("select from body".to_string()));
606        msg.set_header(headers::QUERY, serde_json::json!("select from header"));
607        let ex = Exchange::new(msg);
608        let q = SqlProducer::resolve_query_source(&ex, &config);
609        assert_eq!(q, "select from header");
610    }
611
612    #[test]
613    fn resolve_query_body_priority_over_config() {
614        let mut config = config();
615        config.use_message_body_for_sql = true;
616        let msg = Message::new(Body::Text("select from body".to_string()));
617        let ex = Exchange::new(msg);
618        let q = SqlProducer::resolve_query_source(&ex, &config);
619        assert_eq!(q, "select from body");
620    }
621
622    #[test]
623    fn bind_json_null() {
624        let query = sqlx::query("SELECT ?");
625        let values = vec![serde_json::Value::Null];
626        let _bound = bind_json_values(query, &values);
627        // Compilation test - ensure it binds
628    }
629
630    #[test]
631    fn bind_json_bool() {
632        let query = sqlx::query("SELECT ?");
633        let values = vec![serde_json::Value::Bool(true)];
634        let _bound = bind_json_values(query, &values);
635    }
636
637    #[test]
638    fn bind_json_number_i64() {
639        let query = sqlx::query("SELECT ?");
640        let values = vec![serde_json::json!(42)];
641        let _bound = bind_json_values(query, &values);
642    }
643
644    #[test]
645    fn bind_json_number_f64() {
646        let query = sqlx::query("SELECT ?");
647        let values = vec![serde_json::json!(std::f64::consts::PI)];
648        let _bound = bind_json_values(query, &values);
649    }
650
651    #[test]
652    fn bind_json_string() {
653        let query = sqlx::query("SELECT ?");
654        let values = vec![serde_json::json!("hello world")];
655        let _bound = bind_json_values(query, &values);
656    }
657
658    #[test]
659    fn bind_json_array() {
660        let query = sqlx::query("SELECT ?");
661        let values = vec![serde_json::json!([1, 2, 3])];
662        let _bound = bind_json_values(query, &values);
663    }
664
665    #[test]
666    fn bind_json_object() {
667        let query = sqlx::query("SELECT ?");
668        let values = vec![serde_json::json!({"key": "value"})];
669        let _bound = bind_json_values(query, &values);
670    }
671
672    #[test]
673    fn bind_multiple_values() {
674        let query = sqlx::query("SELECT ?, ?, ?");
675        let values = vec![
676            serde_json::json!(1),
677            serde_json::json!("test"),
678            serde_json::Value::Null,
679        ];
680        let _bound = bind_json_values(query, &values);
681    }
682
683    // Test for Fix 4: expected_update_count config field presence
684    #[test]
685    fn expected_update_count_validation() {
686        // Test that expected_update_count is parsed from URI
687        let config = SqlEndpointConfig::from_uri(
688            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
689        )
690        .unwrap();
691        assert_eq!(config.expected_update_count, Some(5));
692
693        // Test default (no expected_update_count)
694        let config_default = self::config();
695        assert_eq!(config_default.expected_update_count, None);
696
697        // Test negative value (should parse)
698        let config_neg = SqlEndpointConfig::from_uri(
699            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
700        )
701        .unwrap();
702        assert_eq!(config_neg.expected_update_count, Some(-1));
703    }
704
705    // Test for Fix 3: parameters header override logic
706    #[test]
707    fn parameters_header_override_logic() {
708        // Create a PreparedQuery manually
709        let mut prepared = PreparedQuery {
710            sql: "SELECT * FROM t WHERE id = $1".to_string(),
711            bindings: vec![serde_json::json!(42)],
712        };
713
714        // Simulate the header override logic
715        let header_params = serde_json::json!([99, "extra"]);
716        if let Some(arr) = header_params.as_array() {
717            prepared.bindings = arr.clone();
718        }
719
720        // Verify bindings were overridden
721        assert_eq!(prepared.bindings.len(), 2);
722        assert_eq!(prepared.bindings[0], serde_json::json!(99));
723        assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
724
725        // Test with non-array header (should not override)
726        let mut prepared2 = PreparedQuery {
727            sql: "SELECT * FROM t WHERE id = $1".to_string(),
728            bindings: vec![serde_json::json!(42)],
729        };
730        let header_non_array = serde_json::json!({"not": "an array"});
731        if let Some(arr) = header_non_array.as_array() {
732            prepared2.bindings = arr.clone();
733        }
734        // Should remain unchanged
735        assert_eq!(prepared2.bindings.len(), 1);
736        assert_eq!(prepared2.bindings[0], serde_json::json!(42));
737    }
738
739    #[tokio::test]
740    async fn execute_select_one_sets_body_and_row_count() {
741        let pool = sqlite_pool().await;
742        seed_items_table(&pool).await;
743
744        let mut config = SqlEndpointConfig::from_uri(
745            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
746        )
747        .unwrap();
748        config.resolve_defaults();
749
750        let prepared = PreparedQuery {
751            sql: "select id, name from items order by id".to_string(),
752            bindings: vec![],
753        };
754        let mut exchange = Exchange::new(Message::default());
755
756        execute_select(&pool, &prepared, &config, &mut exchange)
757            .await
758            .expect("select one");
759
760        assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
761        assert_eq!(
762            exchange.input.body,
763            Body::Json(json!({"id": 1, "name": "a"}))
764        );
765    }
766
767    #[tokio::test]
768    async fn execute_stream_list_materializes_ndjson() {
769        let pool = sqlite_pool().await;
770        seed_items_table(&pool).await;
771
772        let mut config = SqlEndpointConfig::from_uri(
773            "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
774        )
775        .unwrap();
776        config.resolve_defaults();
777
778        let prepared = PreparedQuery {
779            sql: "select id from items order by id".to_string(),
780            bindings: vec![],
781        };
782        let mut exchange = Exchange::new(Message::default());
783
784        execute_select(&pool, &prepared, &config, &mut exchange)
785            .await
786            .expect("stream list");
787
788        let bytes = exchange
789            .input
790            .body
791            .clone()
792            .into_bytes(1024)
793            .await
794            .expect("stream bytes");
795        let text = String::from_utf8(bytes.to_vec()).expect("utf8");
796        assert!(text.contains("{\"id\":1}"));
797        assert!(text.contains("{\"id\":2}"));
798        assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
799    }
800
801    #[tokio::test]
802    async fn execute_modify_expected_update_count_mismatch_returns_error() {
803        let pool = sqlite_pool().await;
804        seed_items_table(&pool).await;
805
806        let mut config = SqlEndpointConfig::from_uri(
807            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
808        )
809        .unwrap();
810        config.resolve_defaults();
811
812        let prepared = PreparedQuery {
813            sql: "update items set done=1 where id = $1".to_string(),
814            bindings: vec![json!(1)],
815        };
816        let mut exchange = Exchange::new(Message::default());
817
818        let err = execute_modify(&pool, &prepared, &config, &mut exchange)
819            .await
820            .expect_err("must fail due expected row count mismatch");
821        assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
822    }
823
824    #[tokio::test]
825    async fn execute_batch_rollback_when_any_item_fails_expected_count() {
826        let pool = sqlite_pool().await;
827        seed_items_table(&pool).await;
828
829        let mut config = SqlEndpointConfig::from_uri(
830            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
831        )
832        .unwrap();
833        config.resolve_defaults();
834
835        let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
836
837        let err = execute_batch(&pool, &config, &mut exchange)
838            .await
839            .expect_err("second batch item should fail expectedUpdateCount");
840        assert!(
841            err.to_string()
842                .contains("Batch item 1: expected 1 rows affected, got 0")
843        );
844
845        let row = sqlx::query("select done from items where id = 1")
846            .fetch_one(&pool)
847            .await
848            .expect("query row");
849        let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
850        assert_eq!(done, 0, "transaction must rollback first update");
851    }
852
853    // --- Phase B hardening tests ---
854
855    // SQL-001: Direct producer construction without resolve_defaults does not panic.
856    // The producer defensively calls resolve_defaults() during pool init, so the pool
857    // fields get resolved. This test verifies no panic occurs and the pool initializes.
858    #[tokio::test]
859    async fn producer_no_panic_without_prior_resolve_defaults() {
860        // Create config without calling resolve_defaults() — pool fields are None
861        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
862        assert!(config.max_connections.is_none());
863
864        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
865        let exchange = Exchange::new(Message::default());
866
867        // Should NOT panic — producer calls resolve_defaults() defensively
868        let result = producer.call(exchange).await;
869        assert!(
870            result.is_ok(),
871            "Producer should initialize pool without panic, got: {:?}",
872            result
873        );
874    }
875
876    // SQL-001: Pool init returns CamelError::Config when pool params cannot be resolved
877    #[tokio::test]
878    async fn producer_pool_init_returns_config_error_for_invalid_db() {
879        // Create config with an invalid db_url that will fail to connect.
880        // Disable retry for this test — the purpose is to verify error handling
881        // on a known-bad connect, not to exercise the retry loop.
882        let mut config = SqlEndpointConfig::from_uri(
883            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
884        )
885        .unwrap();
886        // Set pool params explicitly so resolve_defaults doesn't help with connection
887        config.max_connections = Some(1);
888        config.min_connections = Some(0);
889        config.idle_timeout_secs = Some(300);
890        config.max_lifetime_secs = Some(1800);
891
892        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
893        let exchange = Exchange::new(Message::default());
894
895        let result = producer.call(exchange).await;
896        assert!(result.is_err());
897        // Error should be EndpointCreationFailed (connection error), not a panic
898        let err_msg = result.unwrap_err().to_string();
899        assert!(
900            err_msg.contains("Failed to connect") || err_msg.contains("database"),
901            "Expected connection error, got: {}",
902            err_msg
903        );
904    }
905
906    // SQL-007: poll_ready returns Ready (pool lazily initialized on first call)
907    #[test]
908    fn poll_ready_returns_ready_for_uninitialized_pool() {
909        let config = {
910            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
911            c.resolve_defaults();
912            c
913        };
914        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
915        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
916        let result = producer.poll_ready(&mut cx);
917        assert!(matches!(result, Poll::Ready(Ok(()))));
918    }
919
920    // SQL-007: poll_ready returns error when stopped flag is set
921    #[test]
922    fn poll_ready_returns_error_when_stopped() {
923        let config = {
924            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
925            c.resolve_defaults();
926            c
927        };
928        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
929        producer.stop();
930        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
931        let result = producer.poll_ready(&mut cx);
932        assert!(matches!(result, Poll::Ready(Err(_))));
933        let err_msg = match result {
934            Poll::Ready(Err(e)) => e.to_string(),
935            _ => unreachable!(),
936        };
937        assert!(err_msg.contains("SQL producer stopped"));
938    }
939
940    // SQL-007: poll_ready returns error when pool is closed
941    #[tokio::test]
942    async fn poll_ready_returns_error_when_pool_closed() {
943        let pool = sqlite_pool().await;
944        pool.close().await;
945
946        let config = {
947            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
948            c.resolve_defaults();
949            c
950        };
951        let pool_cell = Arc::new(OnceCell::new());
952        pool_cell.set(pool).unwrap();
953
954        let mut producer = SqlProducer::new(config, pool_cell, test_rt());
955        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
956        let result = producer.poll_ready(&mut cx);
957        assert!(matches!(result, Poll::Ready(Err(_))));
958        let err_msg = match result {
959            Poll::Ready(Err(e)) => e.to_string(),
960            _ => unreachable!(),
961        };
962        assert!(err_msg.contains("SQL connection pool is closed"));
963    }
964
965    // SQL-007: poll_ready returns Ok for healthy initialized pool
966    #[tokio::test]
967    async fn poll_ready_returns_ok_for_healthy_pool() {
968        let pool = sqlite_pool().await;
969
970        let config = {
971            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
972            c.resolve_defaults();
973            c
974        };
975        let pool_cell = Arc::new(OnceCell::new());
976        pool_cell.set(pool).unwrap();
977
978        let mut producer = SqlProducer::new(config, pool_cell, test_rt());
979        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
980        let result = producer.poll_ready(&mut cx);
981        assert!(matches!(result, Poll::Ready(Ok(()))));
982    }
983
984    // SQL-008: stop() closes the pool
985    #[tokio::test]
986    async fn test_sql_stop_closes_pool() {
987        let pool = sqlite_pool().await;
988
989        let config = {
990            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
991            c.resolve_defaults();
992            c
993        };
994        let pool_cell = Arc::new(OnceCell::new());
995        pool_cell.set(pool.clone()).unwrap();
996
997        let producer = SqlProducer::new(config, pool_cell.clone(), test_rt());
998        assert!(!pool.is_closed(), "Pool should be open before stop");
999
1000        producer.stop();
1001
1002        // Allow background close task to complete
1003        tokio::time::sleep(Duration::from_millis(100)).await;
1004
1005        assert!(
1006            pool.is_closed(),
1007            "Pool should be closed after producer.stop()"
1008        );
1009
1010        // Subsequent poll_ready should fail
1011        let mut producer2 = SqlProducer::new(
1012            {
1013                let mut c =
1014                    SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1015                c.resolve_defaults();
1016                c
1017            },
1018            pool_cell.clone(),
1019            test_rt(),
1020        );
1021        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1022        let result = producer2.poll_ready(&mut cx);
1023        assert!(
1024            matches!(result, Poll::Ready(Err(_))),
1025            "poll_ready should fail after pool closed"
1026        );
1027    }
1028
1029    // SQL-004: use_placeholder=false skips template parsing
1030    #[tokio::test]
1031    async fn use_placeholder_false_executes_raw_sql() {
1032        let pool = sqlite_pool().await;
1033        seed_items_table(&pool).await;
1034
1035        let mut config = SqlEndpointConfig::from_uri(
1036            "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1037        )
1038        .unwrap();
1039        config.resolve_defaults();
1040        assert!(!config.use_placeholder);
1041
1042        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
1043        // Pre-initialize the pool so we don't hit the pool init path
1044        producer.pool.set(pool.clone()).unwrap();
1045
1046        let exchange = Exchange::new(Message::default());
1047        let result = producer.call(exchange).await;
1048        assert!(result.is_ok());
1049        let exchange = result.unwrap();
1050        // Should return results, not rowsAffected
1051        assert!(matches!(exchange.input.body, Body::Json(_)));
1052    }
1053
1054    // SQL-004: use_placeholder=true (default) processes placeholders normally
1055    #[tokio::test]
1056    async fn use_placeholder_true_processes_placeholders() {
1057        let pool = sqlite_pool().await;
1058        seed_items_table(&pool).await;
1059
1060        let mut config = SqlEndpointConfig::from_uri(
1061            "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1062        )
1063        .unwrap();
1064        config.resolve_defaults();
1065        assert!(config.use_placeholder);
1066
1067        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
1068        producer.pool.set(pool.clone()).unwrap();
1069
1070        let msg = Message::new(Body::Json(json!([1])));
1071        let exchange = Exchange::new(msg);
1072        let result = producer.call(exchange).await;
1073        assert!(result.is_ok());
1074    }
1075
1076    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1077    /// Replicates the exact retry loop from SqlProducer::start() (producer.rs:161-185):
1078    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1079    #[tokio::test]
1080    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1081        use camel_component_api::NetworkRetryPolicy;
1082        use std::sync::Arc;
1083        use std::sync::atomic::{AtomicU32, Ordering};
1084
1085        let policy = NetworkRetryPolicy {
1086            max_attempts: 3,
1087            initial_delay: Duration::from_millis(1),
1088            max_delay: Duration::from_millis(1),
1089            multiplier: 1.0,
1090            ..NetworkRetryPolicy::default()
1091        };
1092
1093        let calls = Arc::new(AtomicU32::new(0));
1094        let calls_clone = Arc::clone(&calls);
1095
1096        let mut attempt: u32 = 0;
1097        let _result: Result<(), ()> = loop {
1098            attempt += 1;
1099            calls_clone.fetch_add(1, Ordering::SeqCst);
1100            let op_result: Result<(), ()> = Err(());
1101            match op_result {
1102                Ok(v) => break Ok(v),
1103                Err(_) if policy.should_retry(attempt) => {
1104                    let delay = policy.delay_for(attempt - 1);
1105                    tokio::time::sleep(delay).await;
1106                    continue;
1107                }
1108                Err(_) => break Err(()),
1109            }
1110        };
1111
1112        assert_eq!(
1113            calls.load(Ordering::SeqCst),
1114            3,
1115            "max_attempts=3 must yield exactly 3 invocations"
1116        );
1117    }
1118}