1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use sqlx::AnyPool;
7use sqlx::any::AnyPoolOptions;
8use sqlx::any::AnyRow;
9use tokio::sync::OnceCell;
10use tracing::{debug, error, info, warn};
11
12use camel_component_api::{Body, CamelError, Exchange, Message};
13use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
14
15use crate::config::{
16 PollStrategy, ProcessingStrategy, SqlEndpointConfig, TransactionMode, enrich_db_url_with_ssl,
17 redact_db_url,
18};
19use crate::headers;
20use crate::query::{QueryTemplate, parse_query_template, resolve_params};
21use crate::utils::{bind_json_values, row_to_json};
22
23pub struct SqlConsumer {
24 pub(crate) config: SqlEndpointConfig,
25 pub(crate) pool: Arc<OnceCell<AnyPool>>,
26 stopped: bool,
27}
28
29impl SqlConsumer {
30 pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
31 Self {
32 config,
33 pool,
34 stopped: false,
35 }
36 }
37
38 async fn poll_database(
40 &self,
41 pool: &AnyPool,
42 context: &ConsumerContext,
43 template: &QueryTemplate,
44 ) -> Result<(), CamelError> {
45 let empty_exchange = Exchange::new(Message::default());
47
48 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
50
51 debug!(query = %prepared.sql, "executing SQL consumer poll");
53 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
54 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
55 warn!(error = %e, "SQL consumer poll query failed");
56 CamelError::ProcessorError(format!("Query execution failed: {}", e))
57 })?;
58
59 debug!(rows = rows.len(), "SQL consumer poll completed");
60
61 if rows.is_empty() && !self.config.route_empty_result_set {
63 return Ok(());
64 }
65
66 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
68 if max > 0 {
69 rows.into_iter().take(max as usize).collect()
70 } else {
71 rows
72 }
73 } else {
74 rows
75 };
76
77 if self.config.use_iterator {
78 for row in rows_to_process {
80 let row_json = row_to_json(&row)?;
81
82 let mut msg = Message::new(Body::Json(row_json.clone()));
84
85 if let Some(obj) = row_json.as_object() {
87 for (key, value) in obj {
88 msg.set_header(format!("CamelSql.{}", key), value.clone());
89 }
90 }
91
92 let exchange = Exchange::new(msg);
93
94 let result = context.send_and_wait(exchange).await;
96
97 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
99 error!(error = %e, "Post-processing failed");
100 if self.config.break_batch_on_consume_fail {
101 return Err(e);
102 }
103 }
104
105 if let Err(ref consume_err) = result
107 && self.config.break_batch_on_consume_fail
108 {
109 return Err(consume_err.clone());
110 }
111 }
112 } else {
113 let rows_json: Vec<JsonValue> = rows_to_process
115 .iter()
116 .map(row_to_json)
117 .collect::<Result<Vec<_>, CamelError>>()?;
118
119 let row_count = rows_json.len();
120
121 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
123 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
124
125 let exchange = Exchange::new(msg);
126
127 let result = context.send_and_wait(exchange).await;
129
130 for row_json in rows_json.iter() {
134 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
135 error!(error = %e, "Post-processing failed for batch row");
136 if self.config.break_batch_on_consume_fail {
137 return Err(e);
138 }
139 }
140 }
141
142 if let Err(ref consume_err) = result
144 && self.config.break_batch_on_consume_fail
145 {
146 return Err(consume_err.clone());
147 }
148 }
149
150 if let Some(ref batch_query) = self.config.on_consume_batch_complete
152 && let Err(e) = self
153 .execute_post_query(pool, batch_query, &JsonValue::Null)
154 .await
155 {
156 error!(error = %e, "onConsumeBatchComplete query failed");
157 }
158
159 Ok(())
160 }
161
162 async fn handle_post_processing(
164 &self,
165 pool: &AnyPool,
166 result: &Result<Exchange, CamelError>,
167 row_json: &JsonValue,
168 ) -> Result<(), CamelError> {
169 match result {
170 Ok(_) => {
171 if let Some(ref on_consume) = self.config.on_consume {
173 self.execute_post_query(pool, on_consume, row_json).await?;
174 }
175 }
176 Err(_) => {
177 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
179 self.execute_post_query(pool, on_consume_failed, row_json)
180 .await?;
181 }
182 }
183 }
184 Ok(())
185 }
186
187 async fn execute_post_query(
189 &self,
190 pool: &AnyPool,
191 query_str: &str,
192 row_json: &JsonValue,
193 ) -> Result<(), CamelError> {
194 let template = parse_query_template(query_str, self.config.placeholder)?;
196
197 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
200 if let Some(obj) = row_json.as_object() {
201 for (key, value) in obj {
202 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
203 }
204 }
205 let temp_exchange = Exchange::new(temp_msg);
206
207 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
209
210 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
212 let result = query.execute(pool).await.map_err(|e| {
213 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
214 })?;
215
216 if result.rows_affected() == 0 {
218 warn!(
219 query = query_str,
220 "Post-processing query affected 0 rows — the row may not have been marked correctly"
221 );
222 }
223
224 Ok(())
225 }
226
227 async fn bridge_poll_error(
228 &self,
229 context: &ConsumerContext,
230 error: CamelError,
231 ) -> Result<(), CamelError> {
232 if !self.config.bridge_error_handler {
233 return Ok(());
234 }
235 let mut exchange = Exchange::new(Message::default());
236 exchange.set_error(error);
237 context.send_and_wait(exchange).await.map(|_| ())
238 }
239}
240
241#[async_trait]
242impl Consumer for SqlConsumer {
243 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
244 if self.stopped {
246 return Err(CamelError::Config(
247 "SQL consumer cannot be restarted after stop".into(),
248 ));
249 }
250
251 let pool = self
253 .pool
254 .get_or_try_init(|| async {
255 self.config.resolve_defaults();
257 self.config.resolve_file_query().await?;
259
260 sqlx::any::install_default_drivers();
263 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
264
265 let max_conn = self.config.max_connections.ok_or_else(|| {
266 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
267 })?;
268 let min_conn = self.config.min_connections.ok_or_else(|| {
269 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
270 })?;
271 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
272 CamelError::Config(
273 "idle_timeout_secs not resolved for SQL consumer pool".into(),
274 )
275 })?;
276 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
277 CamelError::Config(
278 "max_lifetime_secs not resolved for SQL consumer pool".into(),
279 )
280 })?;
281
282 info!(
283 db_url = %redact_db_url(&self.config.db_url),
284 "SQL consumer pool initializing"
285 );
286 AnyPoolOptions::new()
287 .max_connections(max_conn)
288 .min_connections(min_conn)
289 .idle_timeout(Duration::from_secs(idle_timeout))
290 .max_lifetime(Duration::from_secs(max_lifetime))
291 .connect(&db_url)
292 .await
293 .map_err(|e| {
294 CamelError::EndpointCreationFailed(format!(
295 "Failed to connect to database: {}",
296 e
297 ))
298 })
299 })
300 .await?;
301
302 if self.config.transaction_mode == TransactionMode::Managed {
304 warn!("transactionManager not yet implemented; using Auto mode");
305 }
306
307 if self.config.processing_strategy == ProcessingStrategy::Scheduled {
309 debug!(
310 "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
311 );
312 }
313 if self.config.poll_strategy == PollStrategy::Burst {
314 debug!("Poll strategy: Burst (rapid successive polls)");
315 }
316
317 if self.config.on_consume.is_none() {
319 warn!(
320 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
321 );
322 }
323
324 info!(
325 db_url = %redact_db_url(&self.config.db_url),
326 query_len = self.config.query.len(),
327 "SQL consumer started"
328 );
329
330 let template = parse_query_template(&self.config.query, self.config.placeholder)
332 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
333
334 if self.config.initial_delay_ms > 0 {
336 tokio::select! {
337 _ = context.cancelled() => {
338 info!("SQL consumer stopped during initial delay");
339 return Ok(());
340 }
341 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
342 }
343 }
344
345 let mut poll_count: u32 = 0;
347 loop {
348 if let Some(max_repeats) = self.config.repeat_count
350 && poll_count >= max_repeats
351 {
352 info!(
353 repeat_count = max_repeats,
354 "SQL consumer reached repeat_count limit, stopping"
355 );
356 break;
357 }
358
359 tokio::select! {
360 _ = context.cancelled() => {
361 info!("SQL consumer stopped");
362 break;
363 }
364 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
365 poll_count += 1;
366 if let Err(e) = self.poll_database(pool, &context, &template).await {
367 error!(error = %e, "SQL consumer poll failed");
368 if let Err(route_err) = self.bridge_poll_error(&context, e).await {
369 error!(error = %route_err, "Failed to bridge SQL consumer error to route");
370 }
371 }
372 }
373 }
374 }
375
376 Ok(())
377 }
378
379 async fn stop(&mut self) -> Result<(), CamelError> {
380 if self.stopped {
382 debug!("SQL consumer stop called on already-stopped consumer");
383 return Ok(());
384 }
385
386 if let Some(pool) = self.pool.get() {
388 debug!("SQL consumer closing connection pool");
389 pool.close().await;
390 debug!("SQL consumer pool closed");
391 }
392
393 self.stopped = true;
394 info!("SQL consumer stopped");
395 Ok(())
396 }
397
398 fn concurrency_model(&self) -> ConcurrencyModel {
399 ConcurrencyModel::Sequential
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use crate::config::SqlEndpointConfig;
410 use camel_component_api::ExchangeEnvelope;
411 use camel_component_api::UriConfig;
412 use sqlx::any::AnyPoolOptions;
413 use std::sync::Arc;
414 use std::time::Duration;
415 use tokio::sync::mpsc;
416 use tokio_util::sync::CancellationToken;
417
418 async fn sqlite_pool() -> AnyPool {
419 sqlx::any::install_default_drivers();
420 AnyPoolOptions::new()
421 .max_connections(1)
422 .connect("sqlite::memory:")
423 .await
424 .expect("sqlite pool")
425 }
426
427 async fn seed_consumer_table(pool: &AnyPool) {
428 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
429 .execute(pool)
430 .await
431 .expect("create table");
432 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
433 .execute(pool)
434 .await
435 .expect("seed rows");
436 }
437
438 fn config() -> SqlEndpointConfig {
439 let mut c =
440 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
441 .unwrap();
442 c.resolve_defaults();
443 c
444 }
445
446 #[test]
447 fn consumer_concurrency_model() {
448 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
449 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
450 }
451
452 #[test]
453 fn consumer_stores_config() {
454 let mut config = SqlEndpointConfig::from_uri(
455 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
456 ).unwrap();
457 config.resolve_defaults();
458 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
459 assert_eq!(c.config.delay_ms, 2000);
460 assert!(c.config.on_consume.is_some());
461 }
462
463 #[tokio::test]
464 async fn poll_database_runs_on_consume_for_successful_rows() {
465 let pool = sqlite_pool().await;
466 seed_consumer_table(&pool).await;
467
468 let mut config = SqlEndpointConfig::from_uri(
469 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
470 )
471 .unwrap();
472 config.resolve_defaults();
473
474 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
475 let template = parse_query_template(&config.query, config.placeholder).unwrap();
476
477 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
478 tokio::spawn(async move {
479 while let Some(env) = rx.recv().await {
480 if let Some(reply_tx) = env.reply_tx {
481 let _ = reply_tx.send(Ok(env.exchange));
482 }
483 }
484 });
485 let ctx = ConsumerContext::new(tx, CancellationToken::new());
486
487 consumer
488 .poll_database(&pool, &ctx, &template)
489 .await
490 .expect("poll must succeed");
491
492 let row = sqlx::query("select processed from jobs where id = 1")
493 .fetch_one(&pool)
494 .await
495 .expect("row 1");
496 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
497
498 let row = sqlx::query("select processed from jobs where id = 2")
499 .fetch_one(&pool)
500 .await
501 .expect("row 2");
502 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
503
504 assert_eq!(processed_1, 1);
505 assert_eq!(processed_2, 1);
506 }
507
508 #[tokio::test]
509 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
510 let pool = sqlite_pool().await;
511 seed_consumer_table(&pool).await;
512
513 let mut config = SqlEndpointConfig::from_uri(
514 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
515 )
516 .unwrap();
517 config.resolve_defaults();
518
519 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
520 let template = parse_query_template(&config.query, config.placeholder).unwrap();
521
522 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
523 tokio::spawn(async move {
524 while let Some(env) = rx.recv().await {
525 if let Some(reply_tx) = env.reply_tx {
526 let _ =
527 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
528 }
529 }
530 });
531 let ctx = ConsumerContext::new(tx, CancellationToken::new());
532
533 consumer
534 .poll_database(&pool, &ctx, &template)
535 .await
536 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
537
538 let row = sqlx::query("select failed from jobs where id = 1")
539 .fetch_one(&pool)
540 .await
541 .expect("row 1");
542 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
543
544 let row = sqlx::query("select failed from jobs where id = 2")
545 .fetch_one(&pool)
546 .await
547 .expect("row 2");
548 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
549
550 assert_eq!(failed_1, 1);
551 assert_eq!(failed_2, 1);
552 }
553
554 #[tokio::test]
555 async fn poll_database_breaks_batch_on_consume_fail() {
556 let pool = sqlite_pool().await;
557 seed_consumer_table(&pool).await;
558
559 let mut config = SqlEndpointConfig::from_uri(
560 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
561 )
562 .unwrap();
563 config.resolve_defaults();
564
565 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
566 let template = parse_query_template(&config.query, config.placeholder).unwrap();
567
568 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
569 tokio::spawn(async move {
570 while let Some(env) = rx.recv().await {
571 if let Some(reply_tx) = env.reply_tx {
572 let _ =
573 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
574 }
575 }
576 });
577 let ctx = ConsumerContext::new(tx, CancellationToken::new());
578
579 let err = consumer
580 .poll_database(&pool, &ctx, &template)
581 .await
582 .expect_err("must stop on first downstream failure");
583 assert!(err.to_string().contains("downstream boom"));
584
585 let row = sqlx::query("select failed from jobs where id = 1")
586 .fetch_one(&pool)
587 .await
588 .expect("row 1");
589 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
590
591 let row = sqlx::query("select failed from jobs where id = 2")
592 .fetch_one(&pool)
593 .await
594 .expect("row 2");
595 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
596
597 assert_eq!(failed_1, 1);
598 assert_eq!(failed_2, 0, "second row must not be processed");
599 }
600
601 #[tokio::test]
607 async fn consumer_no_panic_without_prior_resolve_defaults() {
608 let config = SqlEndpointConfig::from_uri(
609 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
610 )
611 .unwrap();
612 assert!(config.max_connections.is_none());
614
615 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
616 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
617 tokio::spawn(async move {
618 while let Some(env) = rx.recv().await {
619 if let Some(reply_tx) = env.reply_tx {
620 let _ = reply_tx.send(Ok(env.exchange));
621 }
622 }
623 });
624 let token = CancellationToken::new();
625 let ctx = ConsumerContext::new(tx, token.clone());
626
627 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
629
630 tokio::time::sleep(Duration::from_millis(50)).await;
632 token.cancel();
633
634 let result = consumer_handle.await.expect("task should not panic");
635 let _ = result;
637 }
638
639 #[tokio::test]
641 async fn stop_closes_pool() {
642 let pool = sqlite_pool().await;
643 seed_consumer_table(&pool).await;
644
645 let mut config = SqlEndpointConfig::from_uri(
646 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
647 )
648 .unwrap();
649 config.resolve_defaults();
650
651 let pool_cell = Arc::new(OnceCell::new());
652 pool_cell.set(pool.clone()).unwrap();
653
654 let mut consumer = SqlConsumer::new(config, pool_cell);
655 consumer.stop().await.expect("stop should succeed");
656
657 assert!(
659 pool.is_closed(),
660 "Pool should be closed after consumer.stop()"
661 );
662 }
663
664 #[tokio::test]
666 async fn double_stop_is_safe() {
667 let pool = sqlite_pool().await;
668 let mut config = SqlEndpointConfig::from_uri(
669 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
670 )
671 .unwrap();
672 config.resolve_defaults();
673
674 let pool_cell = Arc::new(OnceCell::new());
675 pool_cell.set(pool.clone()).unwrap();
676
677 let mut consumer = SqlConsumer::new(config, pool_cell);
678 consumer.stop().await.expect("first stop should succeed");
679 consumer
680 .stop()
681 .await
682 .expect("second stop should also succeed");
683 }
684
685 #[tokio::test]
687 async fn start_after_stop_rejected() {
688 let pool = sqlite_pool().await;
689 let mut config = SqlEndpointConfig::from_uri(
690 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
691 )
692 .unwrap();
693 config.resolve_defaults();
694
695 let pool_cell = Arc::new(OnceCell::new());
696 pool_cell.set(pool.clone()).unwrap();
697
698 let mut consumer = SqlConsumer::new(config, pool_cell);
699 consumer.stop().await.expect("stop should succeed");
700
701 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
702 tokio::spawn(async move {
703 while let Some(env) = rx.recv().await {
704 if let Some(reply_tx) = env.reply_tx {
705 let _ = reply_tx.send(Ok(env.exchange));
706 }
707 }
708 });
709 let ctx = ConsumerContext::new(tx, CancellationToken::new());
710
711 let result = consumer.start(ctx).await;
712 assert!(result.is_err());
713 let err_msg = result.unwrap_err().to_string();
714 assert!(
715 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
716 "Expected restart error, got: {}",
717 err_msg
718 );
719 }
720
721 #[tokio::test]
723 async fn batch_mode_per_row_post_processing() {
724 let pool = sqlite_pool().await;
725 seed_consumer_table(&pool).await;
726
727 let mut config = SqlEndpointConfig::from_uri(
728 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
729 )
730 .unwrap();
731 config.resolve_defaults();
732
733 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
734 let template = parse_query_template(&config.query, config.placeholder).unwrap();
735
736 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
737 tokio::spawn(async move {
738 while let Some(env) = rx.recv().await {
739 if let Some(reply_tx) = env.reply_tx {
740 let _ = reply_tx.send(Ok(env.exchange));
741 }
742 }
743 });
744 let ctx = ConsumerContext::new(tx, CancellationToken::new());
745
746 consumer
747 .poll_database(&pool, &ctx, &template)
748 .await
749 .expect("poll must succeed");
750
751 let row = sqlx::query("select processed from jobs where id = 1")
753 .fetch_one(&pool)
754 .await
755 .expect("row 1");
756 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
757
758 let row = sqlx::query("select processed from jobs where id = 2")
759 .fetch_one(&pool)
760 .await
761 .expect("row 2");
762 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
763
764 assert_eq!(
765 processed_1, 1,
766 "row 1 should be marked processed via per-row onConsume"
767 );
768 assert_eq!(
769 processed_2, 1,
770 "row 2 should be marked processed via per-row onConsume"
771 );
772 }
773
774 #[tokio::test]
776 async fn batch_mode_per_row_post_processing_on_failure() {
777 let pool = sqlite_pool().await;
778 seed_consumer_table(&pool).await;
779
780 let mut config = SqlEndpointConfig::from_uri(
781 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
782 )
783 .unwrap();
784 config.resolve_defaults();
785
786 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
787 let template = parse_query_template(&config.query, config.placeholder).unwrap();
788
789 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
790 tokio::spawn(async move {
791 while let Some(env) = rx.recv().await {
792 if let Some(reply_tx) = env.reply_tx {
793 let _ =
794 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
795 }
796 }
797 });
798 let ctx = ConsumerContext::new(tx, CancellationToken::new());
799
800 consumer
801 .poll_database(&pool, &ctx, &template)
802 .await
803 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
804
805 let row = sqlx::query("select failed from jobs where id = 1")
807 .fetch_one(&pool)
808 .await
809 .expect("row 1");
810 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
811
812 let row = sqlx::query("select failed from jobs where id = 2")
813 .fetch_one(&pool)
814 .await
815 .expect("row 2");
816 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
817
818 assert_eq!(
819 failed_1, 1,
820 "row 1 should be marked failed via per-row onConsumeFailed"
821 );
822 assert_eq!(
823 failed_2, 1,
824 "row 2 should be marked failed via per-row onConsumeFailed"
825 );
826 }
827
828 #[tokio::test]
829 async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
830 let mut config = config();
831 config.bridge_error_handler = true;
832 let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
833
834 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
835 tokio::spawn(async move {
836 while let Some(env) = rx.recv().await {
837 assert!(env.exchange.error.is_some(), "exchange must carry error");
838 if let Some(reply_tx) = env.reply_tx {
839 let _ = reply_tx.send(Ok(env.exchange));
840 }
841 break;
842 }
843 });
844
845 let ctx = ConsumerContext::new(tx, CancellationToken::new());
846 consumer
847 .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
848 .await
849 .expect("bridging should succeed");
850 }
851}