1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, row_to_json};
21use camel_component_api::{Body, CamelError, Exchange, Message, StreamBody, StreamMetadata};
22
23#[derive(Clone)]
24pub struct SqlProducer {
25 pub(crate) config: SqlEndpointConfig,
26 pub(crate) pool: Arc<OnceCell<AnyPool>>,
27 pub(crate) stopped: Arc<AtomicBool>,
28}
29
30impl SqlProducer {
31 pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
32 Self {
33 config,
34 pool,
35 stopped: Arc::new(AtomicBool::new(false)),
36 }
37 }
38
39 pub fn stop(&self) {
40 self.stopped.store(true, Ordering::Relaxed);
41 }
42
43 pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
48 if let Some(query_value) = exchange.input.header(headers::QUERY)
50 && let Some(query_str) = query_value.as_str()
51 {
52 return query_str.to_string();
53 }
54
55 if config.use_message_body_for_sql
57 && let Some(body_text) = exchange.input.body.as_text()
58 {
59 return body_text.to_string();
60 }
61
62 config.query.clone()
64 }
65}
66
67impl Service<Exchange> for SqlProducer {
68 type Response = Exchange;
69 type Error = CamelError;
70 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 if self.stopped.load(Ordering::Relaxed) {
74 return Poll::Ready(Err(CamelError::ProcessorError(
75 "SQL producer stopped".into(),
76 )));
77 }
78 if let Some(pool) = self.pool.get()
79 && pool.is_closed()
80 {
81 return Poll::Ready(Err(CamelError::ProcessorError(
82 "SQL connection pool is closed".into(),
83 )));
84 }
85 Poll::Ready(Ok(()))
86 }
87
88 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
89 let mut config = self.config.clone();
90 let pool_cell = Arc::clone(&self.pool);
91
92 Box::pin(async move {
93 let pool: &AnyPool = pool_cell
95 .get_or_try_init(|| async {
96 config.resolve_defaults();
98 let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
99
100 sqlx::any::install_default_drivers();
103
104 let max_conn = config.max_connections.ok_or_else(|| {
105 CamelError::Config("max_connections not resolved for SQL pool".into())
106 })?;
107 let min_conn = config.min_connections.ok_or_else(|| {
108 CamelError::Config("min_connections not resolved for SQL pool".into())
109 })?;
110 let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
111 CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
112 })?;
113 let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
114 CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
115 })?;
116
117 let opts: PoolOptions<sqlx::Any> = PoolOptions::new()
118 .max_connections(max_conn)
119 .min_connections(min_conn)
120 .idle_timeout(Duration::from_secs(idle_timeout))
121 .max_lifetime(Duration::from_secs(max_lifetime));
122
123 info!(
124 db_url = %redact_db_url(&config.db_url),
125 "SQL producer pool initializing"
126 );
127 opts.connect(&db_url).await.map_err(|e| {
128 error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
129 CamelError::EndpointCreationFailed(format!(
130 "Failed to connect to database: {}",
131 e
132 ))
133 })
134 })
135 .await
136 .map_err(|e: CamelError| {
137 error!("SQL producer pool initialization failed: {}", e);
138 e.clone()
139 })?;
140
141 let query_str = Self::resolve_query_source(&exchange, &config);
143
144 debug!(
145 "Executing SQL query (config query length: {})",
146 query_str.len()
147 );
148
149 if config.batch {
151 execute_batch(pool, &config, &mut exchange).await?;
153 } else if config.use_placeholder {
154 let template = parse_query_template(&query_str, config.placeholder)?;
156 let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
157
158 if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
160 if let Some(arr) = params_value.as_array() {
161 if arr.len() != prepared.bindings.len() {
162 warn!(
163 expected = prepared.bindings.len(),
164 got = arr.len(),
165 header = headers::PARAMETERS,
166 "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
167 prepared.bindings.len(),
168 arr.len()
169 );
170 }
171 debug!(
172 "Overriding bindings from {} header with {} parameters",
173 headers::PARAMETERS,
174 arr.len()
175 );
176 prepared.bindings = arr.clone();
177 } else {
178 warn!(
179 header = headers::PARAMETERS,
180 "Header is present but not a JSON array — ignoring parameter override"
181 );
182 }
183 }
184
185 debug!(
186 "Executing prepared SQL ({} bindings)",
187 prepared.bindings.len()
188 );
189
190 if is_select_query(&prepared.sql) {
191 execute_select(pool, &prepared, &config, &mut exchange).await?;
192 } else {
193 execute_modify(pool, &prepared, &config, &mut exchange).await?;
194 }
195 } else {
196 debug!("Executing raw SQL (placeholder processing disabled)");
198 let prepared = PreparedQuery {
199 sql: query_str,
200 bindings: vec![],
201 };
202
203 if is_select_query(&prepared.sql) {
204 execute_select(pool, &prepared, &config, &mut exchange).await?;
205 } else {
206 execute_modify(pool, &prepared, &config, &mut exchange).await?;
207 }
208 }
209
210 Ok(exchange)
211 })
212 }
213}
214
215async fn execute_select(
217 pool: &AnyPool,
218 prepared: &PreparedQuery,
219 config: &SqlEndpointConfig,
220 exchange: &mut Exchange,
221) -> Result<(), CamelError> {
222 match config.output_type {
223 SqlOutputType::SelectOne => {
224 let mut query = sqlx::query(&prepared.sql);
226 query = bind_json_values(query, &prepared.bindings);
227
228 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
229 error!("Query execution failed: {}", e);
230 CamelError::ProcessorError(format!("Query execution failed: {}", e))
231 })?;
232
233 let count = rows.len();
234 let json_rows: Vec<serde_json::Value> = rows
235 .iter()
236 .map(row_to_json)
237 .collect::<Result<Vec<_>, _>>()?;
238
239 if let Some(first_row) = json_rows.into_iter().next() {
240 exchange.input.body = Body::Json(first_row);
241 } else {
242 exchange.input.body = Body::Empty;
243 }
244 debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
245 exchange
246 .input
247 .set_header(headers::ROW_COUNT, serde_json::json!(count));
248 }
249 SqlOutputType::SelectList => {
250 let mut query = sqlx::query(&prepared.sql);
252 query = bind_json_values(query, &prepared.bindings);
253
254 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
255 error!("Query execution failed: {}", e);
256 CamelError::ProcessorError(format!("Query execution failed: {}", e))
257 })?;
258
259 let count = rows.len();
260 let json_rows: Vec<serde_json::Value> = rows
261 .iter()
262 .map(row_to_json)
263 .collect::<Result<Vec<_>, _>>()?;
264
265 exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
266 debug!("SelectList returned {} rows", count);
267 exchange
268 .input
269 .set_header(headers::ROW_COUNT, serde_json::json!(count));
270 }
271 SqlOutputType::StreamList => {
272 use futures::TryStreamExt;
274
275 let pool_clone = pool.clone();
276 let sql_str = prepared.sql.clone();
277 let bindings = prepared.bindings.clone();
278
279 let byte_stream = async_stream::try_stream! {
281 let mut q = sqlx::query(&sql_str);
282 q = bind_json_values(q, &bindings);
283 let mut rows = q.fetch(&pool_clone);
284 while let Some(row) = rows.try_next().await.map_err(|e| {
285 CamelError::ProcessorError(format!("Query execution failed: {}", e))
286 })? {
287 let json_val = row_to_json(&row).map_err(|e| {
288 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
289 })?;
290 let mut bytes = serde_json::to_vec(&json_val)
291 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
292 bytes.push(b'\n');
293 yield Bytes::from(bytes);
294 }
295 };
296
297 exchange.input.body = Body::Stream(StreamBody {
298 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
299 metadata: StreamMetadata {
300 content_type: Some("application/x-ndjson".to_string()),
301 size_hint: None,
302 origin: None,
303 },
304 });
305 debug!("StreamList: created lazy stream (rows fetched on demand)");
306 }
308 }
309
310 Ok(())
311}
312
313async fn execute_modify(
315 pool: &AnyPool,
316 prepared: &PreparedQuery,
317 config: &SqlEndpointConfig,
318 exchange: &mut Exchange,
319) -> Result<(), CamelError> {
320 let mut query = sqlx::query(&prepared.sql);
321 query = bind_json_values(query, &prepared.bindings);
322
323 let result = query.execute(pool).await.map_err(|e| {
324 error!("Query execution failed: {}", e);
325 CamelError::ProcessorError(format!("Query execution failed: {}", e))
326 })?;
327
328 let rows_affected = result.rows_affected();
329
330 if let Some(expected) = config.expected_update_count
332 && rows_affected as i64 != expected
333 {
334 error!("Expected {} rows affected, got {}", expected, rows_affected);
335 return Err(CamelError::ProcessorError(format!(
336 "Expected {} rows affected, got {}",
337 expected, rows_affected
338 )));
339 }
340
341 exchange
342 .input
343 .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
344
345 if config.noop {
346 } else {
348 exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
349 }
350
351 debug!("Modify query affected {} rows", rows_affected);
352
353 Ok(())
354}
355
356async fn execute_batch(
358 pool: &AnyPool,
359 config: &SqlEndpointConfig,
360 exchange: &mut Exchange,
361) -> Result<(), CamelError> {
362 let body_json = match &exchange.input.body {
364 Body::Json(val) => val,
365 _ => {
366 return Err(CamelError::ProcessorError(
367 "Batch mode requires body to be a JSON array of arrays".to_string(),
368 ));
369 }
370 };
371
372 let batch_data = body_json
373 .as_array()
374 .ok_or_else(|| {
375 CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
376 })?
377 .clone();
378
379 let template = parse_query_template(&config.query, config.placeholder)?;
381
382 let mut tx = pool.begin().await.map_err(|e| {
384 error!("Failed to begin transaction: {}", e);
385 CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
386 })?;
387
388 let mut total_rows_affected: u64 = 0;
389
390 for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
391 params_array.as_array().ok_or_else(|| {
393 CamelError::ProcessorError(format!(
394 "Batch item at index {} must be a JSON array of parameters",
395 batch_idx
396 ))
397 })?;
398
399 let temp_msg = Message::new(Body::Json(params_array.clone()));
401 let temp_exchange = Exchange::new(temp_msg);
402
403 let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
405
406 let mut query = sqlx::query(&prepared.sql);
408 query = bind_json_values(query, &prepared.bindings);
409
410 let result = query.execute(&mut *tx).await.map_err(|e| {
411 error!("Batch query execution failed at index {}: {}", batch_idx, e);
412 CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
413 })?;
414
415 if let Some(expected) = config.expected_update_count
417 && result.rows_affected() as i64 != expected
418 {
419 error!(
420 "Batch item {}: expected {} rows affected, got {}",
421 batch_idx,
422 expected,
423 result.rows_affected()
424 );
425 return Err(CamelError::ProcessorError(format!(
426 "Batch item {}: expected {} rows affected, got {}",
427 batch_idx,
428 expected,
429 result.rows_affected()
430 )));
431 }
432
433 total_rows_affected += result.rows_affected();
434 }
435
436 tx.commit().await.map_err(|e| {
438 error!("Failed to commit transaction: {}", e);
439 CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
440 })?;
441
442 exchange.input.set_header(
443 headers::UPDATE_COUNT,
444 serde_json::json!(total_rows_affected),
445 );
446
447 debug!(
448 "Batch execution completed, total rows affected: {}",
449 total_rows_affected
450 );
451
452 Ok(())
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use camel_component_api::Message;
459 use camel_component_api::UriConfig;
460 use sqlx::any::AnyPoolOptions;
461 use std::sync::Arc;
462 use tokio::sync::OnceCell;
463
464 async fn sqlite_pool() -> AnyPool {
465 sqlx::any::install_default_drivers();
466 AnyPoolOptions::new()
467 .max_connections(1)
468 .connect("sqlite::memory:")
469 .await
470 .expect("sqlite pool")
471 }
472
473 async fn seed_items_table(pool: &AnyPool) {
474 sqlx::query(
475 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
476 )
477 .execute(pool)
478 .await
479 .expect("create table");
480 sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
481 .execute(pool)
482 .await
483 .expect("seed rows");
484 }
485
486 fn config() -> SqlEndpointConfig {
487 let mut c =
488 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
489 c.resolve_defaults();
490 c
491 }
492
493 #[test]
494 fn producer_clone_shares_pool() {
495 let p1 = SqlProducer::new(config(), Arc::new(OnceCell::new()));
496 let p2 = p1.clone();
497 assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
498 assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
499 }
500
501 #[test]
502 fn resolve_query_from_config() {
503 let config = config();
504 let ex = Exchange::new(Message::default());
505 let q = SqlProducer::resolve_query_source(&ex, &config);
506 assert_eq!(q, "select 1");
507 }
508
509 #[test]
510 fn resolve_query_from_header() {
511 let config = config();
512 let mut msg = Message::default();
513 msg.set_header(headers::QUERY, serde_json::json!("select 2"));
514 let ex = Exchange::new(msg);
515 let q = SqlProducer::resolve_query_source(&ex, &config);
516 assert_eq!(q, "select 2");
517 }
518
519 #[test]
520 fn resolve_query_from_body() {
521 let mut config = config();
522 config.use_message_body_for_sql = true;
523 let msg = Message::new(Body::Text("select 3".to_string()));
524 let ex = Exchange::new(msg);
525 let q = SqlProducer::resolve_query_source(&ex, &config);
526 assert_eq!(q, "select 3");
527 }
528
529 #[test]
530 fn resolve_query_header_priority_over_body() {
531 let mut config = config();
532 config.use_message_body_for_sql = true;
533 let mut msg = Message::new(Body::Text("select from body".to_string()));
534 msg.set_header(headers::QUERY, serde_json::json!("select from header"));
535 let ex = Exchange::new(msg);
536 let q = SqlProducer::resolve_query_source(&ex, &config);
537 assert_eq!(q, "select from header");
538 }
539
540 #[test]
541 fn resolve_query_body_priority_over_config() {
542 let mut config = config();
543 config.use_message_body_for_sql = true;
544 let msg = Message::new(Body::Text("select from body".to_string()));
545 let ex = Exchange::new(msg);
546 let q = SqlProducer::resolve_query_source(&ex, &config);
547 assert_eq!(q, "select from body");
548 }
549
550 #[test]
551 fn bind_json_null() {
552 let query = sqlx::query("SELECT ?");
553 let values = vec![serde_json::Value::Null];
554 let _bound = bind_json_values(query, &values);
555 }
557
558 #[test]
559 fn bind_json_bool() {
560 let query = sqlx::query("SELECT ?");
561 let values = vec![serde_json::Value::Bool(true)];
562 let _bound = bind_json_values(query, &values);
563 }
564
565 #[test]
566 fn bind_json_number_i64() {
567 let query = sqlx::query("SELECT ?");
568 let values = vec![serde_json::json!(42)];
569 let _bound = bind_json_values(query, &values);
570 }
571
572 #[test]
573 fn bind_json_number_f64() {
574 let query = sqlx::query("SELECT ?");
575 let values = vec![serde_json::json!(std::f64::consts::PI)];
576 let _bound = bind_json_values(query, &values);
577 }
578
579 #[test]
580 fn bind_json_string() {
581 let query = sqlx::query("SELECT ?");
582 let values = vec![serde_json::json!("hello world")];
583 let _bound = bind_json_values(query, &values);
584 }
585
586 #[test]
587 fn bind_json_array() {
588 let query = sqlx::query("SELECT ?");
589 let values = vec![serde_json::json!([1, 2, 3])];
590 let _bound = bind_json_values(query, &values);
591 }
592
593 #[test]
594 fn bind_json_object() {
595 let query = sqlx::query("SELECT ?");
596 let values = vec![serde_json::json!({"key": "value"})];
597 let _bound = bind_json_values(query, &values);
598 }
599
600 #[test]
601 fn bind_multiple_values() {
602 let query = sqlx::query("SELECT ?, ?, ?");
603 let values = vec![
604 serde_json::json!(1),
605 serde_json::json!("test"),
606 serde_json::Value::Null,
607 ];
608 let _bound = bind_json_values(query, &values);
609 }
610
611 #[test]
613 fn expected_update_count_validation() {
614 let config = SqlEndpointConfig::from_uri(
616 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
617 )
618 .unwrap();
619 assert_eq!(config.expected_update_count, Some(5));
620
621 let config_default = self::config();
623 assert_eq!(config_default.expected_update_count, None);
624
625 let config_neg = SqlEndpointConfig::from_uri(
627 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
628 )
629 .unwrap();
630 assert_eq!(config_neg.expected_update_count, Some(-1));
631 }
632
633 #[test]
635 fn parameters_header_override_logic() {
636 let mut prepared = PreparedQuery {
638 sql: "SELECT * FROM t WHERE id = $1".to_string(),
639 bindings: vec![serde_json::json!(42)],
640 };
641
642 let header_params = serde_json::json!([99, "extra"]);
644 if let Some(arr) = header_params.as_array() {
645 prepared.bindings = arr.clone();
646 }
647
648 assert_eq!(prepared.bindings.len(), 2);
650 assert_eq!(prepared.bindings[0], serde_json::json!(99));
651 assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
652
653 let mut prepared2 = PreparedQuery {
655 sql: "SELECT * FROM t WHERE id = $1".to_string(),
656 bindings: vec![serde_json::json!(42)],
657 };
658 let header_non_array = serde_json::json!({"not": "an array"});
659 if let Some(arr) = header_non_array.as_array() {
660 prepared2.bindings = arr.clone();
661 }
662 assert_eq!(prepared2.bindings.len(), 1);
664 assert_eq!(prepared2.bindings[0], serde_json::json!(42));
665 }
666
667 #[tokio::test]
668 async fn execute_select_one_sets_body_and_row_count() {
669 let pool = sqlite_pool().await;
670 seed_items_table(&pool).await;
671
672 let mut config = SqlEndpointConfig::from_uri(
673 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
674 )
675 .unwrap();
676 config.resolve_defaults();
677
678 let prepared = PreparedQuery {
679 sql: "select id, name from items order by id".to_string(),
680 bindings: vec![],
681 };
682 let mut exchange = Exchange::new(Message::default());
683
684 execute_select(&pool, &prepared, &config, &mut exchange)
685 .await
686 .expect("select one");
687
688 assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
689 assert_eq!(
690 exchange.input.body,
691 Body::Json(json!({"id": 1, "name": "a"}))
692 );
693 }
694
695 #[tokio::test]
696 async fn execute_stream_list_materializes_ndjson() {
697 let pool = sqlite_pool().await;
698 seed_items_table(&pool).await;
699
700 let mut config = SqlEndpointConfig::from_uri(
701 "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
702 )
703 .unwrap();
704 config.resolve_defaults();
705
706 let prepared = PreparedQuery {
707 sql: "select id from items order by id".to_string(),
708 bindings: vec![],
709 };
710 let mut exchange = Exchange::new(Message::default());
711
712 execute_select(&pool, &prepared, &config, &mut exchange)
713 .await
714 .expect("stream list");
715
716 let bytes = exchange
717 .input
718 .body
719 .clone()
720 .into_bytes(1024)
721 .await
722 .expect("stream bytes");
723 let text = String::from_utf8(bytes.to_vec()).expect("utf8");
724 assert!(text.contains("{\"id\":1}"));
725 assert!(text.contains("{\"id\":2}"));
726 assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
727 }
728
729 #[tokio::test]
730 async fn execute_modify_expected_update_count_mismatch_returns_error() {
731 let pool = sqlite_pool().await;
732 seed_items_table(&pool).await;
733
734 let mut config = SqlEndpointConfig::from_uri(
735 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
736 )
737 .unwrap();
738 config.resolve_defaults();
739
740 let prepared = PreparedQuery {
741 sql: "update items set done=1 where id = $1".to_string(),
742 bindings: vec![json!(1)],
743 };
744 let mut exchange = Exchange::new(Message::default());
745
746 let err = execute_modify(&pool, &prepared, &config, &mut exchange)
747 .await
748 .expect_err("must fail due expected row count mismatch");
749 assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
750 }
751
752 #[tokio::test]
753 async fn execute_batch_rollback_when_any_item_fails_expected_count() {
754 let pool = sqlite_pool().await;
755 seed_items_table(&pool).await;
756
757 let mut config = SqlEndpointConfig::from_uri(
758 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
759 )
760 .unwrap();
761 config.resolve_defaults();
762
763 let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
764
765 let err = execute_batch(&pool, &config, &mut exchange)
766 .await
767 .expect_err("second batch item should fail expectedUpdateCount");
768 assert!(
769 err.to_string()
770 .contains("Batch item 1: expected 1 rows affected, got 0")
771 );
772
773 let row = sqlx::query("select done from items where id = 1")
774 .fetch_one(&pool)
775 .await
776 .expect("query row");
777 let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
778 assert_eq!(done, 0, "transaction must rollback first update");
779 }
780
781 #[tokio::test]
787 async fn producer_no_panic_without_prior_resolve_defaults() {
788 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
790 assert!(config.max_connections.is_none());
791
792 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
793 let exchange = Exchange::new(Message::default());
794
795 let result = producer.call(exchange).await;
797 assert!(
798 result.is_ok(),
799 "Producer should initialize pool without panic, got: {:?}",
800 result
801 );
802 }
803
804 #[tokio::test]
806 async fn producer_pool_init_returns_config_error_for_invalid_db() {
807 let mut config = SqlEndpointConfig::from_uri(
809 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db",
810 )
811 .unwrap();
812 config.max_connections = Some(1);
814 config.min_connections = Some(0);
815 config.idle_timeout_secs = Some(300);
816 config.max_lifetime_secs = Some(1800);
817
818 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
819 let exchange = Exchange::new(Message::default());
820
821 let result = producer.call(exchange).await;
822 assert!(result.is_err());
823 let err_msg = result.unwrap_err().to_string();
825 assert!(
826 err_msg.contains("Failed to connect") || err_msg.contains("database"),
827 "Expected connection error, got: {}",
828 err_msg
829 );
830 }
831
832 #[test]
834 fn poll_ready_returns_ready_for_uninitialized_pool() {
835 let config = {
836 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
837 c.resolve_defaults();
838 c
839 };
840 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
841 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
842 let result = producer.poll_ready(&mut cx);
843 assert!(matches!(result, Poll::Ready(Ok(()))));
844 }
845
846 #[test]
848 fn poll_ready_returns_error_when_stopped() {
849 let config = {
850 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
851 c.resolve_defaults();
852 c
853 };
854 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
855 producer.stop();
856 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
857 let result = producer.poll_ready(&mut cx);
858 assert!(matches!(result, Poll::Ready(Err(_))));
859 let err_msg = match result {
860 Poll::Ready(Err(e)) => e.to_string(),
861 _ => unreachable!(),
862 };
863 assert!(err_msg.contains("SQL producer stopped"));
864 }
865
866 #[tokio::test]
868 async fn poll_ready_returns_error_when_pool_closed() {
869 let pool = sqlite_pool().await;
870 pool.close().await;
871
872 let config = {
873 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
874 c.resolve_defaults();
875 c
876 };
877 let pool_cell = Arc::new(OnceCell::new());
878 pool_cell.set(pool).unwrap();
879
880 let mut producer = SqlProducer::new(config, pool_cell);
881 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
882 let result = producer.poll_ready(&mut cx);
883 assert!(matches!(result, Poll::Ready(Err(_))));
884 let err_msg = match result {
885 Poll::Ready(Err(e)) => e.to_string(),
886 _ => unreachable!(),
887 };
888 assert!(err_msg.contains("SQL connection pool is closed"));
889 }
890
891 #[tokio::test]
893 async fn poll_ready_returns_ok_for_healthy_pool() {
894 let pool = sqlite_pool().await;
895
896 let config = {
897 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
898 c.resolve_defaults();
899 c
900 };
901 let pool_cell = Arc::new(OnceCell::new());
902 pool_cell.set(pool).unwrap();
903
904 let mut producer = SqlProducer::new(config, pool_cell);
905 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
906 let result = producer.poll_ready(&mut cx);
907 assert!(matches!(result, Poll::Ready(Ok(()))));
908 }
909
910 #[tokio::test]
912 async fn use_placeholder_false_executes_raw_sql() {
913 let pool = sqlite_pool().await;
914 seed_items_table(&pool).await;
915
916 let mut config = SqlEndpointConfig::from_uri(
917 "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
918 )
919 .unwrap();
920 config.resolve_defaults();
921 assert!(!config.use_placeholder);
922
923 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
924 producer.pool.set(pool.clone()).unwrap();
926
927 let exchange = Exchange::new(Message::default());
928 let result = producer.call(exchange).await;
929 assert!(result.is_ok());
930 let exchange = result.unwrap();
931 assert!(matches!(exchange.input.body, Body::Json(_)));
933 }
934
935 #[tokio::test]
937 async fn use_placeholder_true_processes_placeholders() {
938 let pool = sqlite_pool().await;
939 seed_items_table(&pool).await;
940
941 let mut config = SqlEndpointConfig::from_uri(
942 "sql:select id, name from items where id = #?db_url=sqlite::memory:",
943 )
944 .unwrap();
945 config.resolve_defaults();
946 assert!(config.use_placeholder);
947
948 let mut producer = SqlProducer::new(config, Arc::new(OnceCell::new()));
949 producer.pool.set(pool.clone()).unwrap();
950
951 let msg = Message::new(Body::Json(json!([1])));
952 let exchange = Exchange::new(msg);
953 let result = producer.call(exchange).await;
954 assert!(result.is_ok());
955 }
956}