1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::retry_async;
15use camel_component_api::{
16 Body, CamelError, Exchange, Message, RuntimeObservability, StreamBody, StreamMetadata,
17};
18use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
19
20use crate::config::{
21 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
22 enrich_db_url_with_ssl, redact_db_url,
23};
24use crate::headers;
25use crate::query::{QueryTemplate, parse_query_template, resolve_params};
26use crate::utils::{bind_json_values, is_retryable_sqlx_error, row_to_json};
27
28pub struct SqlConsumer {
29 pub(crate) config: SqlEndpointConfig,
30 pub(crate) pool: Arc<OnceCell<AnyPool>>,
31 stopped: bool,
32 #[allow(dead_code)]
35 runtime: Arc<dyn RuntimeObservability>,
36}
37
38impl SqlConsumer {
39 pub fn new(
40 config: SqlEndpointConfig,
41 pool: Arc<OnceCell<AnyPool>>,
42 runtime: Arc<dyn RuntimeObservability>,
43 ) -> Self {
44 Self {
45 config,
46 pool,
47 stopped: false,
48 runtime,
49 }
50 }
51
52 async fn poll_database(
54 &self,
55 pool: &AnyPool,
56 context: &ConsumerContext,
57 template: &QueryTemplate,
58 ) -> Result<(), CamelError> {
59 let empty_exchange = Exchange::new(Message::default());
61
62 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
64
65 debug!(query = %prepared.sql, "executing SQL consumer poll");
66
67 if self.config.output_type == SqlOutputType::StreamList {
68 return self.poll_database_stream(pool, context, &prepared).await;
69 }
70
71 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
72 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
73 warn!(error = %e, "SQL consumer poll query failed");
74 CamelError::ProcessorError(format!("Query execution failed: {}", e))
75 })?;
76
77 debug!(rows = rows.len(), "SQL consumer poll completed");
78
79 if rows.is_empty() && !self.config.route_empty_result_set {
80 return Ok(());
81 }
82
83 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
84 if max > 0 {
85 rows.into_iter().take(max as usize).collect()
86 } else {
87 rows
88 }
89 } else {
90 rows
91 };
92
93 if self.config.use_iterator {
94 for row in rows_to_process {
96 let row_json = row_to_json(&row)?;
97
98 let mut msg = Message::new(Body::Json(row_json.clone()));
100
101 if let Some(obj) = row_json.as_object() {
103 for (key, value) in obj {
104 msg.set_header(format!("CamelSql.{}", key), value.clone());
105 }
106 }
107
108 let exchange = Exchange::new(msg);
109
110 let result = context.send_and_wait(exchange).await;
112
113 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
115 error!(error = %e, "Post-processing failed"); if self.config.break_batch_on_consume_fail {
118 return Err(e);
119 }
120 }
121
122 if let Err(ref consume_err) = result
124 && self.config.break_batch_on_consume_fail
125 {
126 return Err(consume_err.clone());
127 }
128 }
129 } else {
130 let rows_json: Vec<JsonValue> = rows_to_process
132 .iter()
133 .map(row_to_json)
134 .collect::<Result<Vec<_>, CamelError>>()?;
135
136 let row_count = rows_json.len();
137
138 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
140 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
141
142 let exchange = Exchange::new(msg);
143
144 let result = context.send_and_wait(exchange).await;
146
147 for row_json in rows_json.iter() {
151 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
152 error!(error = %e, "Post-processing failed for batch row"); if self.config.break_batch_on_consume_fail {
155 return Err(e);
156 }
157 }
158 }
159
160 if let Err(ref consume_err) = result
162 && self.config.break_batch_on_consume_fail
163 {
164 return Err(consume_err.clone());
165 }
166 }
167
168 if let Some(ref batch_query) = self.config.on_consume_batch_complete {
170 let _ = self
171 .execute_post_query(pool, batch_query, &JsonValue::Null)
172 .await;
173 }
174
175 Ok(())
176 }
177
178 async fn poll_database_stream(
179 &self,
180 pool: &AnyPool,
181 context: &ConsumerContext,
182 prepared: &crate::query::PreparedQuery,
183 ) -> Result<(), CamelError> {
184 let pool_clone = pool.clone();
185 let sql_str = prepared.sql.clone();
186 let bindings = prepared.bindings.clone();
187
188 let byte_stream = async_stream::try_stream! {
189 let mut q = sqlx::query(&sql_str);
190 q = bind_json_values(q, &bindings);
191 let mut rows = q.fetch(&pool_clone);
192 while let Some(row) = rows.try_next().await.map_err(|e| {
193 CamelError::ProcessorError(format!("Query execution failed: {}", e))
194 })? {
195 let json_val = row_to_json(&row).map_err(|e| {
196 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
197 })?;
198 let mut bytes = serde_json::to_vec(&json_val)
199 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
200 bytes.push(b'\n');
201 yield Bytes::from(bytes);
202 }
203 };
204
205 let msg = Message::new(Body::Stream(StreamBody {
206 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
207 metadata: StreamMetadata {
208 content_type: Some("application/x-ndjson".to_string()),
209 size_hint: None,
210 origin: None,
211 },
212 }));
213
214 let exchange = Exchange::new(msg);
215 let result = context.send_and_wait(exchange).await;
216 if let Err(e) = result {
217 error!(error = %e, "StreamList consumer downstream processing failed"); return Err(e);
226 }
227
228 debug!("StreamList: consumer poll completed (lazy stream emitted)");
229 Ok(())
230 }
231
232 async fn handle_post_processing(
234 &self,
235 pool: &AnyPool,
236 result: &Result<Exchange, CamelError>,
237 row_json: &JsonValue,
238 ) -> Result<(), CamelError> {
239 match result {
240 Ok(_) => {
241 if let Some(ref on_consume) = self.config.on_consume {
243 self.execute_post_query(pool, on_consume, row_json).await?;
244 }
245 }
246 Err(_) => {
247 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
249 self.execute_post_query(pool, on_consume_failed, row_json)
250 .await?;
251 }
252 }
253 }
254 Ok(())
255 }
256
257 async fn execute_post_query(
259 &self,
260 pool: &AnyPool,
261 query_str: &str,
262 row_json: &JsonValue,
263 ) -> Result<(), CamelError> {
264 let template = parse_query_template(query_str, self.config.placeholder)?;
266
267 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
270 if let Some(obj) = row_json.as_object() {
271 for (key, value) in obj {
272 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
273 }
274 }
275 let temp_exchange = Exchange::new(temp_msg);
276
277 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
279
280 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
282 let result = query.execute(pool).await.map_err(|e| {
283 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
284 })?;
285
286 if result.rows_affected() == 0 {
288 warn!(
289 query = query_str,
290 "Post-processing query affected 0 rows — the row may not have been marked correctly"
291 );
292 }
293
294 Ok(())
295 }
296
297 async fn handle_poll_result(
300 &self,
301 pool: &AnyPool,
302 context: &ConsumerContext,
303 template: &QueryTemplate,
304 ) {
305 if let Err(e) = self.poll_database(pool, context, template).await {
306 if self.config.bridge_error_handler {
307 warn!(error = %e, "SQL consumer poll failed (bridged)");
311 if let Err(route_err) = self.bridge_poll_error(context, e).await {
312 error!(error = %route_err, "Failed to bridge SQL consumer error to route");
315 }
316 } else {
317 error!(error = %e, "SQL consumer poll failed"); }
321 }
322 }
323
324 async fn bridge_poll_error(
325 &self,
326 context: &ConsumerContext,
327 error: CamelError,
328 ) -> Result<(), CamelError> {
329 if !self.config.bridge_error_handler {
330 return Ok(());
331 }
332 let mut exchange = Exchange::new(Message::default());
333 exchange.set_error(error);
334 context.send_and_wait(exchange).await.map(|_| ())
335 }
336}
337
338#[async_trait]
339impl Consumer for SqlConsumer {
340 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
341 if self.stopped {
343 return Err(CamelError::Config(
344 "SQL consumer cannot be restarted after stop".into(),
345 ));
346 }
347
348 let pool = self
350 .pool
351 .get_or_try_init(|| async {
352 self.config.resolve_defaults();
354 self.config.resolve_file_query().await?;
356
357 sqlx::any::install_default_drivers();
360 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
361
362 let max_conn = self.config.max_connections.ok_or_else(|| {
363 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
364 })?;
365 let min_conn = self.config.min_connections.ok_or_else(|| {
366 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
367 })?;
368 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
369 CamelError::Config(
370 "idle_timeout_secs not resolved for SQL consumer pool".into(),
371 )
372 })?;
373 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
374 CamelError::Config(
375 "max_lifetime_secs not resolved for SQL consumer pool".into(),
376 )
377 })?;
378
379 info!(
380 db_url = %redact_db_url(&self.config.db_url),
381 "SQL consumer pool initializing"
382 );
383 let retry_policy = &self.config.retry;
384 retry_async::<_, _, _, _, sqlx::Error>(
385 retry_policy,
386 Some("sql-consumer"),
387 || {
388 AnyPoolOptions::new()
389 .max_connections(max_conn)
390 .min_connections(min_conn)
391 .idle_timeout(Duration::from_secs(idle_timeout))
392 .max_lifetime(Duration::from_secs(max_lifetime))
393 .connect(&db_url)
394 },
395 is_retryable_sqlx_error,
396 )
397 .await
398 .map_err(|e| {
399 error!(error = %e, db_url = %redact_db_url(&self.config.db_url), "SQL connect failed, giving up"); CamelError::EndpointCreationFailed(format!(
402 "Failed to connect to database: {}",
403 e
404 ))
405 })
406 })
407 .await?;
408
409 if self.config.transaction_mode == TransactionMode::Managed {
411 warn!("transactionManager not yet implemented; using Auto mode");
412 }
413
414 if self.config.processing_strategy == ProcessingStrategy::Scheduled {
416 debug!(
417 "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
418 );
419 }
420 if self.config.poll_strategy == PollStrategy::Burst {
421 debug!("Poll strategy: Burst (rapid successive polls)");
422 }
423
424 if self.config.output_type == SqlOutputType::StreamList
425 && (self.config.on_consume.is_some()
426 || self.config.on_consume_failed.is_some()
427 || self.config.on_consume_batch_complete.is_some())
428 {
429 warn!(
430 "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
431 (rows are consumed lazily downstream)"
432 );
433 }
434
435 if self.config.on_consume.is_none() {
437 warn!(
438 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
439 );
440 }
441
442 info!(
443 db_url = %redact_db_url(&self.config.db_url),
444 query_len = self.config.query.len(),
445 "SQL consumer started"
446 );
447
448 let template = parse_query_template(&self.config.query, self.config.placeholder)
450 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
451
452 if self.config.initial_delay_ms > 0 {
454 tokio::select! {
455 _ = context.cancelled() => {
456 info!("SQL consumer stopped during initial delay");
457 return Ok(());
458 }
459 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
460 }
461 }
462
463 let mut poll_count: u32 = 0;
479 loop {
480 if let Some(max_repeats) = self.config.repeat_count
482 && poll_count >= max_repeats
483 {
484 info!(
485 repeat_count = max_repeats,
486 "SQL consumer reached repeat_count limit, stopping"
487 );
488 break;
489 }
490
491 tokio::select! {
492 _ = context.cancelled() => {
493 info!("SQL consumer stopped");
494 break;
495 }
496 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
497 poll_count += 1;
498 self.handle_poll_result(pool, &context, &template).await;
499 }
500 }
501 }
502
503 Ok(())
504 }
505
506 async fn stop(&mut self) -> Result<(), CamelError> {
507 if self.stopped {
509 debug!("SQL consumer stop called on already-stopped consumer");
510 return Ok(());
511 }
512
513 if let Some(pool) = self.pool.get() {
515 debug!("SQL consumer closing connection pool");
516 pool.close().await;
517 debug!("SQL consumer pool closed");
518 }
519
520 self.stopped = true;
521 info!("SQL consumer stopped");
522 Ok(())
523 }
524
525 fn concurrency_model(&self) -> ConcurrencyModel {
526 ConcurrencyModel::Sequential
530 }
531}
532
533#[cfg(test)]
534mod tests {
535 use super::*;
536 use camel_component_api::test_support::PanicRuntimeObservability;
537 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
538 std::sync::Arc::new(PanicRuntimeObservability)
539 }
540 use crate::config::SqlEndpointConfig;
541 use camel_component_api::ExchangeEnvelope;
542 use camel_component_api::UriConfig;
543 use sqlx::any::AnyPoolOptions;
544 use std::sync::Arc;
545 use std::time::Duration;
546 use tokio::sync::mpsc;
547 use tokio_util::sync::CancellationToken;
548
549 async fn sqlite_pool() -> AnyPool {
550 sqlx::any::install_default_drivers();
551 AnyPoolOptions::new()
552 .max_connections(1)
553 .connect("sqlite::memory:")
554 .await
555 .expect("sqlite pool")
556 }
557
558 async fn seed_consumer_table(pool: &AnyPool) {
559 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
560 .execute(pool)
561 .await
562 .expect("create table");
563 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
564 .execute(pool)
565 .await
566 .expect("seed rows");
567 }
568
569 fn config() -> SqlEndpointConfig {
570 let mut c =
571 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
572 .unwrap();
573 c.resolve_defaults();
574 c
575 }
576
577 #[test]
578 fn consumer_concurrency_model() {
579 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()), test_rt());
580 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
581 }
582
583 #[test]
584 fn consumer_stores_config() {
585 let mut config = SqlEndpointConfig::from_uri(
586 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
587 ).unwrap();
588 config.resolve_defaults();
589 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
590 assert_eq!(c.config.delay_ms, 2000);
591 assert!(c.config.on_consume.is_some());
592 }
593
594 #[tokio::test]
595 async fn poll_database_runs_on_consume_for_successful_rows() {
596 let pool = sqlite_pool().await;
597 seed_consumer_table(&pool).await;
598
599 let mut config = SqlEndpointConfig::from_uri(
600 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
601 )
602 .unwrap();
603 config.resolve_defaults();
604
605 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
606 let template = parse_query_template(&config.query, config.placeholder).unwrap();
607
608 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
609 tokio::spawn(async move {
610 while let Some(env) = rx.recv().await {
611 if let Some(reply_tx) = env.reply_tx {
612 let _ = reply_tx.send(Ok(env.exchange));
613 }
614 }
615 });
616 let ctx = ConsumerContext::new(tx, CancellationToken::new());
617
618 consumer
619 .poll_database(&pool, &ctx, &template)
620 .await
621 .expect("poll must succeed");
622
623 let row = sqlx::query("select processed from jobs where id = 1")
624 .fetch_one(&pool)
625 .await
626 .expect("row 1");
627 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
628
629 let row = sqlx::query("select processed from jobs where id = 2")
630 .fetch_one(&pool)
631 .await
632 .expect("row 2");
633 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
634
635 assert_eq!(processed_1, 1);
636 assert_eq!(processed_2, 1);
637 }
638
639 #[tokio::test]
640 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
641 let pool = sqlite_pool().await;
642 seed_consumer_table(&pool).await;
643
644 let mut config = SqlEndpointConfig::from_uri(
645 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
646 )
647 .unwrap();
648 config.resolve_defaults();
649
650 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
651 let template = parse_query_template(&config.query, config.placeholder).unwrap();
652
653 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
654 tokio::spawn(async move {
655 while let Some(env) = rx.recv().await {
656 if let Some(reply_tx) = env.reply_tx {
657 let _ =
658 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
659 }
660 }
661 });
662 let ctx = ConsumerContext::new(tx, CancellationToken::new());
663
664 consumer
665 .poll_database(&pool, &ctx, &template)
666 .await
667 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
668
669 let row = sqlx::query("select failed from jobs where id = 1")
670 .fetch_one(&pool)
671 .await
672 .expect("row 1");
673 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
674
675 let row = sqlx::query("select failed from jobs where id = 2")
676 .fetch_one(&pool)
677 .await
678 .expect("row 2");
679 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
680
681 assert_eq!(failed_1, 1);
682 assert_eq!(failed_2, 1);
683 }
684
685 #[tokio::test]
686 async fn poll_database_breaks_batch_on_consume_fail() {
687 let pool = sqlite_pool().await;
688 seed_consumer_table(&pool).await;
689
690 let mut config = SqlEndpointConfig::from_uri(
691 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
692 )
693 .unwrap();
694 config.resolve_defaults();
695
696 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
697 let template = parse_query_template(&config.query, config.placeholder).unwrap();
698
699 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
700 tokio::spawn(async move {
701 while let Some(env) = rx.recv().await {
702 if let Some(reply_tx) = env.reply_tx {
703 let _ =
704 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
705 }
706 }
707 });
708 let ctx = ConsumerContext::new(tx, CancellationToken::new());
709
710 let err = consumer
711 .poll_database(&pool, &ctx, &template)
712 .await
713 .expect_err("must stop on first downstream failure");
714 assert!(err.to_string().contains("downstream boom"));
715
716 let row = sqlx::query("select failed from jobs where id = 1")
717 .fetch_one(&pool)
718 .await
719 .expect("row 1");
720 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
721
722 let row = sqlx::query("select failed from jobs where id = 2")
723 .fetch_one(&pool)
724 .await
725 .expect("row 2");
726 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
727
728 assert_eq!(failed_1, 1);
729 assert_eq!(failed_2, 0, "second row must not be processed");
730 }
731
732 #[tokio::test]
738 async fn consumer_no_panic_without_prior_resolve_defaults() {
739 let config = SqlEndpointConfig::from_uri(
740 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
741 )
742 .unwrap();
743 assert!(config.max_connections.is_none());
745
746 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
747 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
748 tokio::spawn(async move {
749 while let Some(env) = rx.recv().await {
750 if let Some(reply_tx) = env.reply_tx {
751 let _ = reply_tx.send(Ok(env.exchange));
752 }
753 }
754 });
755 let token = CancellationToken::new();
756 let ctx = ConsumerContext::new(tx, token.clone());
757
758 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
760
761 tokio::time::sleep(Duration::from_millis(50)).await;
763 token.cancel();
764
765 let result = consumer_handle.await.expect("task should not panic");
766 let _ = result;
768 }
769
770 #[tokio::test]
772 async fn stop_closes_pool() {
773 let pool = sqlite_pool().await;
774 seed_consumer_table(&pool).await;
775
776 let mut config = SqlEndpointConfig::from_uri(
777 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
778 )
779 .unwrap();
780 config.resolve_defaults();
781
782 let pool_cell = Arc::new(OnceCell::new());
783 pool_cell.set(pool.clone()).unwrap();
784
785 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
786 consumer.stop().await.expect("stop should succeed");
787
788 assert!(
790 pool.is_closed(),
791 "Pool should be closed after consumer.stop()"
792 );
793 }
794
795 #[tokio::test]
797 async fn double_stop_is_safe() {
798 let pool = sqlite_pool().await;
799 let mut config = SqlEndpointConfig::from_uri(
800 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
801 )
802 .unwrap();
803 config.resolve_defaults();
804
805 let pool_cell = Arc::new(OnceCell::new());
806 pool_cell.set(pool.clone()).unwrap();
807
808 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
809 consumer.stop().await.expect("first stop should succeed");
810 consumer
811 .stop()
812 .await
813 .expect("second stop should also succeed");
814 }
815
816 #[tokio::test]
818 async fn start_after_stop_rejected() {
819 let pool = sqlite_pool().await;
820 let mut config = SqlEndpointConfig::from_uri(
821 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
822 )
823 .unwrap();
824 config.resolve_defaults();
825
826 let pool_cell = Arc::new(OnceCell::new());
827 pool_cell.set(pool.clone()).unwrap();
828
829 let mut consumer = SqlConsumer::new(config, pool_cell, test_rt());
830 consumer.stop().await.expect("stop should succeed");
831
832 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
833 tokio::spawn(async move {
834 while let Some(env) = rx.recv().await {
835 if let Some(reply_tx) = env.reply_tx {
836 let _ = reply_tx.send(Ok(env.exchange));
837 }
838 }
839 });
840 let ctx = ConsumerContext::new(tx, CancellationToken::new());
841
842 let result = consumer.start(ctx).await;
843 assert!(result.is_err());
844 let err_msg = result.unwrap_err().to_string();
845 assert!(
846 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
847 "Expected restart error, got: {}",
848 err_msg
849 );
850 }
851
852 #[tokio::test]
854 async fn batch_mode_per_row_post_processing() {
855 let pool = sqlite_pool().await;
856 seed_consumer_table(&pool).await;
857
858 let mut config = SqlEndpointConfig::from_uri(
859 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
860 )
861 .unwrap();
862 config.resolve_defaults();
863
864 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
865 let template = parse_query_template(&config.query, config.placeholder).unwrap();
866
867 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
868 tokio::spawn(async move {
869 while let Some(env) = rx.recv().await {
870 if let Some(reply_tx) = env.reply_tx {
871 let _ = reply_tx.send(Ok(env.exchange));
872 }
873 }
874 });
875 let ctx = ConsumerContext::new(tx, CancellationToken::new());
876
877 consumer
878 .poll_database(&pool, &ctx, &template)
879 .await
880 .expect("poll must succeed");
881
882 let row = sqlx::query("select processed from jobs where id = 1")
884 .fetch_one(&pool)
885 .await
886 .expect("row 1");
887 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
888
889 let row = sqlx::query("select processed from jobs where id = 2")
890 .fetch_one(&pool)
891 .await
892 .expect("row 2");
893 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
894
895 assert_eq!(
896 processed_1, 1,
897 "row 1 should be marked processed via per-row onConsume"
898 );
899 assert_eq!(
900 processed_2, 1,
901 "row 2 should be marked processed via per-row onConsume"
902 );
903 }
904
905 #[tokio::test]
907 async fn batch_mode_per_row_post_processing_on_failure() {
908 let pool = sqlite_pool().await;
909 seed_consumer_table(&pool).await;
910
911 let mut config = SqlEndpointConfig::from_uri(
912 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
913 )
914 .unwrap();
915 config.resolve_defaults();
916
917 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
918 let template = parse_query_template(&config.query, config.placeholder).unwrap();
919
920 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
921 tokio::spawn(async move {
922 while let Some(env) = rx.recv().await {
923 if let Some(reply_tx) = env.reply_tx {
924 let _ =
925 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
926 }
927 }
928 });
929 let ctx = ConsumerContext::new(tx, CancellationToken::new());
930
931 consumer
932 .poll_database(&pool, &ctx, &template)
933 .await
934 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
935
936 let row = sqlx::query("select failed from jobs where id = 1")
938 .fetch_one(&pool)
939 .await
940 .expect("row 1");
941 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
942
943 let row = sqlx::query("select failed from jobs where id = 2")
944 .fetch_one(&pool)
945 .await
946 .expect("row 2");
947 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
948
949 assert_eq!(
950 failed_1, 1,
951 "row 1 should be marked failed via per-row onConsumeFailed"
952 );
953 assert_eq!(
954 failed_2, 1,
955 "row 2 should be marked failed via per-row onConsumeFailed"
956 );
957 }
958
959 #[tokio::test]
960 async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
961 let mut config = config();
962 config.bridge_error_handler = true;
963 let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()), test_rt());
964
965 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
966 tokio::spawn(async move {
967 while let Some(env) = rx.recv().await {
968 assert!(env.exchange.error.is_some(), "exchange must carry error");
969 if let Some(reply_tx) = env.reply_tx {
970 let _ = reply_tx.send(Ok(env.exchange));
971 }
972 break;
973 }
974 });
975
976 let ctx = ConsumerContext::new(tx, CancellationToken::new());
977 consumer
978 .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
979 .await
980 .expect("bridging should succeed");
981 }
982
983 #[tracing_test::traced_test]
987 #[tokio::test]
988 async fn bridged_poll_failure_emits_warn_not_error() {
989 let pool = sqlite_pool().await;
990 let mut config = config();
995 config.bridge_error_handler = true;
996 config.query = "select * from nonexistent_table".to_string();
998 config.resolve_defaults();
999 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1000 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1001
1002 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1005 tokio::spawn(async move {
1006 while let Some(env) = rx.recv().await {
1007 if let Some(reply_tx) = env.reply_tx {
1008 let _ = reply_tx.send(Ok(env.exchange));
1009 }
1010 }
1011 });
1012 let ctx = ConsumerContext::new(tx, CancellationToken::new());
1013
1014 consumer.handle_poll_result(&pool, &ctx, &template).await;
1016
1017 assert!(
1019 !logs_contain("ERROR"),
1020 "bridged poll failure must not emit ERROR (handler owns it); check captured logs for stray ERROR lines"
1021 );
1022 assert!(
1024 logs_contain("WARN"),
1025 "bridged poll failure should emit warn! for operator visibility"
1026 );
1027 }
1028
1029 #[tracing_test::traced_test]
1039 #[tokio::test]
1040 async fn unbridged_send_and_wait_failure_emits_error_loud() {
1041 let pool = sqlite_pool().await;
1042 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1043 .execute(&pool)
1044 .await
1045 .expect("create table");
1046 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha')")
1047 .execute(&pool)
1048 .await
1049 .expect("seed rows");
1050
1051 let mut config = SqlEndpointConfig::from_uri(
1052 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1053 )
1054 .unwrap();
1055 config.resolve_defaults();
1056 config.bridge_error_handler = false;
1058 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1059 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1060
1061 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
1063 tokio::spawn(async move {
1064 while let Some(env) = rx.recv().await {
1065 if let Some(reply_tx) = env.reply_tx {
1066 let _ = reply_tx.send(Err(CamelError::ProcessorError("boom".into())));
1067 }
1068 }
1069 });
1070 let ctx = ConsumerContext::new(tx, CancellationToken::new());
1071
1072 let _ = consumer.poll_database(&pool, &ctx, &template).await;
1073
1074 assert!(
1076 logs_contain("ERROR"),
1077 "unbridged send_and_wait failure MUST emit ERROR (consumer owns the signal)"
1078 );
1079 }
1080
1081 #[tracing_test::traced_test]
1084 #[tokio::test]
1085 async fn unbridged_handle_poll_result_emits_error_loud() {
1086 let pool = sqlite_pool().await;
1087 let mut config = config();
1090 config.bridge_error_handler = false;
1091 config.query = "select * from nonexistent_table".to_string();
1092 config.resolve_defaults();
1093 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1094 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1095
1096 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
1098 tokio::spawn(async move {
1099 while let Some(env) = rx.recv().await {
1100 if let Some(reply_tx) = env.reply_tx {
1101 let _ = reply_tx.send(Ok(env.exchange));
1102 }
1103 }
1104 });
1105 let ctx = ConsumerContext::new(tx, CancellationToken::new());
1106
1107 consumer.handle_poll_result(&pool, &ctx, &template).await;
1108
1109 assert!(
1110 logs_contain("ERROR"),
1111 "unbridged handle_poll_result failure MUST emit ERROR (consumer owns signal)"
1112 );
1113 }
1114
1115 #[tokio::test]
1116 async fn stream_list_consumer_emits_ndjson_body() {
1117 let pool = sqlite_pool().await;
1118 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
1119 .execute(&pool)
1120 .await
1121 .expect("create table");
1122 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
1123 .execute(&pool)
1124 .await
1125 .expect("seed rows");
1126
1127 let mut config = SqlEndpointConfig::from_uri(
1128 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1129 )
1130 .unwrap();
1131 config.resolve_defaults();
1132
1133 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1134 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1135
1136 let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
1137 let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
1138 tokio::spawn(async move {
1139 let mut rx = rx;
1140 if let Some(env) = rx.recv().await {
1141 if let Some(reply_tx) = env.reply_tx {
1142 let _ = reply_tx.send(Ok(env.exchange.clone()));
1143 }
1144 let _ = result_tx.send(env.exchange);
1145 }
1146 });
1147 let ctx = ConsumerContext::new(tx, CancellationToken::new());
1148
1149 consumer
1150 .poll_database(&pool, &ctx, &template)
1151 .await
1152 .expect("poll must succeed");
1153
1154 let exchange = result_rx.await.expect("should have received one exchange");
1155
1156 match exchange.input.body {
1157 Body::Stream(ref stream_body) => {
1158 let stream = stream_body.stream.clone();
1159 let mut guard = stream.lock().await;
1160 let stream_opt = guard.take();
1161 assert!(stream_opt.is_some(), "stream should be present");
1162
1163 use futures::StreamExt;
1164 let mut collected = Vec::new();
1165 let mut stream = stream_opt.unwrap();
1166 while let Some(chunk) = stream.next().await {
1167 let chunk = chunk.expect("stream chunk should not error");
1168 collected.extend_from_slice(&chunk);
1169 }
1170
1171 let ndjson = String::from_utf8(collected).expect("valid utf8");
1172 let lines: Vec<&str> = ndjson.trim().lines().collect();
1173 assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
1174
1175 let row0: serde_json::Value =
1176 serde_json::from_str(lines[0]).expect("valid json line 0");
1177 assert_eq!(row0["id"], 1);
1178 assert_eq!(row0["name"], "alpha");
1179
1180 let row1: serde_json::Value =
1181 serde_json::from_str(lines[1]).expect("valid json line 1");
1182 assert_eq!(row1["id"], 2);
1183 assert_eq!(row1["name"], "beta");
1184
1185 let row2: serde_json::Value =
1186 serde_json::from_str(lines[2]).expect("valid json line 2");
1187 assert_eq!(row2["id"], 3);
1188 assert_eq!(row2["name"], "gamma");
1189 }
1190 ref other => panic!("expected Body::Stream, got {:?}", other),
1191 }
1192 }
1193
1194 #[tokio::test]
1195 async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
1196 let pool = sqlite_pool().await;
1197 sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
1198 .execute(&pool)
1199 .await
1200 .expect("create table");
1201
1202 let mut config = SqlEndpointConfig::from_uri(
1203 "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1204 )
1205 .unwrap();
1206 config.resolve_defaults();
1207
1208 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()), test_rt());
1209 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1210
1211 let (tx, rx) = tokio::sync::oneshot::channel();
1212 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1213 tokio::spawn(async move {
1214 while let Some(env) = mpsc_rx.recv().await {
1215 if let Some(reply_tx) = env.reply_tx {
1216 let _ = reply_tx.send(Ok(env.exchange.clone()));
1217 }
1218 let _ = tx.send(env.exchange);
1219 break;
1220 }
1221 });
1222 let ctx = ConsumerContext::new(mpsc_tx, CancellationToken::new());
1223
1224 consumer
1225 .poll_database(&pool, &ctx, &template)
1226 .await
1227 .expect("poll must succeed");
1228
1229 let exchange = rx
1230 .await
1231 .expect("StreamList should emit exchange even for empty results");
1232
1233 match exchange.input.body {
1234 Body::Stream(ref stream_body) => {
1235 let stream = stream_body.stream.clone();
1236 let mut guard = stream.lock().await;
1237 let stream_opt = guard.take();
1238
1239 use futures::StreamExt;
1240 let mut count = 0;
1241 if let Some(mut stream) = stream_opt {
1242 while let Some(chunk) = stream.next().await {
1243 let chunk = chunk.expect("stream chunk should not error");
1244 count += chunk.len();
1245 }
1246 }
1247 assert_eq!(count, 0, "empty table should produce zero stream bytes");
1248 }
1249 ref other => panic!("expected Body::Stream, got {:?}", other),
1250 }
1251 }
1252
1253 #[tokio::test]
1257 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
1258 use camel_component_api::NetworkRetryPolicy;
1259 use std::sync::Arc;
1260 use std::sync::atomic::{AtomicU32, Ordering};
1261
1262 let policy = NetworkRetryPolicy {
1263 max_attempts: 3,
1264 initial_delay: Duration::from_millis(1),
1265 max_delay: Duration::from_millis(1),
1266 multiplier: 1.0,
1267 ..NetworkRetryPolicy::default()
1268 };
1269
1270 let calls = Arc::new(AtomicU32::new(0));
1271 let calls_clone = Arc::clone(&calls);
1272
1273 let mut attempt: u32 = 0;
1274 let _result: Result<(), ()> = loop {
1275 attempt += 1;
1276 calls_clone.fetch_add(1, Ordering::SeqCst);
1277 let op_result: Result<(), ()> = Err(());
1278 match op_result {
1279 Ok(v) => break Ok(v),
1280 Err(_) if policy.should_retry(attempt) => {
1281 let delay = policy.delay_for(attempt - 1);
1282 tokio::time::sleep(delay).await;
1283 continue;
1284 }
1285 Err(_) => break Err(()),
1286 }
1287 };
1288
1289 assert_eq!(
1290 calls.load(Ordering::SeqCst),
1291 3,
1292 "max_attempts=3 must yield exactly 3 invocations"
1293 );
1294 }
1295}