1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
21use camel_component_api::retry_async;
22use camel_component_api::{
23 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
24};
25
26#[derive(Clone)]
27pub struct SqlProducer {
28 pub(crate) config: SqlEndpointConfig,
29 pub(crate) pool: Arc<OnceCell<AnyPool>>,
30 pub(crate) stopped: Arc<AtomicBool>,
31 #[allow(dead_code)]
34 pub(crate) runtime: Arc<dyn RuntimeObservability>,
35}
36
37impl SqlProducer {
38 pub fn new(
39 config: SqlEndpointConfig,
40 pool: Arc<OnceCell<AnyPool>>,
41 runtime: Arc<dyn RuntimeObservability>,
42 ) -> Self {
43 Self {
44 config,
45 pool,
46 stopped: Arc::new(AtomicBool::new(false)),
47 runtime,
48 }
49 }
50
51 pub fn stop(&self) {
52 self.stopped.store(true, Ordering::Relaxed);
53 if let Some(pool) = self.pool.get() {
55 let pool = pool.clone();
56 tokio::spawn(async move {
57 if tokio::time::timeout(Duration::from_secs(5), pool.close())
58 .await
59 .is_err()
60 {
61 tracing::warn!("SQL producer pool did not close within 5s");
62 }
63 });
64 }
65 }
66
67 pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
72 if let Some(query_value) = exchange.input.header(headers::QUERY)
74 && let Some(query_str) = query_value.as_str()
75 {
76 return query_str.to_string();
77 }
78
79 if config.use_message_body_for_sql
81 && let Some(body_text) = exchange.input.body.as_text()
82 {
83 return body_text.to_string();
84 }
85
86 config.query.clone()
88 }
89
90 pub async fn check_connection(&self) -> Result<(), CamelError> {
96 let pool = self.pool.get().ok_or_else(|| {
97 CamelError::ProcessorError("SQL connection pool not initialized".into())
98 })?;
99
100 debug!("Running health check: SELECT 1");
101 sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
102 warn!(error = %e, "SQL health check failed");
103 CamelError::ProcessorError(format!("SQL health check failed: {}", e))
104 })?;
105
106 debug!("SQL health check passed");
107 Ok(())
108 }
109}
110
111impl Service<Exchange> for SqlProducer {
112 type Response = Exchange;
113 type Error = CamelError;
114 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
115
116 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
117 if self.stopped.load(Ordering::Relaxed) {
118 return Poll::Ready(Err(CamelError::ProcessorError(
119 "SQL producer stopped".into(),
120 )));
121 }
122 if let Some(pool) = self.pool.get()
123 && pool.is_closed()
124 {
125 return Poll::Ready(Err(CamelError::ProcessorError(
126 "SQL connection pool is closed".into(),
127 )));
128 }
129 Poll::Ready(Ok(()))
130 }
131
132 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
133 let mut config = self.config.clone();
134 let pool_cell = Arc::clone(&self.pool);
135
136 Box::pin(async move {
137 let pool: &AnyPool = pool_cell
139 .get_or_try_init(|| async {
140 config.resolve_defaults();
142 config.resolve_file_query().await?;
144 let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
145
146 sqlx::any::install_default_drivers();
149
150 let max_conn = config.max_connections.ok_or_else(|| {
151 CamelError::Config("max_connections not resolved for SQL pool".into())
152 })?;
153 let min_conn = config.min_connections.ok_or_else(|| {
154 CamelError::Config("min_connections not resolved for SQL pool".into())
155 })?;
156 let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
157 CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
158 })?;
159 let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
160 CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
161 })?;
162
163 info!(
164 db_url = %redact_db_url(&config.db_url),
165 "SQL producer pool initializing"
166 );
167 let retry_policy = &config.retry;
168 retry_async::<_, _, _, _, sqlx::Error>(
169 retry_policy,
170 Some("sql-producer"),
171 || {
172 PoolOptions::new()
173 .max_connections(max_conn)
174 .min_connections(min_conn)
175 .idle_timeout(Duration::from_secs(idle_timeout))
176 .max_lifetime(Duration::from_secs(max_lifetime))
177 .connect(&db_url)
178 },
179 is_retryable_sqlx_error,
180 )
181 .await
182 .map_err(|e| {
183 error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database"); CamelError::EndpointCreationFailed(format!(
186 "Failed to connect to database: {}",
187 e
188 ))
189 })
190 })
191 .await
192 .map_err(|e: CamelError| e.clone())?;
193
194 let query_str = Self::resolve_query_source(&exchange, &config);
196
197 if config.transaction_mode == crate::config::TransactionMode::Managed {
199 warn!("transactionManager not yet implemented; using Auto mode");
200 }
201
202 debug!(
203 query = %query_str,
204 "executing SQL query"
205 );
206
207 if config.batch {
209 execute_batch(pool, &config, &mut exchange).await?;
211 } else if config.use_placeholder {
212 let template = parse_query_template(&query_str, config.placeholder)?;
214 let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
215
216 if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
218 if let Some(arr) = params_value.as_array() {
219 if arr.len() != prepared.bindings.len() {
220 warn!(
221 expected = prepared.bindings.len(),
222 got = arr.len(),
223 header = headers::PARAMETERS,
224 "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
225 prepared.bindings.len(),
226 arr.len()
227 );
228 }
229 debug!(
230 "Overriding bindings from {} header with {} parameters",
231 headers::PARAMETERS,
232 arr.len()
233 );
234 prepared.bindings = arr.clone();
235 } else {
236 warn!(
237 header = headers::PARAMETERS,
238 "Header is present but not a JSON array — ignoring parameter override"
239 );
240 }
241 }
242
243 debug!(
244 "Executing prepared SQL ({} bindings)",
245 prepared.bindings.len()
246 );
247
248 if is_select_query(&prepared.sql) {
249 execute_select(pool, &prepared, &config, &mut exchange).await?;
250 } else {
251 execute_modify(pool, &prepared, &config, &mut exchange).await?;
252 }
253 } else {
254 debug!("Executing raw SQL (placeholder processing disabled)");
256 let prepared = PreparedQuery {
257 sql: query_str,
258 bindings: vec![],
259 };
260
261 if is_select_query(&prepared.sql) {
262 execute_select(pool, &prepared, &config, &mut exchange).await?;
263 } else {
264 execute_modify(pool, &prepared, &config, &mut exchange).await?;
265 }
266 }
267
268 Ok(exchange)
269 })
270 }
271}
272
273async fn execute_select(
275 pool: &AnyPool,
276 prepared: &PreparedQuery,
277 config: &SqlEndpointConfig,
278 exchange: &mut Exchange,
279) -> Result<(), CamelError> {
280 match config.output_type {
281 SqlOutputType::SelectOne => {
282 let mut query = sqlx::query(&prepared.sql);
284 query = bind_json_values(query, &prepared.bindings);
285
286 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
287 warn!(error = %e, "SQL query failed");
288 CamelError::ProcessorError(format!("Query execution failed: {}", e))
289 })?;
290
291 let count = rows.len();
292 debug!(rows = count, "SQL query completed");
293 let json_rows: Vec<serde_json::Value> = rows
294 .iter()
295 .map(row_to_json)
296 .collect::<Result<Vec<_>, _>>()?;
297
298 if let Some(first_row) = json_rows.into_iter().next() {
299 exchange.input.body = Body::Json(first_row);
300 } else {
301 exchange.input.body = Body::Empty;
302 }
303 debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
304 exchange
305 .input
306 .set_header(headers::ROW_COUNT, serde_json::json!(count));
307 }
308 SqlOutputType::SelectList => {
309 let mut query = sqlx::query(&prepared.sql);
311 query = bind_json_values(query, &prepared.bindings);
312
313 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
314 warn!(error = %e, "SQL query failed");
315 CamelError::ProcessorError(format!("Query execution failed: {}", e))
316 })?;
317
318 let count = rows.len();
319 debug!(rows = count, "SQL query completed");
320 let json_rows: Vec<serde_json::Value> = rows
321 .iter()
322 .map(row_to_json)
323 .collect::<Result<Vec<_>, _>>()?;
324
325 exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
326 debug!("SelectList returned {} rows", count);
327 exchange
328 .input
329 .set_header(headers::ROW_COUNT, serde_json::json!(count));
330 }
331 SqlOutputType::StreamList => {
332 use futures::TryStreamExt;
334
335 let pool_clone = pool.clone();
336 let sql_str = prepared.sql.clone();
337 let bindings = prepared.bindings.clone();
338
339 let byte_stream = async_stream::try_stream! {
341 let mut q = sqlx::query(&sql_str);
342 q = bind_json_values(q, &bindings);
343 let mut rows = q.fetch(&pool_clone);
344 while let Some(row) = rows.try_next().await.map_err(|e| {
345 CamelError::ProcessorError(format!("Query execution failed: {}", e))
346 })? {
347 let json_val = row_to_json(&row).map_err(|e| {
348 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
349 })?;
350 let mut bytes = serde_json::to_vec(&json_val)
351 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
352 bytes.push(b'\n');
353 yield Bytes::from(bytes);
354 }
355 };
356
357 exchange.input.body = Body::Stream(StreamBody {
358 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
359 metadata: StreamMetadata {
360 content_type: Some("application/x-ndjson".to_string()),
361 size_hint: None,
362 origin: None,
363 },
364 });
365 debug!("StreamList: created lazy stream (rows fetched on demand)");
366 }
368 }
369
370 Ok(())
371}
372
373async fn execute_modify(
375 pool: &AnyPool,
376 prepared: &PreparedQuery,
377 config: &SqlEndpointConfig,
378 exchange: &mut Exchange,
379) -> Result<(), CamelError> {
380 let mut query = sqlx::query(&prepared.sql);
381 query = bind_json_values(query, &prepared.bindings);
382
383 let result = query.execute(pool).await.map_err(|e| {
384 warn!(error = %e, "SQL query failed");
385 CamelError::ProcessorError(format!("Query execution failed: {}", e))
386 })?;
387
388 let rows_affected = result.rows_affected();
389
390 if let Some(expected) = config.expected_update_count
392 && rows_affected as i64 != expected
393 {
394 warn!(expected, actual = rows_affected, "Row count mismatch");
395 return Err(CamelError::ProcessorError(format!(
396 "Expected {} rows affected, got {}",
397 expected, rows_affected
398 )));
399 }
400
401 exchange
402 .input
403 .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
404
405 if config.noop {
406 } else {
408 exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
409 }
410
411 debug!(rows = rows_affected, "SQL modify query completed");
412
413 Ok(())
414}
415
416async fn execute_batch(
418 pool: &AnyPool,
419 config: &SqlEndpointConfig,
420 exchange: &mut Exchange,
421) -> Result<(), CamelError> {
422 let body_json = match &exchange.input.body {
424 Body::Json(val) => val,
425 _ => {
426 return Err(CamelError::ProcessorError(
427 "Batch mode requires body to be a JSON array of arrays".to_string(),
428 ));
429 }
430 };
431
432 let batch_data = body_json
433 .as_array()
434 .ok_or_else(|| {
435 CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
436 })?
437 .clone();
438
439 let template = parse_query_template(&config.query, config.placeholder)?;
441
442 let mut tx = pool.begin().await.map_err(|e| {
444 warn!(error = %e, "Failed to begin transaction");
447 CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
448 })?;
449
450 let mut total_rows_affected: u64 = 0;
451
452 for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
453 params_array.as_array().ok_or_else(|| {
455 CamelError::ProcessorError(format!(
456 "Batch item at index {} must be a JSON array of parameters",
457 batch_idx
458 ))
459 })?;
460
461 let temp_msg = Message::new(Body::Json(params_array.clone()));
463 let temp_exchange = Exchange::new(temp_msg);
464
465 let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
467
468 let mut query = sqlx::query(&prepared.sql);
470 query = bind_json_values(query, &prepared.bindings);
471
472 let result = query.execute(&mut *tx).await.map_err(|e| {
473 warn!("Batch query execution failed at index {}: {}", batch_idx, e);
476 CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
477 })?;
478
479 if let Some(expected) = config.expected_update_count
481 && result.rows_affected() as i64 != expected
482 {
483 warn!(
486 "Batch item {}: expected {} rows affected, got {}",
487 batch_idx,
488 expected,
489 result.rows_affected()
490 );
491 return Err(CamelError::ProcessorError(format!(
492 "Batch item {}: expected {} rows affected, got {}",
493 batch_idx,
494 expected,
495 result.rows_affected()
496 )));
497 }
498
499 total_rows_affected += result.rows_affected();
500 }
501
502 tx.commit().await.map_err(|e| {
504 warn!(error = %e, "Failed to commit transaction");
507 CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
508 })?;
509
510 exchange.input.set_header(
511 headers::UPDATE_COUNT,
512 serde_json::json!(total_rows_affected),
513 );
514
515 debug!(
516 "Batch execution completed, total rows affected: {}",
517 total_rows_affected
518 );
519
520 Ok(())
521}
522
523#[cfg(test)]
524mod tests {
525 use super::*;
526 use camel_component_api::test_support::PanicRuntimeObservability;
527 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
528 std::sync::Arc::new(PanicRuntimeObservability)
529 }
530 use camel_component_api::Message;
531 use camel_component_api::UriConfig;
532 use sqlx::any::AnyPoolOptions;
533 use std::sync::Arc;
534 use tokio::sync::OnceCell;
535
536 async fn sqlite_pool() -> AnyPool {
537 sqlx::any::install_default_drivers();
538 AnyPoolOptions::new()
539 .max_connections(1)
540 .connect("sqlite::memory:")
541 .await
542 .expect("sqlite pool")
543 }
544
545 async fn seed_items_table(pool: &AnyPool) {
546 sqlx::query(
547 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
548 )
549 .execute(pool)
550 .await
551 .expect("create table");
552 sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
553 .execute(pool)
554 .await
555 .expect("seed rows");
556 }
557
558 fn config() -> SqlEndpointConfig {
559 let mut c =
560 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
561 c.resolve_defaults();
562 c
563 }
564
565 #[test]
566 fn producer_clone_shares_pool() {
567 let p1 = SqlProducer::new(config(), Arc::new(OnceCell::new()), test_rt());
568 let p2 = p1.clone();
569 assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
570 assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
571 }
572
573 #[test]
574 fn resolve_query_from_config() {
575 let config = config();
576 let ex = Exchange::new(Message::default());
577 let q = SqlProducer::resolve_query_source(&ex, &config);
578 assert_eq!(q, "select 1");
579 }
580
581 #[test]
582 fn resolve_query_from_header() {
583 let config = config();
584 let mut msg = Message::default();
585 msg.set_header(headers::QUERY, serde_json::json!("select 2"));
586 let ex = Exchange::new(msg);
587 let q = SqlProducer::resolve_query_source(&ex, &config);
588 assert_eq!(q, "select 2");
589 }
590
591 #[test]
592 fn resolve_query_from_body() {
593 let mut config = config();
594 config.use_message_body_for_sql = true;
595 let msg = Message::new(Body::Text("select 3".to_string()));
596 let ex = Exchange::new(msg);
597 let q = SqlProducer::resolve_query_source(&ex, &config);
598 assert_eq!(q, "select 3");
599 }
600
601 #[test]
602 fn resolve_query_header_priority_over_body() {
603 let mut config = config();
604 config.use_message_body_for_sql = true;
605 let mut msg = Message::new(Body::Text("select from body".to_string()));
606 msg.set_header(headers::QUERY, serde_json::json!("select from header"));
607 let ex = Exchange::new(msg);
608 let q = SqlProducer::resolve_query_source(&ex, &config);
609 assert_eq!(q, "select from header");
610 }
611
612 #[test]
613 fn resolve_query_body_priority_over_config() {
614 let mut config = config();
615 config.use_message_body_for_sql = true;
616 let msg = Message::new(Body::Text("select from body".to_string()));
617 let ex = Exchange::new(msg);
618 let q = SqlProducer::resolve_query_source(&ex, &config);
619 assert_eq!(q, "select from body");
620 }
621
622 #[test]
623 fn bind_json_null() {
624 let query = sqlx::query("SELECT ?");
625 let values = vec![serde_json::Value::Null];
626 let _bound = bind_json_values(query, &values);
627 }
629
630 #[test]
631 fn bind_json_bool() {
632 let query = sqlx::query("SELECT ?");
633 let values = vec![serde_json::Value::Bool(true)];
634 let _bound = bind_json_values(query, &values);
635 }
636
637 #[test]
638 fn bind_json_number_i64() {
639 let query = sqlx::query("SELECT ?");
640 let values = vec![serde_json::json!(42)];
641 let _bound = bind_json_values(query, &values);
642 }
643
644 #[test]
645 fn bind_json_number_f64() {
646 let query = sqlx::query("SELECT ?");
647 let values = vec![serde_json::json!(std::f64::consts::PI)];
648 let _bound = bind_json_values(query, &values);
649 }
650
651 #[test]
652 fn bind_json_string() {
653 let query = sqlx::query("SELECT ?");
654 let values = vec![serde_json::json!("hello world")];
655 let _bound = bind_json_values(query, &values);
656 }
657
658 #[test]
659 fn bind_json_array() {
660 let query = sqlx::query("SELECT ?");
661 let values = vec![serde_json::json!([1, 2, 3])];
662 let _bound = bind_json_values(query, &values);
663 }
664
665 #[test]
666 fn bind_json_object() {
667 let query = sqlx::query("SELECT ?");
668 let values = vec![serde_json::json!({"key": "value"})];
669 let _bound = bind_json_values(query, &values);
670 }
671
672 #[test]
673 fn bind_multiple_values() {
674 let query = sqlx::query("SELECT ?, ?, ?");
675 let values = vec![
676 serde_json::json!(1),
677 serde_json::json!("test"),
678 serde_json::Value::Null,
679 ];
680 let _bound = bind_json_values(query, &values);
681 }
682
683 #[test]
685 fn expected_update_count_validation() {
686 let config = SqlEndpointConfig::from_uri(
688 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
689 )
690 .unwrap();
691 assert_eq!(config.expected_update_count, Some(5));
692
693 let config_default = self::config();
695 assert_eq!(config_default.expected_update_count, None);
696
697 let config_neg = SqlEndpointConfig::from_uri(
699 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
700 )
701 .unwrap();
702 assert_eq!(config_neg.expected_update_count, Some(-1));
703 }
704
705 #[test]
707 fn parameters_header_override_logic() {
708 let mut prepared = PreparedQuery {
710 sql: "SELECT * FROM t WHERE id = $1".to_string(),
711 bindings: vec![serde_json::json!(42)],
712 };
713
714 let header_params = serde_json::json!([99, "extra"]);
716 if let Some(arr) = header_params.as_array() {
717 prepared.bindings = arr.clone();
718 }
719
720 assert_eq!(prepared.bindings.len(), 2);
722 assert_eq!(prepared.bindings[0], serde_json::json!(99));
723 assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
724
725 let mut prepared2 = PreparedQuery {
727 sql: "SELECT * FROM t WHERE id = $1".to_string(),
728 bindings: vec![serde_json::json!(42)],
729 };
730 let header_non_array = serde_json::json!({"not": "an array"});
731 if let Some(arr) = header_non_array.as_array() {
732 prepared2.bindings = arr.clone();
733 }
734 assert_eq!(prepared2.bindings.len(), 1);
736 assert_eq!(prepared2.bindings[0], serde_json::json!(42));
737 }
738
739 #[tokio::test]
740 async fn execute_select_one_sets_body_and_row_count() {
741 let pool = sqlite_pool().await;
742 seed_items_table(&pool).await;
743
744 let mut config = SqlEndpointConfig::from_uri(
745 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
746 )
747 .unwrap();
748 config.resolve_defaults();
749
750 let prepared = PreparedQuery {
751 sql: "select id, name from items order by id".to_string(),
752 bindings: vec![],
753 };
754 let mut exchange = Exchange::new(Message::default());
755
756 execute_select(&pool, &prepared, &config, &mut exchange)
757 .await
758 .expect("select one");
759
760 assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
761 assert_eq!(
762 exchange.input.body,
763 Body::Json(json!({"id": 1, "name": "a"}))
764 );
765 }
766
767 #[tokio::test]
768 async fn execute_stream_list_materializes_ndjson() {
769 let pool = sqlite_pool().await;
770 seed_items_table(&pool).await;
771
772 let mut config = SqlEndpointConfig::from_uri(
773 "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
774 )
775 .unwrap();
776 config.resolve_defaults();
777
778 let prepared = PreparedQuery {
779 sql: "select id from items order by id".to_string(),
780 bindings: vec![],
781 };
782 let mut exchange = Exchange::new(Message::default());
783
784 execute_select(&pool, &prepared, &config, &mut exchange)
785 .await
786 .expect("stream list");
787
788 let bytes = exchange
789 .input
790 .body
791 .clone()
792 .into_bytes(1024)
793 .await
794 .expect("stream bytes");
795 let text = String::from_utf8(bytes.to_vec()).expect("utf8");
796 assert!(text.contains("{\"id\":1}"));
797 assert!(text.contains("{\"id\":2}"));
798 assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
799 }
800
801 #[tokio::test]
802 async fn execute_modify_expected_update_count_mismatch_returns_error() {
803 let pool = sqlite_pool().await;
804 seed_items_table(&pool).await;
805
806 let mut config = SqlEndpointConfig::from_uri(
807 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
808 )
809 .unwrap();
810 config.resolve_defaults();
811
812 let prepared = PreparedQuery {
813 sql: "update items set done=1 where id = $1".to_string(),
814 bindings: vec![json!(1)],
815 };
816 let mut exchange = Exchange::new(Message::default());
817
818 let err = execute_modify(&pool, &prepared, &config, &mut exchange)
819 .await
820 .expect_err("must fail due expected row count mismatch");
821 assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
822 }
823
824 #[tokio::test]
825 async fn execute_batch_rollback_when_any_item_fails_expected_count() {
826 let pool = sqlite_pool().await;
827 seed_items_table(&pool).await;
828
829 let mut config = SqlEndpointConfig::from_uri(
830 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
831 )
832 .unwrap();
833 config.resolve_defaults();
834
835 let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
836
837 let err = execute_batch(&pool, &config, &mut exchange)
838 .await
839 .expect_err("second batch item should fail expectedUpdateCount");
840 assert!(
841 err.to_string()
842 .contains("Batch item 1: expected 1 rows affected, got 0")
843 );
844
845 let row = sqlx::query("select done from items where id = 1")
846 .fetch_one(&pool)
847 .await
848 .expect("query row");
849 let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
850 assert_eq!(done, 0, "transaction must rollback first update");
851 }
852
853 #[tokio::test]
859 async fn producer_no_panic_without_prior_resolve_defaults() {
860 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
862 assert!(config.max_connections.is_none());
863
864 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
865 let exchange = Exchange::new(Message::default());
866
867 let result = producer.call(exchange).await;
869 assert!(
870 result.is_ok(),
871 "Producer should initialize pool without panic, got: {:?}",
872 result
873 );
874 }
875
876 #[tokio::test]
878 async fn producer_pool_init_returns_config_error_for_invalid_db() {
879 let mut config = SqlEndpointConfig::from_uri(
883 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
884 )
885 .unwrap();
886 config.max_connections = Some(1);
888 config.min_connections = Some(0);
889 config.idle_timeout_secs = Some(300);
890 config.max_lifetime_secs = Some(1800);
891
892 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
893 let exchange = Exchange::new(Message::default());
894
895 let result = producer.call(exchange).await;
896 assert!(result.is_err());
897 let err_msg = result.unwrap_err().to_string();
899 assert!(
900 err_msg.contains("Failed to connect") || err_msg.contains("database"),
901 "Expected connection error, got: {}",
902 err_msg
903 );
904 }
905
906 #[test]
908 fn poll_ready_returns_ready_for_uninitialized_pool() {
909 let config = {
910 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
911 c.resolve_defaults();
912 c
913 };
914 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
915 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
916 let result = producer.poll_ready(&mut cx);
917 assert!(matches!(result, Poll::Ready(Ok(()))));
918 }
919
920 #[test]
922 fn poll_ready_returns_error_when_stopped() {
923 let config = {
924 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
925 c.resolve_defaults();
926 c
927 };
928 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
929 producer.stop();
930 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
931 let result = producer.poll_ready(&mut cx);
932 assert!(matches!(result, Poll::Ready(Err(_))));
933 let err_msg = match result {
934 Poll::Ready(Err(e)) => e.to_string(),
935 _ => unreachable!(),
936 };
937 assert!(err_msg.contains("SQL producer stopped"));
938 }
939
940 #[tokio::test]
942 async fn poll_ready_returns_error_when_pool_closed() {
943 let pool = sqlite_pool().await;
944 pool.close().await;
945
946 let config = {
947 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
948 c.resolve_defaults();
949 c
950 };
951 let pool_cell = Arc::new(OnceCell::new());
952 pool_cell.set(pool).unwrap();
953
954 let mut producer = SqlProducer::new(config, pool_cell, test_rt());
955 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
956 let result = producer.poll_ready(&mut cx);
957 assert!(matches!(result, Poll::Ready(Err(_))));
958 let err_msg = match result {
959 Poll::Ready(Err(e)) => e.to_string(),
960 _ => unreachable!(),
961 };
962 assert!(err_msg.contains("SQL connection pool is closed"));
963 }
964
965 #[tokio::test]
967 async fn poll_ready_returns_ok_for_healthy_pool() {
968 let pool = sqlite_pool().await;
969
970 let config = {
971 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
972 c.resolve_defaults();
973 c
974 };
975 let pool_cell = Arc::new(OnceCell::new());
976 pool_cell.set(pool).unwrap();
977
978 let mut producer = SqlProducer::new(config, pool_cell, test_rt());
979 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
980 let result = producer.poll_ready(&mut cx);
981 assert!(matches!(result, Poll::Ready(Ok(()))));
982 }
983
984 #[tokio::test]
986 async fn test_sql_stop_closes_pool() {
987 let pool = sqlite_pool().await;
988
989 let config = {
990 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
991 c.resolve_defaults();
992 c
993 };
994 let pool_cell = Arc::new(OnceCell::new());
995 pool_cell.set(pool.clone()).unwrap();
996
997 let producer = SqlProducer::new(config, pool_cell.clone(), test_rt());
998 assert!(!pool.is_closed(), "Pool should be open before stop");
999
1000 producer.stop();
1001
1002 tokio::time::sleep(Duration::from_millis(100)).await;
1004
1005 assert!(
1006 pool.is_closed(),
1007 "Pool should be closed after producer.stop()"
1008 );
1009
1010 let mut producer2 = SqlProducer::new(
1012 {
1013 let mut c =
1014 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1015 c.resolve_defaults();
1016 c
1017 },
1018 pool_cell.clone(),
1019 test_rt(),
1020 );
1021 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1022 let result = producer2.poll_ready(&mut cx);
1023 assert!(
1024 matches!(result, Poll::Ready(Err(_))),
1025 "poll_ready should fail after pool closed"
1026 );
1027 }
1028
1029 #[tokio::test]
1031 async fn use_placeholder_false_executes_raw_sql() {
1032 let pool = sqlite_pool().await;
1033 seed_items_table(&pool).await;
1034
1035 let mut config = SqlEndpointConfig::from_uri(
1036 "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1037 )
1038 .unwrap();
1039 config.resolve_defaults();
1040 assert!(!config.use_placeholder);
1041
1042 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
1043 producer.pool.set(pool.clone()).unwrap();
1045
1046 let exchange = Exchange::new(Message::default());
1047 let result = producer.call(exchange).await;
1048 assert!(result.is_ok());
1049 let exchange = result.unwrap();
1050 assert!(matches!(exchange.input.body, Body::Json(_)));
1052 }
1053
1054 #[tokio::test]
1056 async fn use_placeholder_true_processes_placeholders() {
1057 let pool = sqlite_pool().await;
1058 seed_items_table(&pool).await;
1059
1060 let mut config = SqlEndpointConfig::from_uri(
1061 "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1062 )
1063 .unwrap();
1064 config.resolve_defaults();
1065 assert!(config.use_placeholder);
1066
1067 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()), test_rt());
1068 producer.pool.set(pool.clone()).unwrap();
1069
1070 let msg = Message::new(Body::Json(json!([1])));
1071 let exchange = Exchange::new(msg);
1072 let result = producer.call(exchange).await;
1073 assert!(result.is_ok());
1074 }
1075
1076 #[tokio::test]
1080 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1081 use camel_component_api::NetworkRetryPolicy;
1082 use std::sync::Arc;
1083 use std::sync::atomic::{AtomicU32, Ordering};
1084
1085 let policy = NetworkRetryPolicy {
1086 max_attempts: 3,
1087 initial_delay: Duration::from_millis(1),
1088 max_delay: Duration::from_millis(1),
1089 multiplier: 1.0,
1090 ..NetworkRetryPolicy::default()
1091 };
1092
1093 let calls = Arc::new(AtomicU32::new(0));
1094 let calls_clone = Arc::clone(&calls);
1095
1096 let mut attempt: u32 = 0;
1097 let _result: Result<(), ()> = loop {
1098 attempt += 1;
1099 calls_clone.fetch_add(1, Ordering::SeqCst);
1100 let op_result: Result<(), ()> = Err(());
1101 match op_result {
1102 Ok(v) => break Ok(v),
1103 Err(_) if policy.should_retry(attempt) => {
1104 let delay = policy.delay_for(attempt - 1);
1105 tokio::time::sleep(delay).await;
1106 continue;
1107 }
1108 Err(_) => break Err(()),
1109 }
1110 };
1111
1112 assert_eq!(
1113 calls.load(Ordering::SeqCst),
1114 3,
1115 "max_attempts=3 must yield exactly 3 invocations"
1116 );
1117 }
1118}