Skip to main content

camel_component_sql/
producer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
21use camel_component_api::retry_async;
22use camel_component_api::{
23    Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
24};
25
26#[derive(Clone)]
27pub struct SqlProducer {
28    pub(crate) config: SqlEndpointConfig,
29    pub(crate) pool: Arc<OnceCell<AnyPool>>,
30    pub(crate) stopped: Arc<AtomicBool>,
31    pub(crate) runtime: Arc<dyn RuntimeObservability>,
32    /// Route identifier for ADR-0012 (g) health/observability calls.
33    pub(crate) route_id: String,
34}
35
36impl SqlProducer {
37    pub fn new(
38        config: SqlEndpointConfig,
39        pool: Arc<OnceCell<AnyPool>>,
40        runtime: Arc<dyn RuntimeObservability>,
41        route_id: impl Into<String>,
42    ) -> Self {
43        Self {
44            config,
45            pool,
46            stopped: Arc::new(AtomicBool::new(false)),
47            runtime,
48            route_id: route_id.into(),
49        }
50    }
51
52    pub fn stop(&self) {
53        self.stopped.store(true, Ordering::Relaxed);
54        // Close the pool asynchronously in the background with a timeout
55        if let Some(pool) = self.pool.get() {
56            let pool = pool.clone();
57            tokio::spawn(async move {
58                if tokio::time::timeout(Duration::from_secs(5), pool.close())
59                    .await
60                    .is_err()
61                {
62                    tracing::warn!("SQL producer pool did not close within 5s");
63                }
64            });
65        }
66    }
67
68    /// Resolves the query source based on priority:
69    /// 1. Header `CamelSql.Query`
70    /// 2. Body (if `use_message_body_for_sql` is true)
71    /// 3. Config query
72    pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
73        // Priority 1: Header
74        if let Some(query_value) = exchange.input.header(headers::QUERY)
75            && let Some(query_str) = query_value.as_str()
76        {
77            return query_str.to_string();
78        }
79
80        // Priority 2: Body (if use_message_body_for_sql)
81        if config.use_message_body_for_sql
82            && let Some(body_text) = exchange.input.body.as_text()
83        {
84            return body_text.to_string();
85        }
86
87        // Priority 3: Config query
88        config.query.clone()
89    }
90
91    /// Health check: runs a simple `SELECT 1` query against the connection pool
92    /// to verify database connectivity.
93    ///
94    /// Returns `Ok(())` if the database is reachable, or an error with details
95    /// if the connection fails.
96    pub async fn check_connection(&self) -> Result<(), CamelError> {
97        let pool = self.pool.get().ok_or_else(|| {
98            CamelError::ProcessorError("SQL connection pool not initialized".into())
99        })?;
100
101        debug!("Running health check: SELECT 1");
102        sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
103            warn!(error = %e, "SQL health check failed");
104            CamelError::ProcessorError(format!("SQL health check failed: {}", e))
105        })?;
106
107        debug!("SQL health check passed");
108        Ok(())
109    }
110}
111
112impl Service<Exchange> for SqlProducer {
113    type Response = Exchange;
114    type Error = CamelError;
115    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
116
117    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118        if self.stopped.load(Ordering::Relaxed) {
119            return Poll::Ready(Err(CamelError::ProcessorError(
120                "SQL producer stopped".into(),
121            )));
122        }
123        if let Some(pool) = self.pool.get()
124            && pool.is_closed()
125        {
126            return Poll::Ready(Err(CamelError::ProcessorError(
127                "SQL connection pool is closed".into(),
128            )));
129        }
130        Poll::Ready(Ok(()))
131    }
132
133    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
134        let mut config = self.config.clone();
135        let pool_cell = Arc::clone(&self.pool);
136        let runtime = Arc::clone(&self.runtime);
137        let route_id = self.route_id.clone();
138
139        Box::pin(async move {
140            // Get or initialize the connection pool
141            let pool: &AnyPool = pool_cell
142                .get_or_try_init(|| async {
143                    // Defensive: ensure config is resolved even if caller didn't use create_endpoint
144                    config.resolve_defaults();
145                    // SQL-014: resolve file-based query asynchronously (not blocking)
146                    config.resolve_file_query().await?;
147                    let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
148
149                    // Install all compiled-in sqlx drivers so AnyPool can resolve them.
150                    // This is idempotent; safe to call multiple times.
151                    sqlx::any::install_default_drivers();
152
153                    let max_conn = config.max_connections.ok_or_else(|| {
154                        CamelError::Config("max_connections not resolved for SQL pool".into())
155                    })?;
156                    let min_conn = config.min_connections.ok_or_else(|| {
157                        CamelError::Config("min_connections not resolved for SQL pool".into())
158                    })?;
159                    let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
160                        CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
161                    })?;
162                    let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
163                        CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
164                    })?;
165
166                    info!(
167                        db_url = %redact_db_url(&config.db_url),
168                        "SQL producer pool initializing"
169                    );
170                    let retry_policy = &config.retry;
171                    retry_async::<_, _, _, _, sqlx::Error>(
172                        retry_policy,
173                        Some("sql-producer"),
174                        || {
175                            PoolOptions::new()
176                                .max_connections(max_conn)
177                                .min_connections(min_conn)
178                                .idle_timeout(Duration::from_secs(idle_timeout))
179                                .max_lifetime(Duration::from_secs(max_lifetime))
180                                .connect(&db_url)
181                        },
182                        is_retryable_sqlx_error,
183                    )
184                    .await
185                    .map_err(|e| {
186                        runtime.health().force_unhealthy_for_route(
187                            &route_id,
188                            "g:sql:producer-pool-init",
189                            &e.to_string(),
190                        );
191                        // log-policy: outside-contract
192                        error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
193                        CamelError::EndpointCreationFailed(format!(
194                            "Failed to connect to database: {}",
195                            e
196                        ))
197                    })
198                })
199                .await
200                .map_err(|e: CamelError| e.clone())?;
201
202            // Resolve query string
203            let query_str = Self::resolve_query_source(&exchange, &config);
204
205            // SQL-002: warn if Managed transaction mode requested
206            if config.transaction_mode == crate::config::TransactionMode::Managed {
207                warn!("transactionManager not yet implemented; using Auto mode");
208            }
209
210            debug!(
211                query = %query_str,
212                "executing SQL query"
213            );
214
215            // Execute based on mode
216            if config.batch {
217                // Batch mode: execute_batch handles its own template parsing per item
218                execute_batch(pool, &config, &mut exchange).await?;
219            } else if config.use_placeholder {
220                // Placeholder processing enabled (default): parse template, resolve params, apply header override
221                let template = parse_query_template(&query_str, config.placeholder)?;
222                let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
223
224                // CamelSql.Parameters header override
225                if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
226                    if let Some(arr) = params_value.as_array() {
227                        if arr.len() != prepared.bindings.len() {
228                            warn!(
229                                expected = prepared.bindings.len(),
230                                got = arr.len(),
231                                header = headers::PARAMETERS,
232                                "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
233                                prepared.bindings.len(),
234                                arr.len()
235                            );
236                        }
237                        debug!(
238                            "Overriding bindings from {} header with {} parameters",
239                            headers::PARAMETERS,
240                            arr.len()
241                        );
242                        prepared.bindings = arr.clone();
243                    } else {
244                        warn!(
245                            header = headers::PARAMETERS,
246                            "Header is present but not a JSON array — ignoring parameter override"
247                        );
248                    }
249                }
250
251                debug!(
252                    "Executing prepared SQL ({} bindings)",
253                    prepared.bindings.len()
254                );
255
256                if is_select_query(&prepared.sql) {
257                    execute_select(pool, &prepared, &config, &mut exchange).await?;
258                } else {
259                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
260                }
261            } else {
262                // use_placeholder=false: execute query as-is without template parsing
263                debug!("Executing raw SQL (placeholder processing disabled)");
264                let prepared = PreparedQuery {
265                    sql: query_str,
266                    bindings: vec![],
267                };
268
269                if is_select_query(&prepared.sql) {
270                    execute_select(pool, &prepared, &config, &mut exchange).await?;
271                } else {
272                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
273                }
274            }
275
276            Ok(exchange)
277        })
278    }
279}
280
281/// Executes a SELECT query and populates the exchange body with results.
282async fn execute_select(
283    pool: &AnyPool,
284    prepared: &PreparedQuery,
285    config: &SqlEndpointConfig,
286    exchange: &mut Exchange,
287) -> Result<(), CamelError> {
288    match config.output_type {
289        SqlOutputType::SelectOne => {
290            // fetch_all and take first row
291            let mut query = sqlx::query(&prepared.sql);
292            query = bind_json_values(query, &prepared.bindings);
293
294            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
295                warn!(error = %e, "SQL query failed");
296                CamelError::ProcessorError(format!("Query execution failed: {}", e))
297            })?;
298
299            let count = rows.len();
300            debug!(rows = count, "SQL query completed");
301            let json_rows: Vec<serde_json::Value> = rows
302                .iter()
303                .map(row_to_json)
304                .collect::<Result<Vec<_>, _>>()?;
305
306            if let Some(first_row) = json_rows.into_iter().next() {
307                exchange.input.body = Body::Json(first_row);
308            } else {
309                exchange.input.body = Body::Empty;
310            }
311            debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
312            exchange
313                .input
314                .set_header(headers::ROW_COUNT, serde_json::json!(count));
315        }
316        SqlOutputType::SelectList => {
317            // fetch_all for list output
318            let mut query = sqlx::query(&prepared.sql);
319            query = bind_json_values(query, &prepared.bindings);
320
321            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
322                warn!(error = %e, "SQL query failed");
323                CamelError::ProcessorError(format!("Query execution failed: {}", e))
324            })?;
325
326            let count = rows.len();
327            debug!(rows = count, "SQL query completed");
328            let json_rows: Vec<serde_json::Value> = rows
329                .iter()
330                .map(row_to_json)
331                .collect::<Result<Vec<_>, _>>()?;
332
333            exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
334            debug!("SelectList returned {} rows", count);
335            exchange
336                .input
337                .set_header(headers::ROW_COUNT, serde_json::json!(count));
338        }
339        SqlOutputType::StreamList => {
340            // Use fetch() for true streaming - avoids loading all rows into memory
341            use futures::TryStreamExt;
342
343            let pool_clone = pool.clone();
344            let sql_str = prepared.sql.clone();
345            let bindings = prepared.bindings.clone();
346
347            // Build the stream that reads rows on demand and serializes to NDJSON bytes
348            let byte_stream = async_stream::try_stream! {
349                let mut q = sqlx::query(&sql_str);
350                q = bind_json_values(q, &bindings);
351                let mut rows = q.fetch(&pool_clone);
352                while let Some(row) = rows.try_next().await.map_err(|e| {
353                    CamelError::ProcessorError(format!("Query execution failed: {}", e))
354                })? {
355                    let json_val = row_to_json(&row).map_err(|e| {
356                        CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
357                    })?;
358                    let mut bytes = serde_json::to_vec(&json_val)
359                        .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
360                    bytes.push(b'\n');
361                    yield Bytes::from(bytes);
362                }
363            };
364
365            exchange.input.body = Body::Stream(StreamBody {
366                stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
367                metadata: StreamMetadata {
368                    content_type: Some("application/x-ndjson".to_string()),
369                    size_hint: None,
370                    origin: None,
371                },
372            });
373            debug!("StreamList: created lazy stream (rows fetched on demand)");
374            // Note: ROW_COUNT not set for StreamList since row count is unknown until exhausted
375        }
376    }
377
378    Ok(())
379}
380
381/// Executes a modification query (INSERT/UPDATE/DELETE).
382async fn execute_modify(
383    pool: &AnyPool,
384    prepared: &PreparedQuery,
385    config: &SqlEndpointConfig,
386    exchange: &mut Exchange,
387) -> Result<(), CamelError> {
388    let mut query = sqlx::query(&prepared.sql);
389    query = bind_json_values(query, &prepared.bindings);
390
391    let result = query.execute(pool).await.map_err(|e| {
392        warn!(error = %e, "SQL query failed");
393        CamelError::ProcessorError(format!("Query execution failed: {}", e))
394    })?;
395
396    let rows_affected = result.rows_affected();
397
398    // Fix 4: Implement expected_update_count validation
399    if let Some(expected) = config.expected_update_count
400        && rows_affected as i64 != expected
401    {
402        warn!(expected, actual = rows_affected, "Row count mismatch");
403        return Err(CamelError::ProcessorError(format!(
404            "Expected {} rows affected, got {}",
405            expected, rows_affected
406        )));
407    }
408
409    exchange
410        .input
411        .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
412
413    if config.noop {
414        // Preserve original body
415    } else {
416        exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
417    }
418
419    debug!(rows = rows_affected, "SQL modify query completed");
420
421    Ok(())
422}
423
424/// Executes a batch of queries from a JSON array body.
425async fn execute_batch(
426    pool: &AnyPool,
427    config: &SqlEndpointConfig,
428    exchange: &mut Exchange,
429) -> Result<(), CamelError> {
430    // Body must be JSON array of arrays
431    let body_json = match &exchange.input.body {
432        Body::Json(val) => val,
433        _ => {
434            return Err(CamelError::ProcessorError(
435                "Batch mode requires body to be a JSON array of arrays".to_string(),
436            ));
437        }
438    };
439
440    let batch_data = body_json
441        .as_array()
442        .ok_or_else(|| {
443            CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
444        })?
445        .clone();
446
447    // Parse template from config query
448    let template = parse_query_template(&config.query, config.placeholder)?;
449
450    // Fix 2: Batch operations must be wrapped in a transaction
451    let mut tx = pool.begin().await.map_err(|e| {
452        // log-policy: handler-owned
453        // (category a: tx begin failure inside producer pipeline)
454        warn!(error = %e, "Failed to begin transaction");
455        CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
456    })?;
457
458    let mut total_rows_affected: u64 = 0;
459
460    for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
461        // Each item must be an array of parameters
462        params_array.as_array().ok_or_else(|| {
463            CamelError::ProcessorError(format!(
464                "Batch item at index {} must be a JSON array of parameters",
465                batch_idx
466            ))
467        })?;
468
469        // Create a temporary exchange with the params as body for resolution
470        let temp_msg = Message::new(Body::Json(params_array.clone()));
471        let temp_exchange = Exchange::new(temp_msg);
472
473        // Resolve parameters for this batch item
474        let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
475
476        // Execute against transaction
477        let mut query = sqlx::query(&prepared.sql);
478        query = bind_json_values(query, &prepared.bindings);
479
480        let result = query.execute(&mut *tx).await.map_err(|e| {
481            // log-policy: handler-owned
482            // (category a: batch query execution failure inside producer pipeline)
483            warn!("Batch query execution failed at index {}: {}", batch_idx, e);
484            CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
485        })?;
486
487        // Validate expected_update_count per batch item
488        if let Some(expected) = config.expected_update_count
489            && result.rows_affected() as i64 != expected
490        {
491            // log-policy: handler-owned
492            // (category a: expected_update_count mismatch inside producer pipeline)
493            warn!(
494                "Batch item {}: expected {} rows affected, got {}",
495                batch_idx,
496                expected,
497                result.rows_affected()
498            );
499            return Err(CamelError::ProcessorError(format!(
500                "Batch item {}: expected {} rows affected, got {}",
501                batch_idx,
502                expected,
503                result.rows_affected()
504            )));
505        }
506
507        total_rows_affected += result.rows_affected();
508    }
509
510    // Commit the transaction
511    tx.commit().await.map_err(|e| {
512        // log-policy: handler-owned
513        // (category a: tx commit failure inside producer pipeline)
514        warn!(error = %e, "Failed to commit transaction");
515        CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
516    })?;
517
518    exchange.input.set_header(
519        headers::UPDATE_COUNT,
520        serde_json::json!(total_rows_affected),
521    );
522
523    debug!(
524        "Batch execution completed, total rows affected: {}",
525        total_rows_affected
526    );
527
528    Ok(())
529}
530
531#[cfg(test)]
532mod tests {
533    use super::*;
534    use camel_api::MetricsCollector;
535    use camel_component_api::HealthCheckRegistry;
536    use camel_component_api::test_support::PanicRuntimeObservability;
537    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
538        std::sync::Arc::new(PanicRuntimeObservability)
539    }
540    use camel_component_api::Message;
541    use camel_component_api::UriConfig;
542    use sqlx::any::AnyPoolOptions;
543    use std::sync::Arc;
544    use std::sync::Mutex;
545    use tokio::sync::OnceCell;
546
547    // ── Recording health fixture for ADR-0012 (g) tests ───────────────────
548
549    #[derive(Debug, Default)]
550    struct RecordingHealth {
551        forced: Mutex<Vec<(String, String, String)>>,
552    }
553
554    impl HealthCheckRegistry for RecordingHealth {
555        fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
556            self.forced.lock().unwrap().push((
557                route_id.to_string(),
558                name.to_string(),
559                reason.to_string(),
560            ));
561        }
562    }
563
564    struct NoopMetrics;
565
566    impl MetricsCollector for NoopMetrics {
567        fn record_exchange_duration(&self, _: &str, _: Duration) {}
568        fn increment_errors(&self, _: &str, _: &str) {}
569        fn increment_exchanges(&self, _: &str) {}
570        fn set_queue_depth(&self, _: &str, _: usize) {}
571        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
572    }
573
574    struct RecordingRuntime {
575        health: Arc<RecordingHealth>,
576    }
577
578    impl RuntimeObservability for RecordingRuntime {
579        fn metrics(&self) -> Arc<dyn MetricsCollector> {
580            Arc::new(NoopMetrics)
581        }
582        fn health(&self) -> Arc<dyn HealthCheckRegistry> {
583            self.health.clone()
584        }
585    }
586
587    async fn sqlite_pool() -> AnyPool {
588        sqlx::any::install_default_drivers();
589        AnyPoolOptions::new()
590            .max_connections(1)
591            .connect("sqlite::memory:")
592            .await
593            .expect("sqlite pool")
594    }
595
596    async fn seed_items_table(pool: &AnyPool) {
597        sqlx::query(
598            "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
599        )
600        .execute(pool)
601        .await
602        .expect("create table");
603        sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
604            .execute(pool)
605            .await
606            .expect("seed rows");
607    }
608
609    fn config() -> SqlEndpointConfig {
610        let mut c =
611            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
612        c.resolve_defaults();
613        c
614    }
615
616    #[test]
617    fn producer_clone_shares_pool() {
618        let p1 = SqlProducer::new(
619            config(),
620            Arc::new(OnceCell::new()),
621            test_rt(),
622            "sql-producer-test-route",
623        );
624        let p2 = p1.clone();
625        assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
626        assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
627    }
628
629    #[test]
630    fn resolve_query_from_config() {
631        let config = config();
632        let ex = Exchange::new(Message::default());
633        let q = SqlProducer::resolve_query_source(&ex, &config);
634        assert_eq!(q, "select 1");
635    }
636
637    #[test]
638    fn resolve_query_from_header() {
639        let config = config();
640        let mut msg = Message::default();
641        msg.set_header(headers::QUERY, serde_json::json!("select 2"));
642        let ex = Exchange::new(msg);
643        let q = SqlProducer::resolve_query_source(&ex, &config);
644        assert_eq!(q, "select 2");
645    }
646
647    #[test]
648    fn resolve_query_from_body() {
649        let mut config = config();
650        config.use_message_body_for_sql = true;
651        let msg = Message::new(Body::Text("select 3".to_string()));
652        let ex = Exchange::new(msg);
653        let q = SqlProducer::resolve_query_source(&ex, &config);
654        assert_eq!(q, "select 3");
655    }
656
657    #[test]
658    fn resolve_query_header_priority_over_body() {
659        let mut config = config();
660        config.use_message_body_for_sql = true;
661        let mut msg = Message::new(Body::Text("select from body".to_string()));
662        msg.set_header(headers::QUERY, serde_json::json!("select from header"));
663        let ex = Exchange::new(msg);
664        let q = SqlProducer::resolve_query_source(&ex, &config);
665        assert_eq!(q, "select from header");
666    }
667
668    #[test]
669    fn resolve_query_body_priority_over_config() {
670        let mut config = config();
671        config.use_message_body_for_sql = true;
672        let msg = Message::new(Body::Text("select from body".to_string()));
673        let ex = Exchange::new(msg);
674        let q = SqlProducer::resolve_query_source(&ex, &config);
675        assert_eq!(q, "select from body");
676    }
677
678    #[test]
679    fn bind_json_null() {
680        let query = sqlx::query("SELECT ?");
681        let values = vec![serde_json::Value::Null];
682        let _bound = bind_json_values(query, &values);
683        // Compilation test - ensure it binds
684    }
685
686    #[test]
687    fn bind_json_bool() {
688        let query = sqlx::query("SELECT ?");
689        let values = vec![serde_json::Value::Bool(true)];
690        let _bound = bind_json_values(query, &values);
691    }
692
693    #[test]
694    fn bind_json_number_i64() {
695        let query = sqlx::query("SELECT ?");
696        let values = vec![serde_json::json!(42)];
697        let _bound = bind_json_values(query, &values);
698    }
699
700    #[test]
701    fn bind_json_number_f64() {
702        let query = sqlx::query("SELECT ?");
703        let values = vec![serde_json::json!(std::f64::consts::PI)];
704        let _bound = bind_json_values(query, &values);
705    }
706
707    #[test]
708    fn bind_json_string() {
709        let query = sqlx::query("SELECT ?");
710        let values = vec![serde_json::json!("hello world")];
711        let _bound = bind_json_values(query, &values);
712    }
713
714    #[test]
715    fn bind_json_array() {
716        let query = sqlx::query("SELECT ?");
717        let values = vec![serde_json::json!([1, 2, 3])];
718        let _bound = bind_json_values(query, &values);
719    }
720
721    #[test]
722    fn bind_json_object() {
723        let query = sqlx::query("SELECT ?");
724        let values = vec![serde_json::json!({"key": "value"})];
725        let _bound = bind_json_values(query, &values);
726    }
727
728    #[test]
729    fn bind_multiple_values() {
730        let query = sqlx::query("SELECT ?, ?, ?");
731        let values = vec![
732            serde_json::json!(1),
733            serde_json::json!("test"),
734            serde_json::Value::Null,
735        ];
736        let _bound = bind_json_values(query, &values);
737    }
738
739    // Test for Fix 4: expected_update_count config field presence
740    #[test]
741    fn expected_update_count_validation() {
742        // Test that expected_update_count is parsed from URI
743        let config = SqlEndpointConfig::from_uri(
744            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
745        )
746        .unwrap();
747        assert_eq!(config.expected_update_count, Some(5));
748
749        // Test default (no expected_update_count)
750        let config_default = self::config();
751        assert_eq!(config_default.expected_update_count, None);
752
753        // Test negative value (should parse)
754        let config_neg = SqlEndpointConfig::from_uri(
755            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
756        )
757        .unwrap();
758        assert_eq!(config_neg.expected_update_count, Some(-1));
759    }
760
761    // Test for Fix 3: parameters header override logic
762    #[test]
763    fn parameters_header_override_logic() {
764        // Create a PreparedQuery manually
765        let mut prepared = PreparedQuery {
766            sql: "SELECT * FROM t WHERE id = $1".to_string(),
767            bindings: vec![serde_json::json!(42)],
768        };
769
770        // Simulate the header override logic
771        let header_params = serde_json::json!([99, "extra"]);
772        if let Some(arr) = header_params.as_array() {
773            prepared.bindings = arr.clone();
774        }
775
776        // Verify bindings were overridden
777        assert_eq!(prepared.bindings.len(), 2);
778        assert_eq!(prepared.bindings[0], serde_json::json!(99));
779        assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
780
781        // Test with non-array header (should not override)
782        let mut prepared2 = PreparedQuery {
783            sql: "SELECT * FROM t WHERE id = $1".to_string(),
784            bindings: vec![serde_json::json!(42)],
785        };
786        let header_non_array = serde_json::json!({"not": "an array"});
787        if let Some(arr) = header_non_array.as_array() {
788            prepared2.bindings = arr.clone();
789        }
790        // Should remain unchanged
791        assert_eq!(prepared2.bindings.len(), 1);
792        assert_eq!(prepared2.bindings[0], serde_json::json!(42));
793    }
794
795    #[tokio::test]
796    async fn execute_select_one_sets_body_and_row_count() {
797        let pool = sqlite_pool().await;
798        seed_items_table(&pool).await;
799
800        let mut config = SqlEndpointConfig::from_uri(
801            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
802        )
803        .unwrap();
804        config.resolve_defaults();
805
806        let prepared = PreparedQuery {
807            sql: "select id, name from items order by id".to_string(),
808            bindings: vec![],
809        };
810        let mut exchange = Exchange::new(Message::default());
811
812        execute_select(&pool, &prepared, &config, &mut exchange)
813            .await
814            .expect("select one");
815
816        assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
817        assert_eq!(
818            exchange.input.body,
819            Body::Json(json!({"id": 1, "name": "a"}))
820        );
821    }
822
823    #[tokio::test]
824    async fn execute_stream_list_materializes_ndjson() {
825        let pool = sqlite_pool().await;
826        seed_items_table(&pool).await;
827
828        let mut config = SqlEndpointConfig::from_uri(
829            "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
830        )
831        .unwrap();
832        config.resolve_defaults();
833
834        let prepared = PreparedQuery {
835            sql: "select id from items order by id".to_string(),
836            bindings: vec![],
837        };
838        let mut exchange = Exchange::new(Message::default());
839
840        execute_select(&pool, &prepared, &config, &mut exchange)
841            .await
842            .expect("stream list");
843
844        let bytes = exchange
845            .input
846            .body
847            .clone()
848            .into_bytes(1024)
849            .await
850            .expect("stream bytes");
851        let text = String::from_utf8(bytes.to_vec()).expect("utf8");
852        assert!(text.contains("{\"id\":1}"));
853        assert!(text.contains("{\"id\":2}"));
854        assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
855    }
856
857    #[tokio::test]
858    async fn execute_modify_expected_update_count_mismatch_returns_error() {
859        let pool = sqlite_pool().await;
860        seed_items_table(&pool).await;
861
862        let mut config = SqlEndpointConfig::from_uri(
863            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
864        )
865        .unwrap();
866        config.resolve_defaults();
867
868        let prepared = PreparedQuery {
869            sql: "update items set done=1 where id = $1".to_string(),
870            bindings: vec![json!(1)],
871        };
872        let mut exchange = Exchange::new(Message::default());
873
874        let err = execute_modify(&pool, &prepared, &config, &mut exchange)
875            .await
876            .expect_err("must fail due expected row count mismatch");
877        assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
878    }
879
880    #[tokio::test]
881    async fn execute_batch_rollback_when_any_item_fails_expected_count() {
882        let pool = sqlite_pool().await;
883        seed_items_table(&pool).await;
884
885        let mut config = SqlEndpointConfig::from_uri(
886            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
887        )
888        .unwrap();
889        config.resolve_defaults();
890
891        let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
892
893        let err = execute_batch(&pool, &config, &mut exchange)
894            .await
895            .expect_err("second batch item should fail expectedUpdateCount");
896        assert!(
897            err.to_string()
898                .contains("Batch item 1: expected 1 rows affected, got 0")
899        );
900
901        let row = sqlx::query("select done from items where id = 1")
902            .fetch_one(&pool)
903            .await
904            .expect("query row");
905        let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
906        assert_eq!(done, 0, "transaction must rollback first update");
907    }
908
909    // --- Phase B hardening tests ---
910
911    // SQL-001: Direct producer construction without resolve_defaults does not panic.
912    // The producer defensively calls resolve_defaults() during pool init, so the pool
913    // fields get resolved. This test verifies no panic occurs and the pool initializes.
914    #[tokio::test]
915    async fn producer_no_panic_without_prior_resolve_defaults() {
916        // Create config without calling resolve_defaults() — pool fields are None
917        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
918        assert!(config.max_connections.is_none());
919
920        let mut producer = SqlProducer::new(
921            config,
922            Arc::new(OnceCell::new()),
923            test_rt(),
924            "sql-producer-test-route",
925        );
926        let exchange = Exchange::new(Message::default());
927
928        // Should NOT panic — producer calls resolve_defaults() defensively
929        let result = producer.call(exchange).await;
930        assert!(
931            result.is_ok(),
932            "Producer should initialize pool without panic, got: {:?}",
933            result
934        );
935    }
936
937    // SQL-001: Pool init returns CamelError::Config when pool params cannot be resolved
938    #[tokio::test]
939    async fn producer_pool_init_returns_config_error_for_invalid_db() {
940        // Create config with an invalid db_url that will fail to connect.
941        // Disable retry for this test — the purpose is to verify error handling
942        // on a known-bad connect, not to exercise the retry loop.
943        let mut config = SqlEndpointConfig::from_uri(
944            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
945        )
946        .unwrap();
947        // Set pool params explicitly so resolve_defaults doesn't help with connection
948        config.max_connections = Some(1);
949        config.min_connections = Some(0);
950        config.idle_timeout_secs = Some(300);
951        config.max_lifetime_secs = Some(1800);
952
953        let health = Arc::new(RecordingHealth::default());
954        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
955            health: health.clone(),
956        });
957        let mut producer = SqlProducer::new(
958            config,
959            Arc::new(OnceCell::new()),
960            rt,
961            "sql-producer-test-route",
962        );
963        let exchange = Exchange::new(Message::default());
964
965        let result = producer.call(exchange).await;
966        assert!(result.is_err());
967        // Error should be EndpointCreationFailed (connection error), not a panic
968        let err_msg = result.unwrap_err().to_string();
969        assert!(
970            err_msg.contains("Failed to connect") || err_msg.contains("database"),
971            "Expected connection error, got: {}",
972            err_msg
973        );
974        // Verify force_unhealthy_for_route was also called
975        let forced = health.forced.lock().unwrap();
976        assert_eq!(
977            forced.len(),
978            1,
979            "expected one force_unhealthy_for_route call"
980        );
981        assert_eq!(forced[0].0, "sql-producer-test-route");
982        assert_eq!(forced[0].1, "g:sql:producer-pool-init");
983    }
984
985    // SQL-007: poll_ready returns Ready (pool lazily initialized on first call)
986    #[test]
987    fn poll_ready_returns_ready_for_uninitialized_pool() {
988        let config = {
989            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
990            c.resolve_defaults();
991            c
992        };
993        let mut producer = SqlProducer::new(
994            config,
995            Arc::new(OnceCell::new()),
996            test_rt(),
997            "sql-producer-test-route",
998        );
999        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1000        let result = producer.poll_ready(&mut cx);
1001        assert!(matches!(result, Poll::Ready(Ok(()))));
1002    }
1003
1004    // SQL-007: poll_ready returns error when stopped flag is set
1005    #[test]
1006    fn poll_ready_returns_error_when_stopped() {
1007        let config = {
1008            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1009            c.resolve_defaults();
1010            c
1011        };
1012        let mut producer = SqlProducer::new(
1013            config,
1014            Arc::new(OnceCell::new()),
1015            test_rt(),
1016            "sql-producer-test-route",
1017        );
1018        producer.stop();
1019        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1020        let result = producer.poll_ready(&mut cx);
1021        assert!(matches!(result, Poll::Ready(Err(_))));
1022        let err_msg = match result {
1023            Poll::Ready(Err(e)) => e.to_string(),
1024            _ => unreachable!(),
1025        };
1026        assert!(err_msg.contains("SQL producer stopped"));
1027    }
1028
1029    // SQL-007: poll_ready returns error when pool is closed
1030    #[tokio::test]
1031    async fn poll_ready_returns_error_when_pool_closed() {
1032        let pool = sqlite_pool().await;
1033        pool.close().await;
1034
1035        let config = {
1036            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1037            c.resolve_defaults();
1038            c
1039        };
1040        let pool_cell = Arc::new(OnceCell::new());
1041        pool_cell.set(pool).unwrap();
1042
1043        let mut producer =
1044            SqlProducer::new(config, pool_cell, test_rt(), "sql-producer-test-route");
1045        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1046        let result = producer.poll_ready(&mut cx);
1047        assert!(matches!(result, Poll::Ready(Err(_))));
1048        let err_msg = match result {
1049            Poll::Ready(Err(e)) => e.to_string(),
1050            _ => unreachable!(),
1051        };
1052        assert!(err_msg.contains("SQL connection pool is closed"));
1053    }
1054
1055    // SQL-007: poll_ready returns Ok for healthy initialized pool
1056    #[tokio::test]
1057    async fn poll_ready_returns_ok_for_healthy_pool() {
1058        let pool = sqlite_pool().await;
1059
1060        let config = {
1061            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1062            c.resolve_defaults();
1063            c
1064        };
1065        let pool_cell = Arc::new(OnceCell::new());
1066        pool_cell.set(pool).unwrap();
1067
1068        let mut producer =
1069            SqlProducer::new(config, pool_cell, test_rt(), "sql-producer-test-route");
1070        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1071        let result = producer.poll_ready(&mut cx);
1072        assert!(matches!(result, Poll::Ready(Ok(()))));
1073    }
1074
1075    // SQL-008: stop() closes the pool
1076    #[tokio::test]
1077    async fn test_sql_stop_closes_pool() {
1078        let pool = sqlite_pool().await;
1079
1080        let config = {
1081            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1082            c.resolve_defaults();
1083            c
1084        };
1085        let pool_cell = Arc::new(OnceCell::new());
1086        pool_cell.set(pool.clone()).unwrap();
1087
1088        let producer = SqlProducer::new(
1089            config,
1090            pool_cell.clone(),
1091            test_rt(),
1092            "sql-producer-test-route",
1093        );
1094        assert!(!pool.is_closed(), "Pool should be open before stop");
1095
1096        producer.stop();
1097
1098        // Allow background close task to complete
1099        tokio::time::sleep(Duration::from_millis(100)).await;
1100
1101        assert!(
1102            pool.is_closed(),
1103            "Pool should be closed after producer.stop()"
1104        );
1105
1106        // Subsequent poll_ready should fail
1107        let mut producer2 = SqlProducer::new(
1108            {
1109                let mut c =
1110                    SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1111                c.resolve_defaults();
1112                c
1113            },
1114            pool_cell.clone(),
1115            test_rt(),
1116            "sql-producer-test-route",
1117        );
1118        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1119        let result = producer2.poll_ready(&mut cx);
1120        assert!(
1121            matches!(result, Poll::Ready(Err(_))),
1122            "poll_ready should fail after pool closed"
1123        );
1124    }
1125
1126    // SQL-004: use_placeholder=false skips template parsing
1127    #[tokio::test]
1128    async fn use_placeholder_false_executes_raw_sql() {
1129        let pool = sqlite_pool().await;
1130        seed_items_table(&pool).await;
1131
1132        let mut config = SqlEndpointConfig::from_uri(
1133            "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1134        )
1135        .unwrap();
1136        config.resolve_defaults();
1137        assert!(!config.use_placeholder);
1138
1139        let mut producer = SqlProducer::new(
1140            config,
1141            Arc::new(OnceCell::new()),
1142            test_rt(),
1143            "sql-producer-test-route",
1144        );
1145        // Pre-initialize the pool so we don't hit the pool init path
1146        producer.pool.set(pool.clone()).unwrap();
1147
1148        let exchange = Exchange::new(Message::default());
1149        let result = producer.call(exchange).await;
1150        assert!(result.is_ok());
1151        let exchange = result.unwrap();
1152        // Should return results, not rowsAffected
1153        assert!(matches!(exchange.input.body, Body::Json(_)));
1154    }
1155
1156    // SQL-004: use_placeholder=true (default) processes placeholders normally
1157    #[tokio::test]
1158    async fn use_placeholder_true_processes_placeholders() {
1159        let pool = sqlite_pool().await;
1160        seed_items_table(&pool).await;
1161
1162        let mut config = SqlEndpointConfig::from_uri(
1163            "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1164        )
1165        .unwrap();
1166        config.resolve_defaults();
1167        assert!(config.use_placeholder);
1168
1169        let mut producer = SqlProducer::new(
1170            config,
1171            Arc::new(OnceCell::new()),
1172            test_rt(),
1173            "sql-producer-test-route",
1174        );
1175        producer.pool.set(pool.clone()).unwrap();
1176
1177        let msg = Message::new(Body::Json(json!([1])));
1178        let exchange = Exchange::new(msg);
1179        let result = producer.call(exchange).await;
1180        assert!(result.is_ok());
1181    }
1182
1183    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
1184    /// Replicates the exact retry loop from SqlProducer::start() (producer.rs:161-185):
1185    ///   attempt starts at 0, incremented at top, should_retry(attempt), delay_for(attempt-1)
1186    #[tokio::test]
1187    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1188        use camel_component_api::NetworkRetryPolicy;
1189        use std::sync::Arc;
1190        use std::sync::atomic::{AtomicU32, Ordering};
1191
1192        let policy = NetworkRetryPolicy {
1193            max_attempts: 3,
1194            initial_delay: Duration::from_millis(1),
1195            max_delay: Duration::from_millis(1),
1196            multiplier: 1.0,
1197            ..NetworkRetryPolicy::default()
1198        };
1199
1200        let calls = Arc::new(AtomicU32::new(0));
1201        let calls_clone = Arc::clone(&calls);
1202
1203        let mut attempt: u32 = 0;
1204        let _result: Result<(), ()> = loop {
1205            attempt += 1;
1206            calls_clone.fetch_add(1, Ordering::SeqCst);
1207            let op_result: Result<(), ()> = Err(());
1208            match op_result {
1209                Ok(v) => break Ok(v),
1210                Err(_) if policy.should_retry(attempt) => {
1211                    let delay = policy.delay_for(attempt - 1);
1212                    tokio::time::sleep(delay).await;
1213                    continue;
1214                }
1215                Err(_) => break Err(()),
1216            }
1217        };
1218
1219        assert_eq!(
1220            calls.load(Ordering::SeqCst),
1221            3,
1222            "max_attempts=3 must yield exactly 3 invocations"
1223        );
1224    }
1225
1226    // ── ADR-0012 (g) regression tests ──────────────────────────────────
1227
1228    /// Regression: producer pool init failure calls force_unhealthy_for_route
1229    /// with correct route_id + name "g:sql:producer-pool-init" + non-empty reason.
1230    #[tokio::test]
1231    async fn producer_pool_init_failure_calls_force_unhealthy_for_route() {
1232        let health = Arc::new(RecordingHealth::default());
1233        let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
1234            health: health.clone(),
1235        });
1236
1237        let mut config = SqlEndpointConfig::from_uri(
1238            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
1239        )
1240        .unwrap();
1241        config.max_connections = Some(1);
1242        config.min_connections = Some(0);
1243        config.idle_timeout_secs = Some(300);
1244        config.max_lifetime_secs = Some(1800);
1245
1246        let mut producer = SqlProducer::new(
1247            config,
1248            Arc::new(OnceCell::new()),
1249            rt,
1250            "sql-producer-test-route",
1251        );
1252        let exchange = Exchange::new(Message::default());
1253
1254        let result = producer.call(exchange).await;
1255        assert!(result.is_err());
1256
1257        let forced = health.forced.lock().unwrap();
1258        assert_eq!(
1259            forced.len(),
1260            1,
1261            "expected one force_unhealthy_for_route call"
1262        );
1263        assert_eq!(forced[0].0, "sql-producer-test-route");
1264        assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1265        assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1266    }
1267}