1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use bytes::Bytes;
9use serde_json::json;
10use sqlx::AnyPool;
11use sqlx::any::AnyRow;
12use sqlx::pool::PoolOptions;
13use tokio::sync::OnceCell;
14use tower::Service;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{SqlEndpointConfig, SqlOutputType, enrich_db_url_with_ssl, redact_db_url};
18use crate::headers;
19use crate::query::{PreparedQuery, is_select_query, parse_query_template, resolve_params};
20use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
21use camel_component_api::retry_async;
22use camel_component_api::{
23 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
24};
25
/// Producer side of the SQL endpoint, exposed as a `tower::Service<Exchange>`.
///
/// Clones share the pool cell and the stop flag because both live behind `Arc`s.
#[derive(Clone)]
pub struct SqlProducer {
    /// Endpoint configuration; `call` works on a fresh clone of it per exchange.
    pub(crate) config: SqlEndpointConfig,
    /// Shared cell holding the connection pool; filled by `call` when still empty.
    pub(crate) pool: Arc<OnceCell<AnyPool>>,
    /// Set by `stop`; once true, `poll_ready` reports an error.
    pub(crate) stopped: Arc<AtomicBool>,
    /// Observability handle, used to flag the route unhealthy when pool creation fails.
    pub(crate) runtime: Arc<dyn RuntimeObservability>,
    /// Identifier of the owning route, sent along with health reports.
    pub(crate) route_id: String,
}
35
36impl SqlProducer {
37 pub fn new(
38 config: SqlEndpointConfig,
39 pool: Arc<OnceCell<AnyPool>>,
40 runtime: Arc<dyn RuntimeObservability>,
41 route_id: impl Into<String>,
42 ) -> Self {
43 Self {
44 config,
45 pool,
46 stopped: Arc::new(AtomicBool::new(false)),
47 runtime,
48 route_id: route_id.into(),
49 }
50 }
51
52 pub fn stop(&self) {
53 self.stopped.store(true, Ordering::Relaxed);
54 if let Some(pool) = self.pool.get() {
56 let pool = pool.clone();
57 tokio::spawn(async move {
58 if tokio::time::timeout(Duration::from_secs(5), pool.close())
59 .await
60 .is_err()
61 {
62 tracing::warn!("SQL producer pool did not close within 5s");
63 }
64 });
65 }
66 }
67
68 pub(crate) fn resolve_query_source(exchange: &Exchange, config: &SqlEndpointConfig) -> String {
73 if let Some(query_value) = exchange.input.header(headers::QUERY)
75 && let Some(query_str) = query_value.as_str()
76 {
77 return query_str.to_string();
78 }
79
80 if config.use_message_body_for_sql
82 && let Some(body_text) = exchange.input.body.as_text()
83 {
84 return body_text.to_string();
85 }
86
87 config.query.clone()
89 }
90
91 pub async fn check_connection(&self) -> Result<(), CamelError> {
97 let pool = self.pool.get().ok_or_else(|| {
98 CamelError::ProcessorError("SQL connection pool not initialized".into())
99 })?;
100
101 debug!("Running health check: SELECT 1");
102 sqlx::query("SELECT 1").execute(pool).await.map_err(|e| {
103 warn!(error = %e, "SQL health check failed");
104 CamelError::ProcessorError(format!("SQL health check failed: {}", e))
105 })?;
106
107 debug!("SQL health check passed");
108 Ok(())
109 }
110}
111
112impl Service<Exchange> for SqlProducer {
113 type Response = Exchange;
114 type Error = CamelError;
115 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
116
117 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118 if self.stopped.load(Ordering::Relaxed) {
119 return Poll::Ready(Err(CamelError::ProcessorError(
120 "SQL producer stopped".into(),
121 )));
122 }
123 if let Some(pool) = self.pool.get()
124 && pool.is_closed()
125 {
126 return Poll::Ready(Err(CamelError::ProcessorError(
127 "SQL connection pool is closed".into(),
128 )));
129 }
130 Poll::Ready(Ok(()))
131 }
132
133 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
134 let mut config = self.config.clone();
135 let pool_cell = Arc::clone(&self.pool);
136 let runtime = Arc::clone(&self.runtime);
137 let route_id = self.route_id.clone();
138
139 Box::pin(async move {
140 let pool: &AnyPool = pool_cell
142 .get_or_try_init(|| async {
143 config.resolve_defaults();
145 config.resolve_file_query().await?;
147 let db_url = enrich_db_url_with_ssl(&config.db_url, &config)?;
148
149 sqlx::any::install_default_drivers();
152
153 let max_conn = config.max_connections.ok_or_else(|| {
154 CamelError::Config("max_connections not resolved for SQL pool".into())
155 })?;
156 let min_conn = config.min_connections.ok_or_else(|| {
157 CamelError::Config("min_connections not resolved for SQL pool".into())
158 })?;
159 let idle_timeout = config.idle_timeout_secs.ok_or_else(|| {
160 CamelError::Config("idle_timeout_secs not resolved for SQL pool".into())
161 })?;
162 let max_lifetime = config.max_lifetime_secs.ok_or_else(|| {
163 CamelError::Config("max_lifetime_secs not resolved for SQL pool".into())
164 })?;
165
166 info!(
167 db_url = %redact_db_url(&config.db_url),
168 "SQL producer pool initializing"
169 );
170 let retry_policy = &config.retry;
171 retry_async::<_, _, _, _, sqlx::Error>(
172 retry_policy,
173 Some("sql-producer"),
174 || {
175 PoolOptions::new()
176 .max_connections(max_conn)
177 .min_connections(min_conn)
178 .idle_timeout(Duration::from_secs(idle_timeout))
179 .max_lifetime(Duration::from_secs(max_lifetime))
180 .connect(&db_url)
181 },
182 is_retryable_sqlx_error,
183 )
184 .await
185 .map_err(|e| {
186 runtime.health().force_unhealthy_for_route(
187 &route_id,
188 "g:sql:producer-pool-init",
189 &e.to_string(),
190 );
191 error!(error = %e, db_url = %redact_db_url(&config.db_url), "Failed to connect to database");
193 CamelError::EndpointCreationFailed(format!(
194 "Failed to connect to database: {}",
195 e
196 ))
197 })
198 })
199 .await
200 .map_err(|e: CamelError| e.clone())?;
201
202 let query_str = Self::resolve_query_source(&exchange, &config);
204
205 if config.transaction_mode == crate::config::TransactionMode::Managed {
207 warn!("transactionManager not yet implemented; using Auto mode");
208 }
209
210 debug!(
211 query = %query_str,
212 "executing SQL query"
213 );
214
215 if config.batch {
217 execute_batch(pool, &config, &mut exchange).await?;
219 } else if config.use_placeholder {
220 let template = parse_query_template(&query_str, config.placeholder)?;
222 let mut prepared = resolve_params(&template, &exchange, &config.in_separator)?;
223
224 if let Some(params_value) = exchange.input.header(headers::PARAMETERS) {
226 if let Some(arr) = params_value.as_array() {
227 if arr.len() != prepared.bindings.len() {
228 warn!(
229 expected = prepared.bindings.len(),
230 got = arr.len(),
231 header = headers::PARAMETERS,
232 "Parameter count mismatch — SQL has {} placeholders but header provides {} values",
233 prepared.bindings.len(),
234 arr.len()
235 );
236 }
237 debug!(
238 "Overriding bindings from {} header with {} parameters",
239 headers::PARAMETERS,
240 arr.len()
241 );
242 prepared.bindings = arr.clone();
243 } else {
244 warn!(
245 header = headers::PARAMETERS,
246 "Header is present but not a JSON array — ignoring parameter override"
247 );
248 }
249 }
250
251 debug!(
252 "Executing prepared SQL ({} bindings)",
253 prepared.bindings.len()
254 );
255
256 if is_select_query(&prepared.sql) {
257 execute_select(pool, &prepared, &config, &mut exchange).await?;
258 } else {
259 execute_modify(pool, &prepared, &config, &mut exchange).await?;
260 }
261 } else {
262 debug!("Executing raw SQL (placeholder processing disabled)");
264 let prepared = PreparedQuery {
265 sql: query_str,
266 bindings: vec![],
267 };
268
269 if is_select_query(&prepared.sql) {
270 execute_select(pool, &prepared, &config, &mut exchange).await?;
271 } else {
272 execute_modify(pool, &prepared, &config, &mut exchange).await?;
273 }
274 }
275
276 Ok(exchange)
277 })
278 }
279}
280
281async fn execute_select(
283 pool: &AnyPool,
284 prepared: &PreparedQuery,
285 config: &SqlEndpointConfig,
286 exchange: &mut Exchange,
287) -> Result<(), CamelError> {
288 match config.output_type {
289 SqlOutputType::SelectOne => {
290 let mut query = sqlx::query(&prepared.sql);
292 query = bind_json_values(query, &prepared.bindings);
293
294 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
295 warn!(error = %e, "SQL query failed");
296 CamelError::ProcessorError(format!("Query execution failed: {}", e))
297 })?;
298
299 let count = rows.len();
300 debug!(rows = count, "SQL query completed");
301 let json_rows: Vec<serde_json::Value> = rows
302 .iter()
303 .map(row_to_json)
304 .collect::<Result<Vec<_>, _>>()?;
305
306 if let Some(first_row) = json_rows.into_iter().next() {
307 exchange.input.body = Body::Json(first_row);
308 } else {
309 exchange.input.body = Body::Empty;
310 }
311 debug!("SelectOne returned {} row", if count > 0 { 1 } else { 0 });
312 exchange
313 .input
314 .set_header(headers::ROW_COUNT, serde_json::json!(count));
315 }
316 SqlOutputType::SelectList => {
317 let mut query = sqlx::query(&prepared.sql);
319 query = bind_json_values(query, &prepared.bindings);
320
321 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
322 warn!(error = %e, "SQL query failed");
323 CamelError::ProcessorError(format!("Query execution failed: {}", e))
324 })?;
325
326 let count = rows.len();
327 debug!(rows = count, "SQL query completed");
328 let json_rows: Vec<serde_json::Value> = rows
329 .iter()
330 .map(row_to_json)
331 .collect::<Result<Vec<_>, _>>()?;
332
333 exchange.input.body = Body::Json(serde_json::Value::Array(json_rows));
334 debug!("SelectList returned {} rows", count);
335 exchange
336 .input
337 .set_header(headers::ROW_COUNT, serde_json::json!(count));
338 }
339 SqlOutputType::StreamList => {
340 use futures::TryStreamExt;
342
343 let pool_clone = pool.clone();
344 let sql_str = prepared.sql.clone();
345 let bindings = prepared.bindings.clone();
346
347 let byte_stream = async_stream::try_stream! {
349 let mut q = sqlx::query(&sql_str);
350 q = bind_json_values(q, &bindings);
351 let mut rows = q.fetch(&pool_clone);
352 while let Some(row) = rows.try_next().await.map_err(|e| {
353 CamelError::ProcessorError(format!("Query execution failed: {}", e))
354 })? {
355 let json_val = row_to_json(&row).map_err(|e| {
356 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
357 })?;
358 let mut bytes = serde_json::to_vec(&json_val)
359 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
360 bytes.push(b'\n');
361 yield Bytes::from(bytes);
362 }
363 };
364
365 exchange.input.body = Body::Stream(StreamBody {
366 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
367 metadata: StreamMetadata {
368 content_type: Some("application/x-ndjson".to_string()),
369 size_hint: None,
370 origin: None,
371 },
372 });
373 debug!("StreamList: created lazy stream (rows fetched on demand)");
374 }
376 }
377
378 Ok(())
379}
380
381async fn execute_modify(
383 pool: &AnyPool,
384 prepared: &PreparedQuery,
385 config: &SqlEndpointConfig,
386 exchange: &mut Exchange,
387) -> Result<(), CamelError> {
388 let mut query = sqlx::query(&prepared.sql);
389 query = bind_json_values(query, &prepared.bindings);
390
391 let result = query.execute(pool).await.map_err(|e| {
392 warn!(error = %e, "SQL query failed");
393 CamelError::ProcessorError(format!("Query execution failed: {}", e))
394 })?;
395
396 let rows_affected = result.rows_affected();
397
398 if let Some(expected) = config.expected_update_count
400 && rows_affected as i64 != expected
401 {
402 warn!(expected, actual = rows_affected, "Row count mismatch");
403 return Err(CamelError::ProcessorError(format!(
404 "Expected {} rows affected, got {}",
405 expected, rows_affected
406 )));
407 }
408
409 exchange
410 .input
411 .set_header(headers::UPDATE_COUNT, serde_json::json!(rows_affected));
412
413 if config.noop {
414 } else {
416 exchange.input.body = Body::Json(json!({ "rowsAffected": rows_affected }));
417 }
418
419 debug!(rows = rows_affected, "SQL modify query completed");
420
421 Ok(())
422}
423
424async fn execute_batch(
426 pool: &AnyPool,
427 config: &SqlEndpointConfig,
428 exchange: &mut Exchange,
429) -> Result<(), CamelError> {
430 let body_json = match &exchange.input.body {
432 Body::Json(val) => val,
433 _ => {
434 return Err(CamelError::ProcessorError(
435 "Batch mode requires body to be a JSON array of arrays".to_string(),
436 ));
437 }
438 };
439
440 let batch_data = body_json
441 .as_array()
442 .ok_or_else(|| {
443 CamelError::ProcessorError("Batch mode requires body to be a JSON array".to_string())
444 })?
445 .clone();
446
447 let template = parse_query_template(&config.query, config.placeholder)?;
449
450 let mut tx = pool.begin().await.map_err(|e| {
452 warn!(error = %e, "Failed to begin transaction");
455 CamelError::ProcessorError(format!("Failed to begin transaction: {}", e))
456 })?;
457
458 let mut total_rows_affected: u64 = 0;
459
460 for (batch_idx, params_array) in batch_data.into_iter().enumerate() {
461 params_array.as_array().ok_or_else(|| {
463 CamelError::ProcessorError(format!(
464 "Batch item at index {} must be a JSON array of parameters",
465 batch_idx
466 ))
467 })?;
468
469 let temp_msg = Message::new(Body::Json(params_array.clone()));
471 let temp_exchange = Exchange::new(temp_msg);
472
473 let prepared = resolve_params(&template, &temp_exchange, &config.in_separator)?;
475
476 let mut query = sqlx::query(&prepared.sql);
478 query = bind_json_values(query, &prepared.bindings);
479
480 let result = query.execute(&mut *tx).await.map_err(|e| {
481 warn!("Batch query execution failed at index {}: {}", batch_idx, e);
484 CamelError::ProcessorError(format!("Batch query execution failed: {}", e))
485 })?;
486
487 if let Some(expected) = config.expected_update_count
489 && result.rows_affected() as i64 != expected
490 {
491 warn!(
494 "Batch item {}: expected {} rows affected, got {}",
495 batch_idx,
496 expected,
497 result.rows_affected()
498 );
499 return Err(CamelError::ProcessorError(format!(
500 "Batch item {}: expected {} rows affected, got {}",
501 batch_idx,
502 expected,
503 result.rows_affected()
504 )));
505 }
506
507 total_rows_affected += result.rows_affected();
508 }
509
510 tx.commit().await.map_err(|e| {
512 warn!(error = %e, "Failed to commit transaction");
515 CamelError::ProcessorError(format!("Failed to commit transaction: {}", e))
516 })?;
517
518 exchange.input.set_header(
519 headers::UPDATE_COUNT,
520 serde_json::json!(total_rows_affected),
521 );
522
523 debug!(
524 "Batch execution completed, total rows affected: {}",
525 total_rows_affected
526 );
527
528 Ok(())
529}
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534 use camel_api::MetricsCollector;
535 use camel_component_api::HealthCheckRegistry;
536 use camel_component_api::test_support::PanicRuntimeObservability;
537 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
538 std::sync::Arc::new(PanicRuntimeObservability)
539 }
540 use camel_component_api::Message;
541 use camel_component_api::UriConfig;
542 use sqlx::any::AnyPoolOptions;
543 use std::sync::Arc;
544 use std::sync::Mutex;
545 use tokio::sync::OnceCell;
546
547 #[derive(Debug, Default)]
550 struct RecordingHealth {
551 forced: Mutex<Vec<(String, String, String)>>,
552 }
553
554 impl HealthCheckRegistry for RecordingHealth {
555 fn force_unhealthy_for_route(&self, route_id: &str, name: &str, reason: &str) {
556 self.forced.lock().unwrap().push((
557 route_id.to_string(),
558 name.to_string(),
559 reason.to_string(),
560 ));
561 }
562 }
563
    /// Metrics collector double: every method has an empty body and discards
    /// its arguments.
    struct NoopMetrics;

    impl MetricsCollector for NoopMetrics {
        fn record_exchange_duration(&self, _: &str, _: Duration) {}
        fn increment_errors(&self, _: &str, _: &str) {}
        fn increment_exchanges(&self, _: &str) {}
        fn set_queue_depth(&self, _: &str, _: usize) {}
        fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
    }
573
574 struct RecordingRuntime {
575 health: Arc<RecordingHealth>,
576 }
577
578 impl RuntimeObservability for RecordingRuntime {
579 fn metrics(&self) -> Arc<dyn MetricsCollector> {
580 Arc::new(NoopMetrics)
581 }
582 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
583 self.health.clone()
584 }
585 }
586
587 async fn sqlite_pool() -> AnyPool {
588 sqlx::any::install_default_drivers();
589 AnyPoolOptions::new()
590 .max_connections(1)
591 .connect("sqlite::memory:")
592 .await
593 .expect("sqlite pool")
594 }
595
596 async fn seed_items_table(pool: &AnyPool) {
597 sqlx::query(
598 "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, done INTEGER DEFAULT 0)",
599 )
600 .execute(pool)
601 .await
602 .expect("create table");
603 sqlx::query("INSERT INTO items (id, name, done) VALUES (1, 'a', 0), (2, 'b', 0)")
604 .execute(pool)
605 .await
606 .expect("seed rows");
607 }
608
609 fn config() -> SqlEndpointConfig {
610 let mut c =
611 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test").unwrap();
612 c.resolve_defaults();
613 c
614 }
615
616 #[test]
617 fn producer_clone_shares_pool() {
618 let p1 = SqlProducer::new(
619 config(),
620 Arc::new(OnceCell::new()),
621 test_rt(),
622 "sql-producer-test-route",
623 );
624 let p2 = p1.clone();
625 assert!(Arc::ptr_eq(&p1.pool, &p2.pool));
626 assert!(Arc::ptr_eq(&p1.stopped, &p2.stopped));
627 }
628
629 #[test]
630 fn resolve_query_from_config() {
631 let config = config();
632 let ex = Exchange::new(Message::default());
633 let q = SqlProducer::resolve_query_source(&ex, &config);
634 assert_eq!(q, "select 1");
635 }
636
637 #[test]
638 fn resolve_query_from_header() {
639 let config = config();
640 let mut msg = Message::default();
641 msg.set_header(headers::QUERY, serde_json::json!("select 2"));
642 let ex = Exchange::new(msg);
643 let q = SqlProducer::resolve_query_source(&ex, &config);
644 assert_eq!(q, "select 2");
645 }
646
647 #[test]
648 fn resolve_query_from_body() {
649 let mut config = config();
650 config.use_message_body_for_sql = true;
651 let msg = Message::new(Body::Text("select 3".to_string()));
652 let ex = Exchange::new(msg);
653 let q = SqlProducer::resolve_query_source(&ex, &config);
654 assert_eq!(q, "select 3");
655 }
656
657 #[test]
658 fn resolve_query_header_priority_over_body() {
659 let mut config = config();
660 config.use_message_body_for_sql = true;
661 let mut msg = Message::new(Body::Text("select from body".to_string()));
662 msg.set_header(headers::QUERY, serde_json::json!("select from header"));
663 let ex = Exchange::new(msg);
664 let q = SqlProducer::resolve_query_source(&ex, &config);
665 assert_eq!(q, "select from header");
666 }
667
668 #[test]
669 fn resolve_query_body_priority_over_config() {
670 let mut config = config();
671 config.use_message_body_for_sql = true;
672 let msg = Message::new(Body::Text("select from body".to_string()));
673 let ex = Exchange::new(msg);
674 let q = SqlProducer::resolve_query_source(&ex, &config);
675 assert_eq!(q, "select from body");
676 }
677
678 #[test]
679 fn bind_json_null() {
680 let query = sqlx::query("SELECT ?");
681 let values = vec![serde_json::Value::Null];
682 let _bound = bind_json_values(query, &values);
683 }
685
686 #[test]
687 fn bind_json_bool() {
688 let query = sqlx::query("SELECT ?");
689 let values = vec![serde_json::Value::Bool(true)];
690 let _bound = bind_json_values(query, &values);
691 }
692
693 #[test]
694 fn bind_json_number_i64() {
695 let query = sqlx::query("SELECT ?");
696 let values = vec![serde_json::json!(42)];
697 let _bound = bind_json_values(query, &values);
698 }
699
700 #[test]
701 fn bind_json_number_f64() {
702 let query = sqlx::query("SELECT ?");
703 let values = vec![serde_json::json!(std::f64::consts::PI)];
704 let _bound = bind_json_values(query, &values);
705 }
706
707 #[test]
708 fn bind_json_string() {
709 let query = sqlx::query("SELECT ?");
710 let values = vec![serde_json::json!("hello world")];
711 let _bound = bind_json_values(query, &values);
712 }
713
714 #[test]
715 fn bind_json_array() {
716 let query = sqlx::query("SELECT ?");
717 let values = vec![serde_json::json!([1, 2, 3])];
718 let _bound = bind_json_values(query, &values);
719 }
720
721 #[test]
722 fn bind_json_object() {
723 let query = sqlx::query("SELECT ?");
724 let values = vec![serde_json::json!({"key": "value"})];
725 let _bound = bind_json_values(query, &values);
726 }
727
728 #[test]
729 fn bind_multiple_values() {
730 let query = sqlx::query("SELECT ?, ?, ?");
731 let values = vec![
732 serde_json::json!(1),
733 serde_json::json!("test"),
734 serde_json::Value::Null,
735 ];
736 let _bound = bind_json_values(query, &values);
737 }
738
739 #[test]
741 fn expected_update_count_validation() {
742 let config = SqlEndpointConfig::from_uri(
744 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=5",
745 )
746 .unwrap();
747 assert_eq!(config.expected_update_count, Some(5));
748
749 let config_default = self::config();
751 assert_eq!(config_default.expected_update_count, None);
752
753 let config_neg = SqlEndpointConfig::from_uri(
755 "sql:update t set x=1?db_url=postgres://localhost/test&expectedUpdateCount=-1",
756 )
757 .unwrap();
758 assert_eq!(config_neg.expected_update_count, Some(-1));
759 }
760
761 #[test]
763 fn parameters_header_override_logic() {
764 let mut prepared = PreparedQuery {
766 sql: "SELECT * FROM t WHERE id = $1".to_string(),
767 bindings: vec![serde_json::json!(42)],
768 };
769
770 let header_params = serde_json::json!([99, "extra"]);
772 if let Some(arr) = header_params.as_array() {
773 prepared.bindings = arr.clone();
774 }
775
776 assert_eq!(prepared.bindings.len(), 2);
778 assert_eq!(prepared.bindings[0], serde_json::json!(99));
779 assert_eq!(prepared.bindings[1], serde_json::json!("extra"));
780
781 let mut prepared2 = PreparedQuery {
783 sql: "SELECT * FROM t WHERE id = $1".to_string(),
784 bindings: vec![serde_json::json!(42)],
785 };
786 let header_non_array = serde_json::json!({"not": "an array"});
787 if let Some(arr) = header_non_array.as_array() {
788 prepared2.bindings = arr.clone();
789 }
790 assert_eq!(prepared2.bindings.len(), 1);
792 assert_eq!(prepared2.bindings[0], serde_json::json!(42));
793 }
794
795 #[tokio::test]
796 async fn execute_select_one_sets_body_and_row_count() {
797 let pool = sqlite_pool().await;
798 seed_items_table(&pool).await;
799
800 let mut config = SqlEndpointConfig::from_uri(
801 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=SelectOne",
802 )
803 .unwrap();
804 config.resolve_defaults();
805
806 let prepared = PreparedQuery {
807 sql: "select id, name from items order by id".to_string(),
808 bindings: vec![],
809 };
810 let mut exchange = Exchange::new(Message::default());
811
812 execute_select(&pool, &prepared, &config, &mut exchange)
813 .await
814 .expect("select one");
815
816 assert_eq!(exchange.input.header(headers::ROW_COUNT), Some(&json!(2)));
817 assert_eq!(
818 exchange.input.body,
819 Body::Json(json!({"id": 1, "name": "a"}))
820 );
821 }
822
823 #[tokio::test]
824 async fn execute_stream_list_materializes_ndjson() {
825 let pool = sqlite_pool().await;
826 seed_items_table(&pool).await;
827
828 let mut config = SqlEndpointConfig::from_uri(
829 "sql:select id from items order by id?db_url=sqlite::memory:&outputType=StreamList",
830 )
831 .unwrap();
832 config.resolve_defaults();
833
834 let prepared = PreparedQuery {
835 sql: "select id from items order by id".to_string(),
836 bindings: vec![],
837 };
838 let mut exchange = Exchange::new(Message::default());
839
840 execute_select(&pool, &prepared, &config, &mut exchange)
841 .await
842 .expect("stream list");
843
844 let bytes = exchange
845 .input
846 .body
847 .clone()
848 .into_bytes(1024)
849 .await
850 .expect("stream bytes");
851 let text = String::from_utf8(bytes.to_vec()).expect("utf8");
852 assert!(text.contains("{\"id\":1}"));
853 assert!(text.contains("{\"id\":2}"));
854 assert_eq!(exchange.input.header(headers::ROW_COUNT), None);
855 }
856
857 #[tokio::test]
858 async fn execute_modify_expected_update_count_mismatch_returns_error() {
859 let pool = sqlite_pool().await;
860 seed_items_table(&pool).await;
861
862 let mut config = SqlEndpointConfig::from_uri(
863 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&expectedUpdateCount=2",
864 )
865 .unwrap();
866 config.resolve_defaults();
867
868 let prepared = PreparedQuery {
869 sql: "update items set done=1 where id = $1".to_string(),
870 bindings: vec![json!(1)],
871 };
872 let mut exchange = Exchange::new(Message::default());
873
874 let err = execute_modify(&pool, &prepared, &config, &mut exchange)
875 .await
876 .expect_err("must fail due expected row count mismatch");
877 assert!(err.to_string().contains("Expected 2 rows affected, got 1"));
878 }
879
880 #[tokio::test]
881 async fn execute_batch_rollback_when_any_item_fails_expected_count() {
882 let pool = sqlite_pool().await;
883 seed_items_table(&pool).await;
884
885 let mut config = SqlEndpointConfig::from_uri(
886 "sql:update items set done=1 where id = #?db_url=sqlite::memory:&batch=true&expectedUpdateCount=1",
887 )
888 .unwrap();
889 config.resolve_defaults();
890
891 let mut exchange = Exchange::new(Message::new(Body::Json(json!([[1], [999]]))));
892
893 let err = execute_batch(&pool, &config, &mut exchange)
894 .await
895 .expect_err("second batch item should fail expectedUpdateCount");
896 assert!(
897 err.to_string()
898 .contains("Batch item 1: expected 1 rows affected, got 0")
899 );
900
901 let row = sqlx::query("select done from items where id = 1")
902 .fetch_one(&pool)
903 .await
904 .expect("query row");
905 let done: i64 = sqlx::Row::try_get(&row, 0).expect("done column");
906 assert_eq!(done, 0, "transaction must rollback first update");
907 }
908
909 #[tokio::test]
915 async fn producer_no_panic_without_prior_resolve_defaults() {
916 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
918 assert!(config.max_connections.is_none());
919
920 let mut producer = SqlProducer::new(
921 config,
922 Arc::new(OnceCell::new()),
923 test_rt(),
924 "sql-producer-test-route",
925 );
926 let exchange = Exchange::new(Message::default());
927
928 let result = producer.call(exchange).await;
930 assert!(
931 result.is_ok(),
932 "Producer should initialize pool without panic, got: {:?}",
933 result
934 );
935 }
936
937 #[tokio::test]
939 async fn producer_pool_init_returns_config_error_for_invalid_db() {
940 let mut config = SqlEndpointConfig::from_uri(
944 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
945 )
946 .unwrap();
947 config.max_connections = Some(1);
949 config.min_connections = Some(0);
950 config.idle_timeout_secs = Some(300);
951 config.max_lifetime_secs = Some(1800);
952
953 let health = Arc::new(RecordingHealth::default());
954 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
955 health: health.clone(),
956 });
957 let mut producer = SqlProducer::new(
958 config,
959 Arc::new(OnceCell::new()),
960 rt,
961 "sql-producer-test-route",
962 );
963 let exchange = Exchange::new(Message::default());
964
965 let result = producer.call(exchange).await;
966 assert!(result.is_err());
967 let err_msg = result.unwrap_err().to_string();
969 assert!(
970 err_msg.contains("Failed to connect") || err_msg.contains("database"),
971 "Expected connection error, got: {}",
972 err_msg
973 );
974 let forced = health.forced.lock().unwrap();
976 assert_eq!(
977 forced.len(),
978 1,
979 "expected one force_unhealthy_for_route call"
980 );
981 assert_eq!(forced[0].0, "sql-producer-test-route");
982 assert_eq!(forced[0].1, "g:sql:producer-pool-init");
983 }
984
985 #[test]
987 fn poll_ready_returns_ready_for_uninitialized_pool() {
988 let config = {
989 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
990 c.resolve_defaults();
991 c
992 };
993 let mut producer = SqlProducer::new(
994 config,
995 Arc::new(OnceCell::new()),
996 test_rt(),
997 "sql-producer-test-route",
998 );
999 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1000 let result = producer.poll_ready(&mut cx);
1001 assert!(matches!(result, Poll::Ready(Ok(()))));
1002 }
1003
1004 #[test]
1006 fn poll_ready_returns_error_when_stopped() {
1007 let config = {
1008 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1009 c.resolve_defaults();
1010 c
1011 };
1012 let mut producer = SqlProducer::new(
1013 config,
1014 Arc::new(OnceCell::new()),
1015 test_rt(),
1016 "sql-producer-test-route",
1017 );
1018 producer.stop();
1019 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1020 let result = producer.poll_ready(&mut cx);
1021 assert!(matches!(result, Poll::Ready(Err(_))));
1022 let err_msg = match result {
1023 Poll::Ready(Err(e)) => e.to_string(),
1024 _ => unreachable!(),
1025 };
1026 assert!(err_msg.contains("SQL producer stopped"));
1027 }
1028
1029 #[tokio::test]
1031 async fn poll_ready_returns_error_when_pool_closed() {
1032 let pool = sqlite_pool().await;
1033 pool.close().await;
1034
1035 let config = {
1036 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1037 c.resolve_defaults();
1038 c
1039 };
1040 let pool_cell = Arc::new(OnceCell::new());
1041 pool_cell.set(pool).unwrap();
1042
1043 let mut producer =
1044 SqlProducer::new(config, pool_cell, test_rt(), "sql-producer-test-route");
1045 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1046 let result = producer.poll_ready(&mut cx);
1047 assert!(matches!(result, Poll::Ready(Err(_))));
1048 let err_msg = match result {
1049 Poll::Ready(Err(e)) => e.to_string(),
1050 _ => unreachable!(),
1051 };
1052 assert!(err_msg.contains("SQL connection pool is closed"));
1053 }
1054
1055 #[tokio::test]
1057 async fn poll_ready_returns_ok_for_healthy_pool() {
1058 let pool = sqlite_pool().await;
1059
1060 let config = {
1061 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1062 c.resolve_defaults();
1063 c
1064 };
1065 let pool_cell = Arc::new(OnceCell::new());
1066 pool_cell.set(pool).unwrap();
1067
1068 let mut producer =
1069 SqlProducer::new(config, pool_cell, test_rt(), "sql-producer-test-route");
1070 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1071 let result = producer.poll_ready(&mut cx);
1072 assert!(matches!(result, Poll::Ready(Ok(()))));
1073 }
1074
1075 #[tokio::test]
1077 async fn test_sql_stop_closes_pool() {
1078 let pool = sqlite_pool().await;
1079
1080 let config = {
1081 let mut c = SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1082 c.resolve_defaults();
1083 c
1084 };
1085 let pool_cell = Arc::new(OnceCell::new());
1086 pool_cell.set(pool.clone()).unwrap();
1087
1088 let producer = SqlProducer::new(
1089 config,
1090 pool_cell.clone(),
1091 test_rt(),
1092 "sql-producer-test-route",
1093 );
1094 assert!(!pool.is_closed(), "Pool should be open before stop");
1095
1096 producer.stop();
1097
1098 tokio::time::sleep(Duration::from_millis(100)).await;
1100
1101 assert!(
1102 pool.is_closed(),
1103 "Pool should be closed after producer.stop()"
1104 );
1105
1106 let mut producer2 = SqlProducer::new(
1108 {
1109 let mut c =
1110 SqlEndpointConfig::from_uri("sql:select 1?db_url=sqlite::memory:").unwrap();
1111 c.resolve_defaults();
1112 c
1113 },
1114 pool_cell.clone(),
1115 test_rt(),
1116 "sql-producer-test-route",
1117 );
1118 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1119 let result = producer2.poll_ready(&mut cx);
1120 assert!(
1121 matches!(result, Poll::Ready(Err(_))),
1122 "poll_ready should fail after pool closed"
1123 );
1124 }
1125
1126 #[tokio::test]
1128 async fn use_placeholder_false_executes_raw_sql() {
1129 let pool = sqlite_pool().await;
1130 seed_items_table(&pool).await;
1131
1132 let mut config = SqlEndpointConfig::from_uri(
1133 "sql:select id, name from items order by id?db_url=sqlite::memory:&usePlaceholder=false",
1134 )
1135 .unwrap();
1136 config.resolve_defaults();
1137 assert!(!config.use_placeholder);
1138
1139 let mut producer = SqlProducer::new(
1140 config,
1141 Arc::new(OnceCell::new()),
1142 test_rt(),
1143 "sql-producer-test-route",
1144 );
1145 producer.pool.set(pool.clone()).unwrap();
1147
1148 let exchange = Exchange::new(Message::default());
1149 let result = producer.call(exchange).await;
1150 assert!(result.is_ok());
1151 let exchange = result.unwrap();
1152 assert!(matches!(exchange.input.body, Body::Json(_)));
1154 }
1155
1156 #[tokio::test]
1158 async fn use_placeholder_true_processes_placeholders() {
1159 let pool = sqlite_pool().await;
1160 seed_items_table(&pool).await;
1161
1162 let mut config = SqlEndpointConfig::from_uri(
1163 "sql:select id, name from items where id = #?db_url=sqlite::memory:",
1164 )
1165 .unwrap();
1166 config.resolve_defaults();
1167 assert!(config.use_placeholder);
1168
1169 let mut producer = SqlProducer::new(
1170 config,
1171 Arc::new(OnceCell::new()),
1172 test_rt(),
1173 "sql-producer-test-route",
1174 );
1175 producer.pool.set(pool.clone()).unwrap();
1176
1177 let msg = Message::new(Body::Json(json!([1])));
1178 let exchange = Exchange::new(msg);
1179 let result = producer.call(exchange).await;
1180 assert!(result.is_ok());
1181 }
1182
1183 #[tokio::test]
1187 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1188 use camel_component_api::NetworkRetryPolicy;
1189 use std::sync::Arc;
1190 use std::sync::atomic::{AtomicU32, Ordering};
1191
1192 let policy = NetworkRetryPolicy {
1193 max_attempts: 3,
1194 initial_delay: Duration::from_millis(1),
1195 max_delay: Duration::from_millis(1),
1196 multiplier: 1.0,
1197 ..NetworkRetryPolicy::default()
1198 };
1199
1200 let calls = Arc::new(AtomicU32::new(0));
1201 let calls_clone = Arc::clone(&calls);
1202
1203 let mut attempt: u32 = 0;
1204 let _result: Result<(), ()> = loop {
1205 attempt += 1;
1206 calls_clone.fetch_add(1, Ordering::SeqCst);
1207 let op_result: Result<(), ()> = Err(());
1208 match op_result {
1209 Ok(v) => break Ok(v),
1210 Err(_) if policy.should_retry(attempt) => {
1211 let delay = policy.delay_for(attempt - 1);
1212 tokio::time::sleep(delay).await;
1213 continue;
1214 }
1215 Err(_) => break Err(()),
1216 }
1217 };
1218
1219 assert_eq!(
1220 calls.load(Ordering::SeqCst),
1221 3,
1222 "max_attempts=3 must yield exactly 3 invocations"
1223 );
1224 }
1225
1226 #[tokio::test]
1231 async fn producer_pool_init_failure_calls_force_unhealthy_for_route() {
1232 let health = Arc::new(RecordingHealth::default());
1233 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
1234 health: health.clone(),
1235 });
1236
1237 let mut config = SqlEndpointConfig::from_uri(
1238 "sql:select 1?db_url=postgres://nonexistent-host:5432/nonexistent_db&retryEnabled=false",
1239 )
1240 .unwrap();
1241 config.max_connections = Some(1);
1242 config.min_connections = Some(0);
1243 config.idle_timeout_secs = Some(300);
1244 config.max_lifetime_secs = Some(1800);
1245
1246 let mut producer = SqlProducer::new(
1247 config,
1248 Arc::new(OnceCell::new()),
1249 rt,
1250 "sql-producer-test-route",
1251 );
1252 let exchange = Exchange::new(Message::default());
1253
1254 let result = producer.call(exchange).await;
1255 assert!(result.is_err());
1256
1257 let forced = health.forced.lock().unwrap();
1258 assert_eq!(
1259 forced.len(),
1260 1,
1261 "expected one force_unhealthy_for_route call"
1262 );
1263 assert_eq!(forced[0].0, "sql-producer-test-route");
1264 assert_eq!(forced[0].1, "g:sql:producer-pool-init");
1265 assert!(!forced[0].2.is_empty(), "reason should be non-empty");
1266 }
1267}