Skip to main content

camel_component_sql/
producer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, row_to_json};
21use camel_component_api::{Body, CamelError, Exchange, Message, StreamBody, StreamMetadata};
22
23#[derive(Clone)]
24pub struct SqlProducer {
25    pub(crate) config: SqlEndpointConfig,
26    pub(crate) pool: Arc<OnceCell<AnyPool>>,
27    pub(crate) stopped: Arc<AtomicBool>,
28}
29
30impl SqlProducer {
31    pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
32        Self {
33            config,
34            pool,
35            stopped: Arc::new(AtomicBool::new(false)),
36        }
37    }
38
39    pub fn stop(&self) {
40        self.stopped.store(true, Ordering::Relaxed);
41    }
42
43    /// Resolves the query source based on priority:
44    /// 1. Header `CamelSql.Query`
45    /// 2. Body (if `use_message_body_for_sql` is true)
46    /// 3. Config query
47    pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
48        // Priority 1: Header
49        if let Some(query_value) = exchange.input.header(headers::QUERY)
50            && let Some(query_str) = query_value.as_str()
51        {
52            return query_str.to_string();
53        }
54
55        // Priority 2: Body (if use_message_body_for_sql)
56        if config.use_message_body_for_sql
57            && let Some(body_text) = exchange.input.body.as_text()
58        {
59            return body_text.to_string();
60        }
61
62        // Priority 3: Config query
63        config.query.clone()
64    }
65}
66
67impl Service<Exchange> for SqlProducer {
68    type Response = Exchange;
69    type Error = CamelError;
70    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        if self.stopped.load(Ordering::Relaxed) {
74            return Poll::Ready(Err(CamelError::ProcessorError(
75                "SQL producer stopped".into(),
76            )));
77        }
78        if let Some(pool) = self.pool.get()
79            && pool.is_closed()
80        {
81            return Poll::Ready(Err(CamelError::ProcessorError(
82                "SQL connection pool is closed".into(),
83            )));
84        }
85        Poll::Ready(Ok(()))
86    }
87
88    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
89        let mut config = self.config.clone();
90        let pool_cell = Arc::clone(&self.pool);
91
92        Box::pin(async move {
93            // Get or initialize the connection pool
94            let pool: &AnyPool = pool_cell
95                .get_or_try_init(|| async {
96                    // Defensive: ensure config is resolved even if caller didn't use create_endpoint
97                    config.resolve_defaults();
98                    let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
99
100                    // Install all compiled-in sqlx drivers so AnyPool can resolve them.
101                    // This is idempotent; safe to call multiple times.
102                    sqlx::any::install_default_drivers();
103
104                    let max_conn = config.max_connections.ok_or_else(|| {
105                        CamelError::Config("max_connections not resolved for SQL pool".into())
106                    })?;
107                    let min_conn = config.min_connections.ok_or_else(|| {
108                        CamelError::Config("min_connections not resolved for SQL pool".into())
109                    })?;
110                    let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
111                        CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
112                    })?;
113                    let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
114                        CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
115                    })?;
116
117                    let opts: PoolOptions<sqlx::Any> = PoolOptions::new()
118                        .max_connections(max_conn)
119                        .min_connections(min_conn)
120                        .idle_timeout(Duration::from_secs(idle_timeout))
121                        .max_lifetime(Duration::from_secs(max_lifetime));
122
123                    info!(
124                        db_url = %redact_db_url(&config.db_url),
125                        "SQL producer pool initializing"
126                    );
127                    opts.connect(&db_url).await.map_err(|e| {
128                        error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
129                        CamelError::EndpointCreationFailed(format!(
130                            "Failed to connect to database: {}",
131                            e
132                        ))
133                    })
134                })
135                .await
136                .map_err(|e: CamelError| {
137                    error!("SQL producer pool initialization failed: {}", e);
138                    e.clone()
139                })?;
140
141            // Resolve query string
142            let query_str = Self::resolve_query_source(&exchange, &config);
143
144            debug!(
145                "Executing SQL query (config query length: {})",
146                query_str.len()
147            );
148
149            // Execute based on mode
150            if config.batch {
151                // Batch mode: execute_batch handles its own template parsing per item
152                execute_batch(pool, &config, &mut exchange).await?;
153            } else if config.use_placeholder {
154                // Placeholder processing enabled (default): parse template, resolve params, apply header override
155                let template = parse_query_template(&query_str, config.placeholder)?;
156                let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
157
158                // CamelSql.Parameters header override
159                if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
160                    if let Some(arr) = params_value.as_array() {
161                        if arr.len() != prepared.bindings.len() {
162                            warn!(
163                                expected = prepared.bindings.len(),
164                                got = arr.len(),
165                                header = headers::PARAMETERS,
166                                "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
167                                prepared.bindings.len(),
168                                arr.len()
169                            );
170                        }
171                        debug!(
172                            "Overriding bindings from {} header with {} parameters",
173                            headers::PARAMETERS,
174                            arr.len()
175                        );
176                        prepared.bindings = arr.clone();
177                    } else {
178                        warn!(
179                            header = headers::PARAMETERS,
180                            "Header is present but not a JSON array — ignoring parameter override"
181                        );
182                    }
183                }
184
185                debug!(
186                    "Executing prepared SQL ({} bindings)",
187                    prepared.bindings.len()
188                );
189
190                if is_select_query(&prepared.sql) {
191                    execute_select(pool, &prepared, &config, &mut exchange).await?;
192                } else {
193                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
194                }
195            } else {
196                // use_placeholder=false: execute query as-is without template parsing
197                debug!("Executing raw SQL (placeholder processing disabled)");
198                let prepared = PreparedQuery {
199                    sql: query_str,
200                    bindings: vec![],
201                };
202
203                if is_select_query(&prepared.sql) {
204                    execute_select(pool, &prepared, &config, &mut exchange).await?;
205                } else {
206                    execute_modify(pool, &prepared, &config, &mut exchange).await?;
207                }
208            }
209
210            Ok(exchange)
211        })
212    }
213}
214
215/// Executes a SELECT query and populates the exchange body with results.
216async fn execute_select(
217    pool: &AnyPool,
218    prepared: &PreparedQuery,
219    config: &SqlEndpointConfig,
220    exchange: &mut Exchange,
221) -> Result<(), CamelError> {
222    match config.output_type {
223        SqlOutputType::SelectOne => {
224            // fetch_all and take first row
225            let mut query = sqlx::query(&prepared.sql);
226            query = bind_json_values(query, &prepared.bindings);
227
228            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
229                error!("Query execution failed: {}", e);
230                CamelError::ProcessorError(format!("Query execution failed: {}", e))
231            })?;
232
233            let count = rows.len();
234            let json_rows: Vec<serde_json::Value> = rows
235                .iter()
236                .map(row_to_json)
237                .collect::<Result<Vec<_>, _>>()?;
238
239            if let Some(first_row) = json_rows.into_iter().next() {
240                exchange.input.body = Body::Json(first_row);
241            } else {
242                exchange.input.body = Body::Empty;
243            }
244            debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
245            exchange
246                .input
247                .set_header(headers::ROW_COUNT, serde_json::json!(count));
248        }
249        SqlOutputType::SelectList => {
250            // fetch_all for list output
251            let mut query = sqlx::query(&prepared.sql);
252            query = bind_json_values(query, &prepared.bindings);
253
254            let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
255                error!("Query execution failed: {}", e);
256                CamelError::ProcessorError(format!("Query execution failed: {}", e))
257            })?;
258
259            let count = rows.len();
260            let json_rows: Vec<serde_json::Value> = rows
261                .iter()
262                .map(row_to_json)
263                .collect::<Result<Vec<_>, _>>()?;
264
265            exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
266            debug!("SelectList returned {} rows", count);
267            exchange
268                .input
269                .set_header(headers::ROW_COUNT, serde_json::json!(count));
270        }
271        SqlOutputType::StreamList => {
272            // Use fetch() for true streaming - avoids loading all rows into memory
273            use futures::TryStreamExt;
274
275            let pool_clone = pool.clone();
276            let sql_str = prepared.sql.clone();
277            let bindings = prepared.bindings.clone();
278
279            // Build the stream that reads rows on demand and serializes to NDJSON bytes
280            let byte_stream = async_stream::try_stream! {
281                let mut q = sqlx::query(&sql_str);
282                q = bind_json_values(q, &bindings);
283                let mut rows = q.fetch(&pool_clone);
284                while let Some(row) = rows.try_next().await.map_err(|e| {
285                    CamelError::ProcessorError(format!("Query execution failed: {}", e))
286                })? {
287                    let json_val = row_to_json(&row).map_err(|e| {
288                        CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
289                    })?;
290                    let mut bytes = serde_json::to_vec(&json_val)
291                        .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
292                    bytes.push(b'\n');
293                    yield Bytes::from(bytes);
294                }
295            };
296
297            exchange.input.body = Body::Stream(StreamBody {
298                stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
299                metadata: StreamMetadata {
300                    content_type: Some("application/x-ndjson".to_string()),
301                    size_hint: None,
302                    origin: None,
303                },
304            });
305            debug!("StreamList: created lazy stream (rows fetched on demand)");
306            // Note: ROW_COUNT not set for StreamList since row count is unknown until exhausted
307        }
308    }
309
310    Ok(())
311}
312
313/// Executes a modification query (INSERT/UPDATE/DELETE).
314async fn execute_modify(
315    pool: &AnyPool,
316    prepared: &PreparedQuery,
317    config: &SqlEndpointConfig,
318    exchange: &mut Exchange,
319) -> Result<(), CamelError> {
320    let mut query = sqlx::query(&prepared.sql);
321    query = bind_json_values(query, &prepared.bindings);
322
323    let result = query.execute(pool).await.map_err(|e| {
324        error!("Query execution failed: {}", e);
325        CamelError::ProcessorError(format!("Query execution failed: {}", e))
326    })?;
327
328    let rows_affected = result.rows_affected();
329
330    // Fix 4: Implement expected_update_count validation
331    if let Some(expected) = config.expected_update_count
332        && rows_affected as i64 != expected
333    {
334        error!("Expected {} rows affected, got {}", expected, rows_affected);
335        return Err(CamelError::ProcessorError(format!(
336            "Expected {} rows affected, got {}",
337            expected, rows_affected
338        )));
339    }
340
341    exchange
342        .input
343        .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
344
345    if config.noop {
346        // Preserve original body
347    } else {
348        exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
349    }
350
351    debug!("Modify query affected {} rows", rows_affected);
352
353    Ok(())
354}
355
356/// Executes a batch of queries from a JSON array body.
357async fn execute_batch(
358    pool: &AnyPool,
359    config: &SqlEndpointConfig,
360    exchange: &mut Exchange,
361) -> Result<(), CamelError> {
362    // Body must be JSON array of arrays
363    let body_json = match &exchange.input.body {
364        Body::Json(val) => val,
365        _ => {
366            return Err(CamelError::ProcessorError(
367                "Batch mode requires body to be a JSON array of arrays".to_string(),
368            ));
369        }
370    };
371
372    let batch_data = body_json
373        .as_array()
374        .ok_or_else(|| {
375            CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
376        })?
377        .clone();
378
379    // Parse template from config query
380    let template = parse_query_template(&config.query, config.placeholder)?;
381
382    // Fix 2: Batch operations must be wrapped in a transaction
383    let mut tx = pool.begin().await.map_err(|e| {
384        error!("Failed to begin transaction: {}", e);
385        CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
386    })?;
387
388    let mut total_rows_affected: u64 = 0;
389
390    for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
391        // Each item must be an array of parameters
392        params_array.as_array().ok_or_else(|| {
393            CamelError::ProcessorError(format!(
394                "Batch item at index {} must be a JSON array of parameters",
395                batch_idx
396            ))
397        })?;
398
399        // Create a temporary exchange with the params as body for resolution
400        let temp_msg = Message::new(Body::Json(params_array.clone()));
401        let temp_exchange = Exchange::new(temp_msg);
402
403        // Resolve parameters for this batch item
404        let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
405
406        // Execute against transaction
407        let mut query = sqlx::query(&prepared.sql);
408        query = bind_json_values(query, &prepared.bindings);
409
410        let result = query.execute(&mut *tx).await.map_err(|e| {
411            error!("Batch query execution failed at index {}: {}", batch_idx, e);
412            CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
413        })?;
414
415        // Validate expected_update_count per batch item
416        if let Some(expected) = config.expected_update_count
417            && result.rows_affected() as i64 != expected
418        {
419            error!(
420                "Batch item {}: expected {} rows affected, got {}",
421                batch_idx,
422                expected,
423                result.rows_affected()
424            );
425            return Err(CamelError::ProcessorError(format!(
426                "Batch item {}: expected {} rows affected, got {}",
427                batch_idx,
428                expected,
429                result.rows_affected()
430            )));
431        }
432
433        total_rows_affected += result.rows_affected();
434    }
435
436    // Commit the transaction
437    tx.commit().await.map_err(|e| {
438        error!("Failed to commit transaction: {}", e);
439        CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
440    })?;
441
442    exchange.input.set_header(
443        headers::UPDATE_COUNT,
444        serde_json::json!(total_rows_affected),
445    );
446
447    debug!(
448        "Batch execution completed, total rows affected: {}",
449        total_rows_affected
450    );
451
452    Ok(())
453}
454
455#[cfg(test)]
456mod tests {
457    use super::*;
458    use camel_component_api::Message;
459    use camel_component_api::UriConfig;
460    use sqlx::any::AnyPoolOptions;
461    use std::sync::Arc;
462    use tokio::sync::OnceCell;
463
464    async fn sqlite_pool() -> AnyPool {
465        sqlx::any::install_default_drivers();
466        AnyPoolOptions::new()
467            .max_connections(1)
468            .connect("sqlite::memory:")
469            .await
470            .expect("sqlite pool")
471    }
472
473    async fn seed_items_table(pool: &AnyPool) {
474        sqlx::query(
475            "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
476        )
477        .execute(pool)
478        .await
479        .expect("create table");
480        sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
481            .execute(pool)
482            .await
483            .expect("seed rows");
484    }
485
486    fn config() -> SqlEndpointConfig {
487        let mut c =
488            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
489        c.resolve_defaults();
490        c
491    }
492
493    #[test]
494    fn producer_clone_shares_pool() {
495        let p1 = SqlProducer::new(config(), Arc::new(OnceCell::new()));
496        let p2 = p1.clone();
497        assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
498        assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
499    }
500
501    #[test]
502    fn resolve_query_from_config() {
503        let config = config();
504        let ex = Exchange::new(Message::default());
505        let q = SqlProducer::resolve_query_source(&ex, &config);
506        assert_eq!(q, "select 1");
507    }
508
509    #[test]
510    fn resolve_query_from_header() {
511        let config = config();
512        let mut msg = Message::default();
513        msg.set_header(headers::QUERY, serde_json::json!("select 2"));
514        let ex = Exchange::new(msg);
515        let q = SqlProducer::resolve_query_source(&ex, &config);
516        assert_eq!(q, "select 2");
517    }
518
519    #[test]
520    fn resolve_query_from_body() {
521        let mut config = config();
522        config.use_message_body_for_sql = true;
523        let msg = Message::new(Body::Text("select 3".to_string()));
524        let ex = Exchange::new(msg);
525        let q = SqlProducer::resolve_query_source(&ex, &config);
526        assert_eq!(q, "select 3");
527    }
528
529    #[test]
530    fn resolve_query_header_priority_over_body() {
531        let mut config = config();
532        config.use_message_body_for_sql = true;
533        let mut msg = Message::new(Body::Text("select from body".to_string()));
534        msg.set_header(headers::QUERY, serde_json::json!("select from header"));
535        let ex = Exchange::new(msg);
536        let q = SqlProducer::resolve_query_source(&ex, &config);
537        assert_eq!(q, "select from header");
538    }
539
540    #[test]
541    fn resolve_query_body_priority_over_config() {
542        let mut config = config();
543        config.use_message_body_for_sql = true;
544        let msg = Message::new(Body::Text("select from body".to_string()));
545        let ex = Exchange::new(msg);
546        let q = SqlProducer::resolve_query_source(&ex, &config);
547        assert_eq!(q, "select from body");
548    }
549
550    #[test]
551    fn bind_json_null() {
552        let query = sqlx::query("SELECT ?");
553        let values = vec![serde_json::Value::Null];
554        let _bound = bind_json_values(query, &values);
555        // Compilation test - ensure it binds
556    }
557
558    #[test]
559    fn bind_json_bool() {
560        let query = sqlx::query("SELECT ?");
561        let values = vec![serde_json::Value::Bool(true)];
562        let _bound = bind_json_values(query, &values);
563    }
564
565    #[test]
566    fn bind_json_number_i64() {
567        let query = sqlx::query("SELECT ?");
568        let values = vec![serde_json::json!(42)];
569        let _bound = bind_json_values(query, &values);
570    }
571
572    #[test]
573    fn bind_json_number_f64() {
574        let query = sqlx::query("SELECT ?");
575        let values = vec![serde_json::json!(std::f64::consts::PI)];
576        let _bound = bind_json_values(query, &values);
577    }
578
579    #[test]
580    fn bind_json_string() {
581        let query = sqlx::query("SELECT ?");
582        let values = vec![serde_json::json!("hello world")];
583        let _bound = bind_json_values(query, &values);
584    }
585
586    #[test]
587    fn bind_json_array() {
588        let query = sqlx::query("SELECT ?");
589        let values = vec![serde_json::json!([1, 2, 3])];
590        let _bound = bind_json_values(query, &values);
591    }
592
593    #[test]
594    fn bind_json_object() {
595        let query = sqlx::query("SELECT ?");
596        let values = vec![serde_json::json!({"key": "value"})];
597        let _bound = bind_json_values(query, &values);
598    }
599
600    #[test]
601    fn bind_multiple_values() {
602        let query = sqlx::query("SELECT ?, ?, ?");
603        let values = vec![
604            serde_json::json!(1),
605            serde_json::json!("test"),
606            serde_json::Value::Null,
607        ];
608        let _bound = bind_json_values(query, &values);
609    }
610
611    // Test for Fix 4: expected_update_count config field presence
612    #[test]
613    fn expected_update_count_validation() {
614        // Test that expected_update_count is parsed from URI
615        let config = SqlEndpointConfig::from_uri(
616            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
617        )
618        .unwrap();
619        assert_eq!(config.expected_update_count, Some(5));
620
621        // Test default (no expected_update_count)
622        let config_default = self::config();
623        assert_eq!(config_default.expected_update_count, None);
624
625        // Test negative value (should parse)
626        let config_neg = SqlEndpointConfig::from_uri(
627            "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
628        )
629        .unwrap();
630        assert_eq!(config_neg.expected_update_count, Some(-1));
631    }
632
633    // Test for Fix 3: parameters header override logic
634    #[test]
635    fn parameters_header_override_logic() {
636        // Create a PreparedQuery manually
637        let mut prepared = PreparedQuery {
638            sql: "SELECT * FROM t WHERE id = $1".to_string(),
639            bindings: vec![serde_json::json!(42)],
640        };
641
642        // Simulate the header override logic
643        let header_params = serde_json::json!([99, "extra"]);
644        if let Some(arr) = header_params.as_array() {
645            prepared.bindings = arr.clone();
646        }
647
648        // Verify bindings were overridden
649        assert_eq!(prepared.bindings.len(), 2);
650        assert_eq!(prepared.bindings[0], serde_json::json!(99));
651        assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
652
653        // Test with non-array header (should not override)
654        let mut prepared2 = PreparedQuery {
655            sql: "SELECT * FROM t WHERE id = $1".to_string(),
656            bindings: vec![serde_json::json!(42)],
657        };
658        let header_non_array = serde_json::json!({"not": "an array"});
659        if let Some(arr) = header_non_array.as_array() {
660            prepared2.bindings = arr.clone();
661        }
662        // Should remain unchanged
663        assert_eq!(prepared2.bindings.len(), 1);
664        assert_eq!(prepared2.bindings[0], serde_json::json!(42));
665    }
666
667    #[tokio::test]
668    async fn execute_select_one_sets_body_and_row_count() {
669        let pool = sqlite_pool().await;
670        seed_items_table(&pool).await;
671
672        let mut config = SqlEndpointConfig::from_uri(
673            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
674        )
675        .unwrap();
676        config.resolve_defaults();
677
678        let prepared = PreparedQuery {
679            sql: "select id, name from items order by id".to_string(),
680            bindings: vec![],
681        };
682        let mut exchange = Exchange::new(Message::default());
683
684        execute_select(&pool, &prepared, &config, &mut exchange)
685            .await
686            .expect("select one");
687
688        assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
689        assert_eq!(
690            exchange.input.body,
691            Body::Json(json!({"id": 1, "name": "a"}))
692        );
693    }
694
695    #[tokio::test]
696    async fn execute_stream_list_materializes_ndjson() {
697        let pool = sqlite_pool().await;
698        seed_items_table(&pool).await;
699
700        let mut config = SqlEndpointConfig::from_uri(
701            "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
702        )
703        .unwrap();
704        config.resolve_defaults();
705
706        let prepared = PreparedQuery {
707            sql: "select id from items order by id".to_string(),
708            bindings: vec![],
709        };
710        let mut exchange = Exchange::new(Message::default());
711
712        execute_select(&pool, &prepared, &config, &mut exchange)
713            .await
714            .expect("stream list");
715
716        let bytes = exchange
717            .input
718            .body
719            .clone()
720            .into_bytes(1024)
721            .await
722            .expect("stream bytes");
723        let text = String::from_utf8(bytes.to_vec()).expect("utf8");
724        assert!(text.contains("{\"id\":1}"));
725        assert!(text.contains("{\"id\":2}"));
726        assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
727    }
728
729    #[tokio::test]
730    async fn execute_modify_expected_update_count_mismatch_returns_error() {
731        let pool = sqlite_pool().await;
732        seed_items_table(&pool).await;
733
734        let mut config = SqlEndpointConfig::from_uri(
735            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
736        )
737        .unwrap();
738        config.resolve_defaults();
739
740        let prepared = PreparedQuery {
741            sql: "update items set done=1 where id = $1".to_string(),
742            bindings: vec![json!(1)],
743        };
744        let mut exchange = Exchange::new(Message::default());
745
746        let err = execute_modify(&pool, &prepared, &config, &mut exchange)
747            .await
748            .expect_err("must fail due expected row count mismatch");
749        assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
750    }
751
752    #[tokio::test]
753    async fn execute_batch_rollback_when_any_item_fails_expected_count() {
754        let pool = sqlite_pool().await;
755        seed_items_table(&pool).await;
756
757        let mut config = SqlEndpointConfig::from_uri(
758            "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
759        )
760        .unwrap();
761        config.resolve_defaults();
762
763        let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
764
765        let err = execute_batch(&pool, &config, &mut exchange)
766            .await
767            .expect_err("second batch item should fail expectedUpdateCount");
768        assert!(
769            err.to_string()
770                .contains("Batch item 1: expected 1 rows affected, got 0")
771        );
772
773        let row = sqlx::query("select done from items where id = 1")
774            .fetch_one(&pool)
775            .await
776            .expect("query row");
777        let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
778        assert_eq!(done, 0, "transaction must rollback first update");
779    }
780
781    // --- Phase B hardening tests ---
782
783    // SQL-001: Direct producer construction without resolve_defaults does not panic.
784    // The producer defensively calls resolve_defaults() during pool init, so the pool
785    // fields get resolved. This test verifies no panic occurs and the pool initializes.
786    #[tokio::test]
787    async fn producer_no_panic_without_prior_resolve_defaults() {
788        // Create config without calling resolve_defaults() — pool fields are None
789        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
790        assert!(config.max_connections.is_none());
791
792        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
793        let exchange = Exchange::new(Message::default());
794
795        // Should NOT panic — producer calls resolve_defaults() defensively
796        let result = producer.call(exchange).await;
797        assert!(
798            result.is_ok(),
799            "Producer should initialize pool without panic, got: {:?}",
800            result
801        );
802    }
803
804    // SQL-001: Pool init returns CamelError::Config when pool params cannot be resolved
805    #[tokio::test]
806    async fn producer_pool_init_returns_config_error_for_invalid_db() {
807        // Create config with an invalid db_url that will fail to connect
808        let mut config = SqlEndpointConfig::from_uri(
809            "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db",
810        )
811        .unwrap();
812        // Set pool params explicitly so resolve_defaults doesn't help with connection
813        config.max_connections = Some(1);
814        config.min_connections = Some(0);
815        config.idle_timeout_secs = Some(300);
816        config.max_lifetime_secs = Some(1800);
817
818        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
819        let exchange = Exchange::new(Message::default());
820
821        let result = producer.call(exchange).await;
822        assert!(result.is_err());
823        // Error should be EndpointCreationFailed (connection error), not a panic
824        let err_msg = result.unwrap_err().to_string();
825        assert!(
826            err_msg.contains("Failed to connect") || err_msg.contains("database"),
827            "Expected connection error, got: {}",
828            err_msg
829        );
830    }
831
832    // SQL-007: poll_ready returns Ready (pool lazily initialized on first call)
833    #[test]
834    fn poll_ready_returns_ready_for_uninitialized_pool() {
835        let config = {
836            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
837            c.resolve_defaults();
838            c
839        };
840        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
841        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
842        let result = producer.poll_ready(&mut cx);
843        assert!(matches!(result, Poll::Ready(Ok(()))));
844    }
845
846    // SQL-007: poll_ready returns error when stopped flag is set
847    #[test]
848    fn poll_ready_returns_error_when_stopped() {
849        let config = {
850            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
851            c.resolve_defaults();
852            c
853        };
854        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
855        producer.stop();
856        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
857        let result = producer.poll_ready(&mut cx);
858        assert!(matches!(result, Poll::Ready(Err(_))));
859        let err_msg = match result {
860            Poll::Ready(Err(e)) => e.to_string(),
861            _ => unreachable!(),
862        };
863        assert!(err_msg.contains("SQL producer stopped"));
864    }
865
866    // SQL-007: poll_ready returns error when pool is closed
867    #[tokio::test]
868    async fn poll_ready_returns_error_when_pool_closed() {
869        let pool = sqlite_pool().await;
870        pool.close().await;
871
872        let config = {
873            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
874            c.resolve_defaults();
875            c
876        };
877        let pool_cell = Arc::new(OnceCell::new());
878        pool_cell.set(pool).unwrap();
879
880        let mut producer = SqlProducer::new(config, pool_cell);
881        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
882        let result = producer.poll_ready(&mut cx);
883        assert!(matches!(result, Poll::Ready(Err(_))));
884        let err_msg = match result {
885            Poll::Ready(Err(e)) => e.to_string(),
886            _ => unreachable!(),
887        };
888        assert!(err_msg.contains("SQL connection pool is closed"));
889    }
890
891    // SQL-007: poll_ready returns Ok for healthy initialized pool
892    #[tokio::test]
893    async fn poll_ready_returns_ok_for_healthy_pool() {
894        let pool = sqlite_pool().await;
895
896        let config = {
897            let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
898            c.resolve_defaults();
899            c
900        };
901        let pool_cell = Arc::new(OnceCell::new());
902        pool_cell.set(pool).unwrap();
903
904        let mut producer = SqlProducer::new(config, pool_cell);
905        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
906        let result = producer.poll_ready(&mut cx);
907        assert!(matches!(result, Poll::Ready(Ok(()))));
908    }
909
910    // SQL-004: use_placeholder=false skips template parsing
911    #[tokio::test]
912    async fn use_placeholder_false_executes_raw_sql() {
913        let pool = sqlite_pool().await;
914        seed_items_table(&pool).await;
915
916        let mut config = SqlEndpointConfig::from_uri(
917            "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
918        )
919        .unwrap();
920        config.resolve_defaults();
921        assert!(!config.use_placeholder);
922
923        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
924        // Pre-initialize the pool so we don't hit the pool init path
925        producer.pool.set(pool.clone()).unwrap();
926
927        let exchange = Exchange::new(Message::default());
928        let result = producer.call(exchange).await;
929        assert!(result.is_ok());
930        let exchange = result.unwrap();
931        // Should return results, not rowsAffected
932        assert!(matches!(exchange.input.body, Body::Json(_)));
933    }
934
935    // SQL-004: use_placeholder=true (default) processes placeholders normally
936    #[tokio::test]
937    async fn use_placeholder_true_processes_placeholders() {
938        let pool = sqlite_pool().await;
939        seed_items_table(&pool).await;
940
941        let mut config = SqlEndpointConfig::from_uri(
942            "sql:select id, name from items where id = #?db_url=sqlite::memory:",
943        )
944        .unwrap();
945        config.resolve_defaults();
946        assert!(config.use_placeholder);
947
948        let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
949        producer.pool.set(pool.clone()).unwrap();
950
951        let msg = Message::new(Body::Json(json!([1])));
952        let exchange = Exchange::new(msg);
953        let result = producer.call(exchange).await;
954        assert!(result.is_ok());
955    }
956}