1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use camel_api::datasource::DatasourceCatalog;
10use serde_json::json;
11use sqlx::AnyPool;
12use sqlx::any::AnyRow;
13use sqlx::pool::PoolOptions;
14use tokio::sync::OnceCell;
15use tower::Service;
16use tracing::{debug, error, info, warn};
17
18use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
19use crate::headers;
20use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
21use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
22use camel_component_api::retry_async;
23use camel_component_api::{
24 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
25};
26
27#[derive(Clone)]
28pub struct SqlProducer {
29 pub(crate) config: SqlEndpointConfig,
30 pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
31 pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
32 pub(crate) stopped: Arc<AtomicBool>,
33 pub(crate) runtime: Arc<dyn RuntimeObservability>,
34 pub(crate) route_id: String,
36}
37
38impl SqlProducer {
39 pub fn new(
40 config: SqlEndpointConfig,
41 pool: Arc<OnceCell<Arc<AnyPool>>>,
42 catalog: Option<Arc<dyn DatasourceCatalog>>,
43 runtime: Arc<dyn RuntimeObservability>,
44 route_id: impl Into<String>,
45 ) -> Self {
46 Self {
47 config,
48 pool,
49 catalog,
50 stopped: Arc::new(AtomicBool::new(false)),
51 runtime,
52 route_id: route_id.into(),
53 }
54 }
55
56 pub fn stop(&self) {
57 self.stopped.store(true, Ordering::Relaxed);
58 if let Some(pool) = self.pool.get() {
60 let pool = pool.clone();
61 tokio::spawn(async move {
62 if tokio::time::timeout(Duration::from_secs(5), pool.close())
63 .await
64 .is_err()
65 {
66 tracing::warn!("SQL producer pool did not close within 5s");
67 }
68 });
69 }
70 }
71
72 pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
77 if let Some(query_value) = exchange.input.header(headers::QUERY)
79 && let Some(query_str) = query_value.as_str()
80 {
81 return query_str.to_string();
82 }
83
84 if config.use_message_body_for_sql
86 && let Some(body_text) = exchange.input.body.as_text()
87 {
88 return body_text.to_string();
89 }
90
91 config.query.clone()
93 }
94
95 pub async fn check_connection(&self) -> Result<(), CamelError> {
101 let pool =
102 if let (Some(catalog), Some(ds_name)) = (&self.catalog, &self.config.datasource_name) {
103 let handle = catalog.get_pool(ds_name).await?;
104 let arc_pool: Arc<AnyPool> = handle.downcast()?;
105 arc_pool
107 } else {
108 self.pool
109 .get()
110 .ok_or_else(|| {
111 CamelError::ProcessorError("SQL connection pool not initialized".into())
112 })?
113 .clone()
114 };
115
116 debug!("Running health check: SELECT 1");
117 sqlx::query("SELECT 1").execute(&*pool).await.map_err(|e| {
118 warn!(error = %e, "SQL health check failed");
119 CamelError::ProcessorError(format!("SQL health check failed: {}", e))
120 })?;
121
122 debug!("SQL health check passed");
123 Ok(())
124 }
125}
126
127impl Service<Exchange> for SqlProducer {
128 type Response = Exchange;
129 type Error = CamelError;
130 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
131
132 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133 if self.stopped.load(Ordering::Relaxed) {
134 return Poll::Ready(Err(CamelError::ProcessorError(
135 "SQL producer stopped".into(),
136 )));
137 }
138 if let Some(pool) = self.pool.get()
139 && pool.is_closed()
140 {
141 return Poll::Ready(Err(CamelError::ProcessorError(
142 "SQL connection pool is closed".into(),
143 )));
144 }
145 Poll::Ready(Ok(()))
146 }
147
148 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
149 let mut config = self.config.clone();
150 let pool_cell = Arc::clone(&self.pool);
151 let catalog = self.catalog.clone();
152 let runtime = Arc::clone(&self.runtime);
153 let route_id = self.route_id.clone();
154
155 Box::pin(async move {
156 config.resolve_defaults();
158 config.resolve_file_query().await?;
159
160 let pool: &Arc<AnyPool> = pool_cell
162 .get_or_try_init(|| async {
163 let ds_name = config.datasource_name.clone();
165 if let (Some(ref cat), Some(ref name)) = (catalog, ds_name) {
166 let handle = cat.get_pool(name).await?;
167 return handle.downcast::<AnyPool>();
168 }
169
170 let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
171
172 sqlx::any::install_default_drivers();
175
176 let max_conn = config.max_connections.ok_or_else(|| {
177 CamelError::Config("max_connections not resolved for SQL pool".into())
178 })?;
179 let min_conn = config.min_connections.ok_or_else(|| {
180 CamelError::Config("min_connections not resolved for SQL pool".into())
181 })?;
182 let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
183 CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
184 })?;
185 let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
186 CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
187 })?;
188
189 info!(
190 db_url = %redact_db_url(&config.db_url),
191 "SQL producer pool initializing"
192 );
193 let retry_policy = &config.retry;
194 let pool = retry_async::<_, _, _, _, sqlx::Error>(
195 retry_policy,
196 Some("sql-producer"),
197 || {
198 async {
199 PoolOptions::new()
200 .max_connections(max_conn)
201 .min_connections(min_conn)
202 .idle_timeout(Duration::from_secs(idle_timeout))
203 .max_lifetime(Duration::from_secs(max_lifetime))
204 .connect(&db_url)
205 .await
206 }
207 },
208 is_retryable_sqlx_error,
209 )
210 .await
211 .map_err(|e| {
212 runtime.health().force_unhealthy_for_route(
213 &route_id,
214 "g:sql:producer-pool-init",
215 &e.to_string(),
216 );
217 error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
219 CamelError::EndpointCreationFailed(format!(
220 "Failed to connect to database: {}",
221 e
222 ))
223 })?;
224 Ok(Arc::new(pool))
225 })
226 .await
227 .map_err(|e: CamelError| e.clone())?;
228
229 let query_str = Self::resolve_query_source(&exchange, &config);
231
232 if config.transaction_mode == crate::config::TransactionMode::Managed {
234 warn!("transactionManager not yet implemented; using Auto mode");
235 }
236
237 debug!(
238 query = %query_str,
239 "executing SQL query"
240 );
241
242 if config.batch {
244 execute_batch(pool.as_ref(), &config, &mut exchange).await?;
246 } else if config.use_placeholder {
247 let template = parse_query_template(&query_str, config.placeholder)?;
249 let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
250
251 if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
253 if let Some(arr) = params_value.as_array() {
254 if arr.len() != prepared.bindings.len() {
255 warn!(
256 expected = prepared.bindings.len(),
257 got = arr.len(),
258 header = headers::PARAMETERS,
259 "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
260 prepared.bindings.len(),
261 arr.len()
262 );
263 }
264 debug!(
265 "Overriding bindings from {} header with {} parameters",
266 headers::PARAMETERS,
267 arr.len()
268 );
269 prepared.bindings = arr.clone();
270 } else {
271 warn!(
272 header = headers::PARAMETERS,
273 "Header is present but not a JSON array — ignoring parameter override"
274 );
275 }
276 }
277
278 debug!(
279 "Executing prepared SQL ({} bindings)",
280 prepared.bindings.len()
281 );
282
283 if is_select_query(&prepared.sql) {
284 execute_select(pool.as_ref(), &prepared, &config, &mut exchange).await?;
285 } else {
286 execute_modify(pool.as_ref(), &prepared, &config, &mut exchange).await?;
287 }
288 } else {
289 debug!("Executing raw SQL (placeholder processing disabled)");
291 let prepared = PreparedQuery {
292 sql: query_str,
293 bindings: vec![],
294 };
295
296 if is_select_query(&prepared.sql) {
297 execute_select(pool.as_ref(), &prepared, &config, &mut exchange).await?;
298 } else {
299 execute_modify(pool.as_ref(), &prepared, &config, &mut exchange).await?;
300 }
301 }
302
303 Ok(exchange)
304 })
305 }
306}
307
308async fn execute_select(
310 pool: &AnyPool,
311 prepared: &PreparedQuery,
312 config: &SqlEndpointConfig,
313 exchange: &mut Exchange,
314) -> Result<(), CamelError> {
315 match config.output_type {
316 SqlOutputType::SelectOne => {
317 let mut query = sqlx::query(&prepared.sql);
319 query = bind_json_values(query, &prepared.bindings);
320
321 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
322 warn!(error = %e, "SQL query failed");
323 CamelError::ProcessorError(format!("Query execution failed: {}", e))
324 })?;
325
326 let count = rows.len();
327 debug!(rows = count, "SQL query completed");
328 let json_rows: Vec<serde_json::Value> = rows
329 .iter()
330 .map(row_to_json)
331 .collect::<Result<Vec<_>, _>>()?;
332
333 if let Some(first_row) = json_rows.into_iter().next() {
334 exchange.input.body = Body::Json(first_row);
335 } else {
336 exchange.input.body = Body::Empty;
337 }
338 debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
339 exchange
340 .input
341 .set_header(headers::ROW_COUNT, serde_json::json!(count));
342 }
343 SqlOutputType::SelectList => {
344 let mut query = sqlx::query(&prepared.sql);
346 query = bind_json_values(query, &prepared.bindings);
347
348 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
349 warn!(error = %e, "SQL query failed");
350 CamelError::ProcessorError(format!("Query execution failed: {}", e))
351 })?;
352
353 let count = rows.len();
354 debug!(rows = count, "SQL query completed");
355 let json_rows: Vec<serde_json::Value> = rows
356 .iter()
357 .map(row_to_json)
358 .collect::<Result<Vec<_>, _>>()?;
359
360 exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
361 debug!("SelectList returned {} rows", count);
362 exchange
363 .input
364 .set_header(headers::ROW_COUNT, serde_json::json!(count));
365 }
366 SqlOutputType::StreamList => {
367 use futures::TryStreamExt;
369
370 let pool_clone = pool.clone();
371 let sql_str = prepared.sql.clone();
372 let bindings = prepared.bindings.clone();
373
374 let byte_stream = async_stream::try_stream! {
376 let mut q = sqlx::query(&sql_str);
377 q = bind_json_values(q, &bindings);
378 let mut rows = q.fetch(&pool_clone);
379 while let Some(row) = rows.try_next().await.map_err(|e| {
380 CamelError::ProcessorError(format!("Query execution failed: {}", e))
381 })? {
382 let json_val = row_to_json(&row).map_err(|e| {
383 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
384 })?;
385 let mut bytes = serde_json::to_vec(&json_val)
386 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
387 bytes.push(b'\n');
388 yield Bytes::from(bytes);
389 }
390 };
391
392 exchange.input.body = Body::Stream(StreamBody {
393 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
394 metadata: StreamMetadata {
395 content_type: Some("application/x-ndjson".to_string()),
396 size_hint: None,
397 origin: None,
398 },
399 });
400 debug!("StreamList: created lazy stream (rows fetched on demand)");
401 }
403 }
404
405 Ok(())
406}
407
408async fn execute_modify(
410 pool: &AnyPool,
411 prepared: &PreparedQuery,
412 config: &SqlEndpointConfig,
413 exchange: &mut Exchange,
414) -> Result<(), CamelError> {
415 let mut query = sqlx::query(&prepared.sql);
416 query = bind_json_values(query, &prepared.bindings);
417
418 let result = query.execute(pool).await.map_err(|e| {
419 warn!(error = %e, "SQL query failed");
420 CamelError::ProcessorError(format!("Query execution failed: {}", e))
421 })?;
422
423 let rows_affected = result.rows_affected();
424
425 if let Some(expected) = config.expected_update_count
427 && rows_affected as i64 != expected
428 {
429 warn!(expected, actual = rows_affected, "Row count mismatch");
430 return Err(CamelError::ProcessorError(format!(
431 "Expected {} rows affected, got {}",
432 expected, rows_affected
433 )));
434 }
435
436 exchange
437 .input
438 .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
439
440 if config.noop {
441 } else {
443 exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
444 }
445
446 debug!(rows = rows_affected, "SQL modify query completed");
447
448 Ok(())
449}
450
451async fn execute_batch(
453 pool: &AnyPool,
454 config: &SqlEndpointConfig,
455 exchange: &mut Exchange,
456) -> Result<(), CamelError> {
457 let body_json = match &exchange.input.body {
459 Body::Json(val) => val,
460 _ => {
461 return Err(CamelError::ProcessorError(
462 "Batch mode requires body to be a JSON array of arrays".to_string(),
463 ));
464 }
465 };
466
467 let batch_data = body_json
468 .as_array()
469 .ok_or_else(|| {
470 CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
471 })?
472 .clone();
473
474 let template = parse_query_template(&config.query, config.placeholder)?;
476
477 let mut tx = pool.begin().await.map_err(|e| {
479 warn!(error = %e, "Failed to begin transaction");
482 CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
483 })?;
484
485 let mut total_rows_affected: u64 = 0;
486
487 for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
488 params_array.as_array().ok_or_else(|| {
490 CamelError::ProcessorError(format!(
491 "Batch item at index {} must be a JSON array of parameters",
492 batch_idx
493 ))
494 })?;
495
496 let temp_msg = Message::new(Body::Json(params_array.clone()));
498 let temp_exchange = Exchange::new(temp_msg);
499
500 let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
502
503 let mut query = sqlx::query(&prepared.sql);
505 query = bind_json_values(query, &prepared.bindings);
506
507 let result = query.execute(&mut *tx).await.map_err(|e| {
508 warn!("Batch query execution failed at index {}: {}", batch_idx, e);
511 CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
512 })?;
513
514 if let Some(expected) = config.expected_update_count
516 && result.rows_affected() as i64 != expected
517 {
518 warn!(
521 "Batch item {}: expected {} rows affected, got {}",
522 batch_idx,
523 expected,
524 result.rows_affected()
525 );
526 return Err(CamelError::ProcessorError(format!(
527 "Batch item {}: expected {} rows affected, got {}",
528 batch_idx,
529 expected,
530 result.rows_affected()
531 )));
532 }
533
534 total_rows_affected += result.rows_affected();
535 }
536
537 tx.commit().await.map_err(|e| {
539 warn!(error = %e, "Failed to commit transaction");
542 CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
543 })?;
544
545 exchange.input.set_header(
546 headers::UPDATE_COUNT,
547 serde_json::json!(total_rows_affected),
548 );
549
550 debug!(
551 "Batch execution completed, total rows affected: {}",
552 total_rows_affected
553 );
554
555 Ok(())
556}
557
558#[cfg(test)]
559mod tests {
560 use super::*;
561 use camel_api::MetricsCollector;
562 use camel_component_api::HealthCheckRegistry;
563 use camel_component_api::test_support::PanicRuntimeObservability;
564 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
565 std::sync::Arc::new(PanicRuntimeObservability)
566 }
567 use camel_component_api::Message;
568 use camel_component_api::UriConfig;
569 use sqlx::any::AnyPoolOptions;
570 use std::sync::Arc;
571 use std::sync::Mutex;
572 use tokio::sync::OnceCell;
573
574 #[derive(Debug, Default)]
577 struct RecordingHealth {
578 forced: Mutex<Vec<(String, String, String)>>,
579 }
580
581 impl HealthCheckRegistry for RecordingHealth {
582 fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
583 self.forced.lock().unwrap().push((
584 route_id.to_string(),
585 name.to_string(),
586 reason.to_string(),
587 ));
588 }
589 }
590
591 struct NoopMetrics;
592
593 impl MetricsCollector for NoopMetrics {
594 fn record_exchange_duration(&self, _: &str, _: Duration) {}
595 fn increment_errors(&self, _: &str, _: &str) {}
596 fn increment_exchanges(&self, _: &str) {}
597 fn set_queue_depth(&self, _: &str, _: usize) {}
598 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
599 }
600
601 struct RecordingRuntime {
602 health: Arc<RecordingHealth>,
603 }
604
605 impl RuntimeObservability for RecordingRuntime {
606 fn metrics(&self) -> Arc<dyn MetricsCollector> {
607 Arc::new(NoopMetrics)
608 }
609 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
610 self.health.clone()
611 }
612 }
613
614 async fn sqlite_pool() -> AnyPool {
615 sqlx::any::install_default_drivers();
616 AnyPoolOptions::new()
617 .max_connections(1)
618 .connect("sqlite::memory:")
619 .await
620 .expect("sqlite pool")
621 }
622
623 async fn seed_items_table(pool: &AnyPool) {
624 sqlx::query(
625 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
626 )
627 .execute(pool)
628 .await
629 .expect("create table");
630 sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
631 .execute(pool)
632 .await
633 .expect("seed rows");
634 }
635
636 fn config() -> SqlEndpointConfig {
637 let mut c =
638 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
639 c.resolve_defaults();
640 c
641 }
642
643 #[test]
644 fn producer_clone_shares_pool() {
645 let p1 = SqlProducer::new(
646 config(),
647 Arc::new(OnceCell::new()),
648 None,
649 test_rt(),
650 "sql-producer-test-route",
651 );
652 let p2 = p1.clone();
653 assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
654 assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
655 }
656
657 #[test]
658 fn resolve_query_from_config() {
659 let config = config();
660 let ex = Exchange::new(Message::default());
661 let q = SqlProducer::resolve_query_source(&ex, &config);
662 assert_eq!(q, "select 1");
663 }
664
665 #[test]
666 fn resolve_query_from_header() {
667 let config = config();
668 let mut msg = Message::default();
669 msg.set_header(headers::QUERY, serde_json::json!("select 2"));
670 let ex = Exchange::new(msg);
671 let q = SqlProducer::resolve_query_source(&ex, &config);
672 assert_eq!(q, "select 2");
673 }
674
675 #[test]
676 fn resolve_query_from_body() {
677 let mut config = config();
678 config.use_message_body_for_sql = true;
679 let msg = Message::new(Body::Text("select 3".to_string()));
680 let ex = Exchange::new(msg);
681 let q = SqlProducer::resolve_query_source(&ex, &config);
682 assert_eq!(q, "select 3");
683 }
684
685 #[test]
686 fn resolve_query_header_priority_over_body() {
687 let mut config = config();
688 config.use_message_body_for_sql = true;
689 let mut msg = Message::new(Body::Text("select from body".to_string()));
690 msg.set_header(headers::QUERY, serde_json::json!("select from header"));
691 let ex = Exchange::new(msg);
692 let q = SqlProducer::resolve_query_source(&ex, &config);
693 assert_eq!(q, "select from header");
694 }
695
696 #[test]
697 fn resolve_query_body_priority_over_config() {
698 let mut config = config();
699 config.use_message_body_for_sql = true;
700 let msg = Message::new(Body::Text("select from body".to_string()));
701 let ex = Exchange::new(msg);
702 let q = SqlProducer::resolve_query_source(&ex, &config);
703 assert_eq!(q, "select from body");
704 }
705
706 #[test]
707 fn bind_json_null() {
708 let query = sqlx::query("SELECT ?");
709 let values = vec![serde_json::Value::Null];
710 let _bound = bind_json_values(query, &values);
711 }
713
714 #[test]
715 fn bind_json_bool() {
716 let query = sqlx::query("SELECT ?");
717 let values = vec![serde_json::Value::Bool(true)];
718 let _bound = bind_json_values(query, &values);
719 }
720
721 #[test]
722 fn bind_json_number_i64() {
723 let query = sqlx::query("SELECT ?");
724 let values = vec![serde_json::json!(42)];
725 let _bound = bind_json_values(query, &values);
726 }
727
728 #[test]
729 fn bind_json_number_f64() {
730 let query = sqlx::query("SELECT ?");
731 let values = vec![serde_json::json!(std::f64::consts::PI)];
732 let _bound = bind_json_values(query, &values);
733 }
734
735 #[test]
736 fn bind_json_string() {
737 let query = sqlx::query("SELECT ?");
738 let values = vec![serde_json::json!("hello world")];
739 let _bound = bind_json_values(query, &values);
740 }
741
742 #[test]
743 fn bind_json_array() {
744 let query = sqlx::query("SELECT ?");
745 let values = vec![serde_json::json!([1, 2, 3])];
746 let _bound = bind_json_values(query, &values);
747 }
748
749 #[test]
750 fn bind_json_object() {
751 let query = sqlx::query("SELECT ?");
752 let values = vec![serde_json::json!({"key": "value"})];
753 let _bound = bind_json_values(query, &values);
754 }
755
756 #[test]
757 fn bind_multiple_values() {
758 let query = sqlx::query("SELECT ?, ?, ?");
759 let values = vec![
760 serde_json::json!(1),
761 serde_json::json!("test"),
762 serde_json::Value::Null,
763 ];
764 let _bound = bind_json_values(query, &values);
765 }
766
767 #[test]
769 fn expected_update_count_validation() {
770 let config = SqlEndpointConfig::from_uri(
772 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
773 )
774 .unwrap();
775 assert_eq!(config.expected_update_count, Some(5));
776
777 let config_default = self::config();
779 assert_eq!(config_default.expected_update_count, None);
780
781 let config_neg = SqlEndpointConfig::from_uri(
783 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
784 )
785 .unwrap();
786 assert_eq!(config_neg.expected_update_count, Some(-1));
787 }
788
789 #[test]
791 fn parameters_header_override_logic() {
792 let mut prepared = PreparedQuery {
794 sql: "SELECT * FROM t WHERE id = $1".to_string(),
795 bindings: vec![serde_json::json!(42)],
796 };
797
798 let header_params = serde_json::json!([99, "extra"]);
800 if let Some(arr) = header_params.as_array() {
801 prepared.bindings = arr.clone();
802 }
803
804 assert_eq!(prepared.bindings.len(), 2);
806 assert_eq!(prepared.bindings[0], serde_json::json!(99));
807 assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
808
809 let mut prepared2 = PreparedQuery {
811 sql: "SELECT * FROM t WHERE id = $1".to_string(),
812 bindings: vec![serde_json::json!(42)],
813 };
814 let header_non_array = serde_json::json!({"not": "an array"});
815 if let Some(arr) = header_non_array.as_array() {
816 prepared2.bindings = arr.clone();
817 }
818 assert_eq!(prepared2.bindings.len(), 1);
820 assert_eq!(prepared2.bindings[0], serde_json::json!(42));
821 }
822
823 #[tokio::test]
824 async fn execute_select_one_sets_body_and_row_count() {
825 let pool = sqlite_pool().await;
826 seed_items_table(&pool).await;
827
828 let mut config = SqlEndpointConfig::from_uri(
829 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
830 )
831 .unwrap();
832 config.resolve_defaults();
833
834 let prepared = PreparedQuery {
835 sql: "select id, name from items order by id".to_string(),
836 bindings: vec![],
837 };
838 let mut exchange = Exchange::new(Message::default());
839
840 execute_select(&pool, &prepared, &config, &mut exchange)
841 .await
842 .expect("select one");
843
844 assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
845 assert_eq!(
846 exchange.input.body,
847 Body::Json(json!({"id": 1, "name": "a"}))
848 );
849 }
850
851 #[tokio::test]
852 async fn execute_stream_list_materializes_ndjson() {
853 let pool = sqlite_pool().await;
854 seed_items_table(&pool).await;
855
856 let mut config = SqlEndpointConfig::from_uri(
857 "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
858 )
859 .unwrap();
860 config.resolve_defaults();
861
862 let prepared = PreparedQuery {
863 sql: "select id from items order by id".to_string(),
864 bindings: vec![],
865 };
866 let mut exchange = Exchange::new(Message::default());
867
868 execute_select(&pool, &prepared, &config, &mut exchange)
869 .await
870 .expect("stream list");
871
872 let bytes = exchange
873 .input
874 .body
875 .clone()
876 .into_bytes(1024)
877 .await
878 .expect("stream bytes");
879 let text = String::from_utf8(bytes.to_vec()).expect("utf8");
880 assert!(text.contains("{\"id\":1}"));
881 assert!(text.contains("{\"id\":2}"));
882 assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
883 }
884
885 #[tokio::test]
886 async fn execute_modify_expected_update_count_mismatch_returns_error() {
887 let pool = sqlite_pool().await;
888 seed_items_table(&pool).await;
889
890 let mut config = SqlEndpointConfig::from_uri(
891 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
892 )
893 .unwrap();
894 config.resolve_defaults();
895
896 let prepared = PreparedQuery {
897 sql: "update items set done=1 where id = $1".to_string(),
898 bindings: vec![json!(1)],
899 };
900 let mut exchange = Exchange::new(Message::default());
901
902 let err = execute_modify(&pool, &prepared, &config, &mut exchange)
903 .await
904 .expect_err("must fail due expected row count mismatch");
905 assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
906 }
907
908 #[tokio::test]
909 async fn execute_batch_rollback_when_any_item_fails_expected_count() {
910 let pool = sqlite_pool().await;
911 seed_items_table(&pool).await;
912
913 let mut config = SqlEndpointConfig::from_uri(
914 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
915 )
916 .unwrap();
917 config.resolve_defaults();
918
919 let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
920
921 let err = execute_batch(&pool, &config, &mut exchange)
922 .await
923 .expect_err("second batch item should fail expectedUpdateCount");
924 assert!(
925 err.to_string()
926 .contains("Batch item 1: expected 1 rows affected, got 0")
927 );
928
929 let row = sqlx::query("select done from items where id = 1")
930 .fetch_one(&pool)
931 .await
932 .expect("query row");
933 let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
934 assert_eq!(done, 0, "transaction must rollback first update");
935 }
936
937 #[tokio::test]
943 async fn producer_no_panic_without_prior_resolve_defaults() {
944 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
946 assert!(config.max_connections.is_none());
947
948 let mut producer = SqlProducer::new(
949 config,
950 Arc::new(OnceCell::new()),
951 None,
952 test_rt(),
953 "sql-producer-test-route",
954 );
955 let exchange = Exchange::new(Message::default());
956
957 let result = producer.call(exchange).await;
959 assert!(
960 result.is_ok(),
961 "Producer should initialize pool without panic, got: {:?}",
962 result
963 );
964 }
965
966 #[tokio::test]
968 async fn producer_pool_init_returns_config_error_for_invalid_db() {
969 let mut config = SqlEndpointConfig::from_uri(
973 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
974 )
975 .unwrap();
976 config.max_connections = Some(1);
978 config.min_connections = Some(0);
979 config.idle_timeout_secs = Some(300);
980 config.max_lifetime_secs = Some(1800);
981
982 let health = Arc::new(RecordingHealth::default());
983 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
984 health: health.clone(),
985 });
986 let mut producer = SqlProducer::new(
987 config,
988 Arc::new(OnceCell::new()),
989 None,
990 rt,
991 "sql-producer-test-route",
992 );
993 let exchange = Exchange::new(Message::default());
994
995 let result = producer.call(exchange).await;
996 assert!(result.is_err());
997 let err_msg = result.unwrap_err().to_string();
999 assert!(
1000 err_msg.contains("Failed to connect") || err_msg.contains("database"),
1001 "Expected connection error, got: {}",
1002 err_msg
1003 );
1004 let forced = health.forced.lock().unwrap();
1006 assert_eq!(
1007 forced.len(),
1008 1,
1009 "expected one force_unhealthy_for_route call"
1010 );
1011 assert_eq!(forced[0].0, "sql-producer-test-route");
1012 assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1013 }
1014
1015 #[test]
1017 fn poll_ready_returns_ready_for_uninitialized_pool() {
1018 let config = {
1019 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1020 c.resolve_defaults();
1021 c
1022 };
1023 let mut producer = SqlProducer::new(
1024 config,
1025 Arc::new(OnceCell::new()),
1026 None,
1027 test_rt(),
1028 "sql-producer-test-route",
1029 );
1030 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1031 let result = producer.poll_ready(&mut cx);
1032 assert!(matches!(result, Poll::Ready(Ok(()))));
1033 }
1034
1035 #[test]
1037 fn poll_ready_returns_error_when_stopped() {
1038 let config = {
1039 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1040 c.resolve_defaults();
1041 c
1042 };
1043 let mut producer = SqlProducer::new(
1044 config,
1045 Arc::new(OnceCell::new()),
1046 None,
1047 test_rt(),
1048 "sql-producer-test-route",
1049 );
1050 producer.stop();
1051 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1052 let result = producer.poll_ready(&mut cx);
1053 assert!(matches!(result, Poll::Ready(Err(_))));
1054 let err_msg = match result {
1055 Poll::Ready(Err(e)) => e.to_string(),
1056 _ => unreachable!(),
1057 };
1058 assert!(err_msg.contains("SQL producer stopped"));
1059 }
1060
1061 #[tokio::test]
1063 async fn poll_ready_returns_error_when_pool_closed() {
1064 let pool = sqlite_pool().await;
1065 pool.close().await;
1066
1067 let config = {
1068 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1069 c.resolve_defaults();
1070 c
1071 };
1072 let pool_cell = Arc::new(OnceCell::new());
1073 pool_cell.set(Arc::new(pool)).unwrap();
1074
1075 let mut producer = SqlProducer::new(
1076 config,
1077 pool_cell,
1078 None,
1079 test_rt(),
1080 "sql-producer-test-route",
1081 );
1082 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1083 let result = producer.poll_ready(&mut cx);
1084 assert!(matches!(result, Poll::Ready(Err(_))));
1085 let err_msg = match result {
1086 Poll::Ready(Err(e)) => e.to_string(),
1087 _ => unreachable!(),
1088 };
1089 assert!(err_msg.contains("SQL connection pool is closed"));
1090 }
1091
1092 #[tokio::test]
1094 async fn poll_ready_returns_ok_for_healthy_pool() {
1095 let pool = sqlite_pool().await;
1096
1097 let config = {
1098 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1099 c.resolve_defaults();
1100 c
1101 };
1102 let pool_cell = Arc::new(OnceCell::new());
1103 pool_cell.set(Arc::new(pool)).unwrap();
1104
1105 let mut producer = SqlProducer::new(
1106 config,
1107 pool_cell,
1108 None,
1109 test_rt(),
1110 "sql-producer-test-route",
1111 );
1112 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1113 let result = producer.poll_ready(&mut cx);
1114 assert!(matches!(result, Poll::Ready(Ok(()))));
1115 }
1116
1117 #[tokio::test]
1119 async fn test_sql_stop_closes_pool() {
1120 let pool = sqlite_pool().await;
1121
1122 let config = {
1123 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1124 c.resolve_defaults();
1125 c
1126 };
1127 let pool_cell = Arc::new(OnceCell::new());
1128 pool_cell.set(Arc::new(pool.clone())).unwrap();
1129
1130 let producer = SqlProducer::new(
1131 config,
1132 pool_cell.clone(),
1133 None,
1134 test_rt(),
1135 "sql-producer-test-route",
1136 );
1137 assert!(!pool.is_closed(), "Pool should be open before stop");
1138
1139 producer.stop();
1140
1141 tokio::time::sleep(Duration::from_millis(100)).await;
1143
1144 assert!(
1145 pool.is_closed(),
1146 "Pool should be closed after producer.stop()"
1147 );
1148
1149 let mut producer2 = SqlProducer::new(
1151 {
1152 let mut c =
1153 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1154 c.resolve_defaults();
1155 c
1156 },
1157 pool_cell.clone(),
1158 None,
1159 test_rt(),
1160 "sql-producer-test-route",
1161 );
1162 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1163 let result = producer2.poll_ready(&mut cx);
1164 assert!(
1165 matches!(result, Poll::Ready(Err(_))),
1166 "poll_ready should fail after pool closed"
1167 );
1168 }
1169
1170 #[tokio::test]
1172 async fn use_placeholder_false_executes_raw_sql() {
1173 let pool = sqlite_pool().await;
1174 seed_items_table(&pool).await;
1175
1176 let mut config = SqlEndpointConfig::from_uri(
1177 "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1178 )
1179 .unwrap();
1180 config.resolve_defaults();
1181 assert!(!config.use_placeholder);
1182
1183 let mut producer = SqlProducer::new(
1184 config,
1185 Arc::new(OnceCell::new()),
1186 None,
1187 test_rt(),
1188 "sql-producer-test-route",
1189 );
1190 producer.pool.set(Arc::new(pool.clone())).unwrap();
1192
1193 let exchange = Exchange::new(Message::default());
1194 let result = producer.call(exchange).await;
1195 assert!(result.is_ok());
1196 let exchange = result.unwrap();
1197 assert!(matches!(exchange.input.body, Body::Json(_)));
1199 }
1200
1201 #[tokio::test]
1203 async fn use_placeholder_true_processes_placeholders() {
1204 let pool = sqlite_pool().await;
1205 seed_items_table(&pool).await;
1206
1207 let mut config = SqlEndpointConfig::from_uri(
1208 "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1209 )
1210 .unwrap();
1211 config.resolve_defaults();
1212 assert!(config.use_placeholder);
1213
1214 let mut producer = SqlProducer::new(
1215 config,
1216 Arc::new(OnceCell::new()),
1217 None,
1218 test_rt(),
1219 "sql-producer-test-route",
1220 );
1221 producer.pool.set(Arc::new(pool.clone())).unwrap();
1222
1223 let msg = Message::new(Body::Json(json!([1])));
1224 let exchange = Exchange::new(msg);
1225 let result = producer.call(exchange).await;
1226 assert!(result.is_ok());
1227 }
1228
1229 #[tokio::test]
1233 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1234 use camel_component_api::NetworkRetryPolicy;
1235 use std::sync::Arc;
1236 use std::sync::atomic::{AtomicU32, Ordering};
1237
1238 let policy = NetworkRetryPolicy {
1239 max_attempts: 3,
1240 initial_delay: Duration::from_millis(1),
1241 max_delay: Duration::from_millis(1),
1242 multiplier: 1.0,
1243 ..NetworkRetryPolicy::default()
1244 };
1245
1246 let calls = Arc::new(AtomicU32::new(0));
1247 let calls_clone = Arc::clone(&calls);
1248
1249 let mut attempt: u32 = 0;
1250 let _result: Result<(), ()> = loop {
1251 attempt += 1;
1252 calls_clone.fetch_add(1, Ordering::SeqCst);
1253 let op_result: Result<(), ()> = Err(());
1254 match op_result {
1255 Ok(v) => break Ok(v),
1256 Err(_) if policy.should_retry(attempt) => {
1257 let delay = policy.delay_for(attempt - 1);
1258 tokio::time::sleep(delay).await;
1259 continue;
1260 }
1261 Err(_) => break Err(()),
1262 }
1263 };
1264
1265 assert_eq!(
1266 calls.load(Ordering::SeqCst),
1267 3,
1268 "max_attempts=3 must yield exactly 3 invocations"
1269 );
1270 }
1271
1272 #[tokio::test]
1277 async fn producer_pool_init_failure_calls_force_unhealthy_for_route() {
1278 let health = Arc::new(RecordingHealth::default());
1279 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
1280 health: health.clone(),
1281 });
1282
1283 let mut config = SqlEndpointConfig::from_uri(
1284 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
1285 )
1286 .unwrap();
1287 config.max_connections = Some(1);
1288 config.min_connections = Some(0);
1289 config.idle_timeout_secs = Some(300);
1290 config.max_lifetime_secs = Some(1800);
1291
1292 let mut producer = SqlProducer::new(
1293 config,
1294 Arc::new(OnceCell::new()),
1295 None,
1296 rt,
1297 "sql-producer-test-route",
1298 );
1299 let exchange = Exchange::new(Message::default());
1300
1301 let result = producer.call(exchange).await;
1302 assert!(result.is_err());
1303
1304 let forced = health.forced.lock().unwrap();
1305 assert_eq!(
1306 forced.len(),
1307 1,
1308 "expected one force_unhealthy_for_route call"
1309 );
1310 assert_eq!(forced[0].0, "sql-producer-test-route");
1311 assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1312 assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1313 }
1314}