1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::{Body, CamelError, Exchange, Message, StreamBody, StreamMetadata};
15use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
16
17use crate::config::{
18 PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
19 enrich_db_url_with_ssl, redact_db_url,
20};
21use crate::headers;
22use crate::query::{QueryTemplate, parse_query_template, resolve_params};
23use crate::utils::{bind_json_values, row_to_json};
24
25pub struct SqlConsumer {
26 pub(crate) config: SqlEndpointConfig,
27 pub(crate) pool: Arc<OnceCell<AnyPool>>,
28 stopped: bool,
29}
30
31impl SqlConsumer {
32 pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
33 Self {
34 config,
35 pool,
36 stopped: false,
37 }
38 }
39
40 async fn poll_database(
42 &self,
43 pool: &AnyPool,
44 context: &ConsumerContext,
45 template: &QueryTemplate,
46 ) -> Result<(), CamelError> {
47 let empty_exchange = Exchange::new(Message::default());
49
50 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
52
53 debug!(query = %prepared.sql, "executing SQL consumer poll");
54
55 if self.config.output_type == SqlOutputType::StreamList {
56 return self.poll_database_stream(pool, context, &prepared).await;
57 }
58
59 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
60 let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
61 warn!(error = %e, "SQL consumer poll query failed");
62 CamelError::ProcessorError(format!("Query execution failed: {}", e))
63 })?;
64
65 debug!(rows = rows.len(), "SQL consumer poll completed");
66
67 if rows.is_empty() && !self.config.route_empty_result_set {
68 return Ok(());
69 }
70
71 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
72 if max > 0 {
73 rows.into_iter().take(max as usize).collect()
74 } else {
75 rows
76 }
77 } else {
78 rows
79 };
80
81 if self.config.use_iterator {
82 for row in rows_to_process {
84 let row_json = row_to_json(&row)?;
85
86 let mut msg = Message::new(Body::Json(row_json.clone()));
88
89 if let Some(obj) = row_json.as_object() {
91 for (key, value) in obj {
92 msg.set_header(format!("CamelSql.{}", key), value.clone());
93 }
94 }
95
96 let exchange = Exchange::new(msg);
97
98 let result = context.send_and_wait(exchange).await;
100
101 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
103 error!(error = %e, "Post-processing failed");
104 if self.config.break_batch_on_consume_fail {
105 return Err(e);
106 }
107 }
108
109 if let Err(ref consume_err) = result
111 && self.config.break_batch_on_consume_fail
112 {
113 return Err(consume_err.clone());
114 }
115 }
116 } else {
117 let rows_json: Vec<JsonValue> = rows_to_process
119 .iter()
120 .map(row_to_json)
121 .collect::<Result<Vec<_>, CamelError>>()?;
122
123 let row_count = rows_json.len();
124
125 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
127 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
128
129 let exchange = Exchange::new(msg);
130
131 let result = context.send_and_wait(exchange).await;
133
134 for row_json in rows_json.iter() {
138 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
139 error!(error = %e, "Post-processing failed for batch row");
140 if self.config.break_batch_on_consume_fail {
141 return Err(e);
142 }
143 }
144 }
145
146 if let Err(ref consume_err) = result
148 && self.config.break_batch_on_consume_fail
149 {
150 return Err(consume_err.clone());
151 }
152 }
153
154 if let Some(ref batch_query) = self.config.on_consume_batch_complete
156 && let Err(e) = self
157 .execute_post_query(pool, batch_query, &JsonValue::Null)
158 .await
159 {
160 error!(error = %e, "onConsumeBatchComplete query failed");
161 }
162
163 Ok(())
164 }
165
166 async fn poll_database_stream(
167 &self,
168 pool: &AnyPool,
169 context: &ConsumerContext,
170 prepared: &crate::query::PreparedQuery,
171 ) -> Result<(), CamelError> {
172 let pool_clone = pool.clone();
173 let sql_str = prepared.sql.clone();
174 let bindings = prepared.bindings.clone();
175
176 let byte_stream = async_stream::try_stream! {
177 let mut q = sqlx::query(&sql_str);
178 q = bind_json_values(q, &bindings);
179 let mut rows = q.fetch(&pool_clone);
180 while let Some(row) = rows.try_next().await.map_err(|e| {
181 CamelError::ProcessorError(format!("Query execution failed: {}", e))
182 })? {
183 let json_val = row_to_json(&row).map_err(|e| {
184 CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
185 })?;
186 let mut bytes = serde_json::to_vec(&json_val)
187 .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
188 bytes.push(b'\n');
189 yield Bytes::from(bytes);
190 }
191 };
192
193 let msg = Message::new(Body::Stream(StreamBody {
194 stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
195 metadata: StreamMetadata {
196 content_type: Some("application/x-ndjson".to_string()),
197 size_hint: None,
198 origin: None,
199 },
200 }));
201
202 let exchange = Exchange::new(msg);
203 let result = context.send_and_wait(exchange).await;
204 if let Err(e) = result {
205 error!(error = %e, "StreamList consumer downstream processing failed");
206 return Err(e);
207 }
208
209 debug!("StreamList: consumer poll completed (lazy stream emitted)");
210 Ok(())
211 }
212
213 async fn handle_post_processing(
215 &self,
216 pool: &AnyPool,
217 result: &Result<Exchange, CamelError>,
218 row_json: &JsonValue,
219 ) -> Result<(), CamelError> {
220 match result {
221 Ok(_) => {
222 if let Some(ref on_consume) = self.config.on_consume {
224 self.execute_post_query(pool, on_consume, row_json).await?;
225 }
226 }
227 Err(_) => {
228 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
230 self.execute_post_query(pool, on_consume_failed, row_json)
231 .await?;
232 }
233 }
234 }
235 Ok(())
236 }
237
238 async fn execute_post_query(
240 &self,
241 pool: &AnyPool,
242 query_str: &str,
243 row_json: &JsonValue,
244 ) -> Result<(), CamelError> {
245 let template = parse_query_template(query_str, self.config.placeholder)?;
247
248 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
251 if let Some(obj) = row_json.as_object() {
252 for (key, value) in obj {
253 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
254 }
255 }
256 let temp_exchange = Exchange::new(temp_msg);
257
258 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
260
261 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
263 let result = query.execute(pool).await.map_err(|e| {
264 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
265 })?;
266
267 if result.rows_affected() == 0 {
269 warn!(
270 query = query_str,
271 "Post-processing query affected 0 rows — the row may not have been marked correctly"
272 );
273 }
274
275 Ok(())
276 }
277
278 async fn bridge_poll_error(
279 &self,
280 context: &ConsumerContext,
281 error: CamelError,
282 ) -> Result<(), CamelError> {
283 if !self.config.bridge_error_handler {
284 return Ok(());
285 }
286 let mut exchange = Exchange::new(Message::default());
287 exchange.set_error(error);
288 context.send_and_wait(exchange).await.map(|_| ())
289 }
290}
291
292#[async_trait]
293impl Consumer for SqlConsumer {
294 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
295 if self.stopped {
297 return Err(CamelError::Config(
298 "SQL consumer cannot be restarted after stop".into(),
299 ));
300 }
301
302 let pool = self
304 .pool
305 .get_or_try_init(|| async {
306 self.config.resolve_defaults();
308 self.config.resolve_file_query().await?;
310
311 sqlx::any::install_default_drivers();
314 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
315
316 let max_conn = self.config.max_connections.ok_or_else(|| {
317 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
318 })?;
319 let min_conn = self.config.min_connections.ok_or_else(|| {
320 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
321 })?;
322 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
323 CamelError::Config(
324 "idle_timeout_secs not resolved for SQL consumer pool".into(),
325 )
326 })?;
327 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
328 CamelError::Config(
329 "max_lifetime_secs not resolved for SQL consumer pool".into(),
330 )
331 })?;
332
333 info!(
334 db_url = %redact_db_url(&self.config.db_url),
335 "SQL consumer pool initializing"
336 );
337 AnyPoolOptions::new()
338 .max_connections(max_conn)
339 .min_connections(min_conn)
340 .idle_timeout(Duration::from_secs(idle_timeout))
341 .max_lifetime(Duration::from_secs(max_lifetime))
342 .connect(&db_url)
343 .await
344 .map_err(|e| {
345 CamelError::EndpointCreationFailed(format!(
346 "Failed to connect to database: {}",
347 e
348 ))
349 })
350 })
351 .await?;
352
353 if self.config.transaction_mode == TransactionMode::Managed {
355 warn!("transactionManager not yet implemented; using Auto mode");
356 }
357
358 if self.config.processing_strategy == ProcessingStrategy::Scheduled {
360 debug!(
361 "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
362 );
363 }
364 if self.config.poll_strategy == PollStrategy::Burst {
365 debug!("Poll strategy: Burst (rapid successive polls)");
366 }
367
368 if self.config.output_type == SqlOutputType::StreamList
369 && (self.config.on_consume.is_some()
370 || self.config.on_consume_failed.is_some()
371 || self.config.on_consume_batch_complete.is_some())
372 {
373 warn!(
374 "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
375 (rows are consumed lazily downstream)"
376 );
377 }
378
379 if self.config.on_consume.is_none() {
381 warn!(
382 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
383 );
384 }
385
386 info!(
387 db_url = %redact_db_url(&self.config.db_url),
388 query_len = self.config.query.len(),
389 "SQL consumer started"
390 );
391
392 let template = parse_query_template(&self.config.query, self.config.placeholder)
394 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
395
396 if self.config.initial_delay_ms > 0 {
398 tokio::select! {
399 _ = context.cancelled() => {
400 info!("SQL consumer stopped during initial delay");
401 return Ok(());
402 }
403 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
404 }
405 }
406
407 let mut poll_count: u32 = 0;
409 loop {
410 if let Some(max_repeats) = self.config.repeat_count
412 && poll_count >= max_repeats
413 {
414 info!(
415 repeat_count = max_repeats,
416 "SQL consumer reached repeat_count limit, stopping"
417 );
418 break;
419 }
420
421 tokio::select! {
422 _ = context.cancelled() => {
423 info!("SQL consumer stopped");
424 break;
425 }
426 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
427 poll_count += 1;
428 if let Err(e) = self.poll_database(pool, &context, &template).await {
429 error!(error = %e, "SQL consumer poll failed");
430 if let Err(route_err) = self.bridge_poll_error(&context, e).await {
431 error!(error = %route_err, "Failed to bridge SQL consumer error to route");
432 }
433 }
434 }
435 }
436 }
437
438 Ok(())
439 }
440
441 async fn stop(&mut self) -> Result<(), CamelError> {
442 if self.stopped {
444 debug!("SQL consumer stop called on already-stopped consumer");
445 return Ok(());
446 }
447
448 if let Some(pool) = self.pool.get() {
450 debug!("SQL consumer closing connection pool");
451 pool.close().await;
452 debug!("SQL consumer pool closed");
453 }
454
455 self.stopped = true;
456 info!("SQL consumer stopped");
457 Ok(())
458 }
459
460 fn concurrency_model(&self) -> ConcurrencyModel {
461 ConcurrencyModel::Sequential
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471 use crate::config::SqlEndpointConfig;
472 use camel_component_api::ExchangeEnvelope;
473 use camel_component_api::UriConfig;
474 use sqlx::any::AnyPoolOptions;
475 use std::sync::Arc;
476 use std::time::Duration;
477 use tokio::sync::mpsc;
478 use tokio_util::sync::CancellationToken;
479
480 async fn sqlite_pool() -> AnyPool {
481 sqlx::any::install_default_drivers();
482 AnyPoolOptions::new()
483 .max_connections(1)
484 .connect("sqlite::memory:")
485 .await
486 .expect("sqlite pool")
487 }
488
489 async fn seed_consumer_table(pool: &AnyPool) {
490 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
491 .execute(pool)
492 .await
493 .expect("create table");
494 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
495 .execute(pool)
496 .await
497 .expect("seed rows");
498 }
499
500 fn config() -> SqlEndpointConfig {
501 let mut c =
502 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
503 .unwrap();
504 c.resolve_defaults();
505 c
506 }
507
508 #[test]
509 fn consumer_concurrency_model() {
510 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
511 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
512 }
513
514 #[test]
515 fn consumer_stores_config() {
516 let mut config = SqlEndpointConfig::from_uri(
517 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
518 ).unwrap();
519 config.resolve_defaults();
520 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
521 assert_eq!(c.config.delay_ms, 2000);
522 assert!(c.config.on_consume.is_some());
523 }
524
525 #[tokio::test]
526 async fn poll_database_runs_on_consume_for_successful_rows() {
527 let pool = sqlite_pool().await;
528 seed_consumer_table(&pool).await;
529
530 let mut config = SqlEndpointConfig::from_uri(
531 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
532 )
533 .unwrap();
534 config.resolve_defaults();
535
536 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
537 let template = parse_query_template(&config.query, config.placeholder).unwrap();
538
539 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
540 tokio::spawn(async move {
541 while let Some(env) = rx.recv().await {
542 if let Some(reply_tx) = env.reply_tx {
543 let _ = reply_tx.send(Ok(env.exchange));
544 }
545 }
546 });
547 let ctx = ConsumerContext::new(tx, CancellationToken::new());
548
549 consumer
550 .poll_database(&pool, &ctx, &template)
551 .await
552 .expect("poll must succeed");
553
554 let row = sqlx::query("select processed from jobs where id = 1")
555 .fetch_one(&pool)
556 .await
557 .expect("row 1");
558 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
559
560 let row = sqlx::query("select processed from jobs where id = 2")
561 .fetch_one(&pool)
562 .await
563 .expect("row 2");
564 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
565
566 assert_eq!(processed_1, 1);
567 assert_eq!(processed_2, 1);
568 }
569
570 #[tokio::test]
571 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
572 let pool = sqlite_pool().await;
573 seed_consumer_table(&pool).await;
574
575 let mut config = SqlEndpointConfig::from_uri(
576 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
577 )
578 .unwrap();
579 config.resolve_defaults();
580
581 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
582 let template = parse_query_template(&config.query, config.placeholder).unwrap();
583
584 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
585 tokio::spawn(async move {
586 while let Some(env) = rx.recv().await {
587 if let Some(reply_tx) = env.reply_tx {
588 let _ =
589 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
590 }
591 }
592 });
593 let ctx = ConsumerContext::new(tx, CancellationToken::new());
594
595 consumer
596 .poll_database(&pool, &ctx, &template)
597 .await
598 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
599
600 let row = sqlx::query("select failed from jobs where id = 1")
601 .fetch_one(&pool)
602 .await
603 .expect("row 1");
604 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
605
606 let row = sqlx::query("select failed from jobs where id = 2")
607 .fetch_one(&pool)
608 .await
609 .expect("row 2");
610 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
611
612 assert_eq!(failed_1, 1);
613 assert_eq!(failed_2, 1);
614 }
615
616 #[tokio::test]
617 async fn poll_database_breaks_batch_on_consume_fail() {
618 let pool = sqlite_pool().await;
619 seed_consumer_table(&pool).await;
620
621 let mut config = SqlEndpointConfig::from_uri(
622 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
623 )
624 .unwrap();
625 config.resolve_defaults();
626
627 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
628 let template = parse_query_template(&config.query, config.placeholder).unwrap();
629
630 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
631 tokio::spawn(async move {
632 while let Some(env) = rx.recv().await {
633 if let Some(reply_tx) = env.reply_tx {
634 let _ =
635 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
636 }
637 }
638 });
639 let ctx = ConsumerContext::new(tx, CancellationToken::new());
640
641 let err = consumer
642 .poll_database(&pool, &ctx, &template)
643 .await
644 .expect_err("must stop on first downstream failure");
645 assert!(err.to_string().contains("downstream boom"));
646
647 let row = sqlx::query("select failed from jobs where id = 1")
648 .fetch_one(&pool)
649 .await
650 .expect("row 1");
651 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
652
653 let row = sqlx::query("select failed from jobs where id = 2")
654 .fetch_one(&pool)
655 .await
656 .expect("row 2");
657 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
658
659 assert_eq!(failed_1, 1);
660 assert_eq!(failed_2, 0, "second row must not be processed");
661 }
662
663 #[tokio::test]
669 async fn consumer_no_panic_without_prior_resolve_defaults() {
670 let config = SqlEndpointConfig::from_uri(
671 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
672 )
673 .unwrap();
674 assert!(config.max_connections.is_none());
676
677 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
678 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
679 tokio::spawn(async move {
680 while let Some(env) = rx.recv().await {
681 if let Some(reply_tx) = env.reply_tx {
682 let _ = reply_tx.send(Ok(env.exchange));
683 }
684 }
685 });
686 let token = CancellationToken::new();
687 let ctx = ConsumerContext::new(tx, token.clone());
688
689 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
691
692 tokio::time::sleep(Duration::from_millis(50)).await;
694 token.cancel();
695
696 let result = consumer_handle.await.expect("task should not panic");
697 let _ = result;
699 }
700
701 #[tokio::test]
703 async fn stop_closes_pool() {
704 let pool = sqlite_pool().await;
705 seed_consumer_table(&pool).await;
706
707 let mut config = SqlEndpointConfig::from_uri(
708 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
709 )
710 .unwrap();
711 config.resolve_defaults();
712
713 let pool_cell = Arc::new(OnceCell::new());
714 pool_cell.set(pool.clone()).unwrap();
715
716 let mut consumer = SqlConsumer::new(config, pool_cell);
717 consumer.stop().await.expect("stop should succeed");
718
719 assert!(
721 pool.is_closed(),
722 "Pool should be closed after consumer.stop()"
723 );
724 }
725
726 #[tokio::test]
728 async fn double_stop_is_safe() {
729 let pool = sqlite_pool().await;
730 let mut config = SqlEndpointConfig::from_uri(
731 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
732 )
733 .unwrap();
734 config.resolve_defaults();
735
736 let pool_cell = Arc::new(OnceCell::new());
737 pool_cell.set(pool.clone()).unwrap();
738
739 let mut consumer = SqlConsumer::new(config, pool_cell);
740 consumer.stop().await.expect("first stop should succeed");
741 consumer
742 .stop()
743 .await
744 .expect("second stop should also succeed");
745 }
746
747 #[tokio::test]
749 async fn start_after_stop_rejected() {
750 let pool = sqlite_pool().await;
751 let mut config = SqlEndpointConfig::from_uri(
752 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
753 )
754 .unwrap();
755 config.resolve_defaults();
756
757 let pool_cell = Arc::new(OnceCell::new());
758 pool_cell.set(pool.clone()).unwrap();
759
760 let mut consumer = SqlConsumer::new(config, pool_cell);
761 consumer.stop().await.expect("stop should succeed");
762
763 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
764 tokio::spawn(async move {
765 while let Some(env) = rx.recv().await {
766 if let Some(reply_tx) = env.reply_tx {
767 let _ = reply_tx.send(Ok(env.exchange));
768 }
769 }
770 });
771 let ctx = ConsumerContext::new(tx, CancellationToken::new());
772
773 let result = consumer.start(ctx).await;
774 assert!(result.is_err());
775 let err_msg = result.unwrap_err().to_string();
776 assert!(
777 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
778 "Expected restart error, got: {}",
779 err_msg
780 );
781 }
782
783 #[tokio::test]
785 async fn batch_mode_per_row_post_processing() {
786 let pool = sqlite_pool().await;
787 seed_consumer_table(&pool).await;
788
789 let mut config = SqlEndpointConfig::from_uri(
790 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
791 )
792 .unwrap();
793 config.resolve_defaults();
794
795 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
796 let template = parse_query_template(&config.query, config.placeholder).unwrap();
797
798 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
799 tokio::spawn(async move {
800 while let Some(env) = rx.recv().await {
801 if let Some(reply_tx) = env.reply_tx {
802 let _ = reply_tx.send(Ok(env.exchange));
803 }
804 }
805 });
806 let ctx = ConsumerContext::new(tx, CancellationToken::new());
807
808 consumer
809 .poll_database(&pool, &ctx, &template)
810 .await
811 .expect("poll must succeed");
812
813 let row = sqlx::query("select processed from jobs where id = 1")
815 .fetch_one(&pool)
816 .await
817 .expect("row 1");
818 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
819
820 let row = sqlx::query("select processed from jobs where id = 2")
821 .fetch_one(&pool)
822 .await
823 .expect("row 2");
824 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
825
826 assert_eq!(
827 processed_1, 1,
828 "row 1 should be marked processed via per-row onConsume"
829 );
830 assert_eq!(
831 processed_2, 1,
832 "row 2 should be marked processed via per-row onConsume"
833 );
834 }
835
836 #[tokio::test]
838 async fn batch_mode_per_row_post_processing_on_failure() {
839 let pool = sqlite_pool().await;
840 seed_consumer_table(&pool).await;
841
842 let mut config = SqlEndpointConfig::from_uri(
843 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
844 )
845 .unwrap();
846 config.resolve_defaults();
847
848 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
849 let template = parse_query_template(&config.query, config.placeholder).unwrap();
850
851 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
852 tokio::spawn(async move {
853 while let Some(env) = rx.recv().await {
854 if let Some(reply_tx) = env.reply_tx {
855 let _ =
856 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
857 }
858 }
859 });
860 let ctx = ConsumerContext::new(tx, CancellationToken::new());
861
862 consumer
863 .poll_database(&pool, &ctx, &template)
864 .await
865 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
866
867 let row = sqlx::query("select failed from jobs where id = 1")
869 .fetch_one(&pool)
870 .await
871 .expect("row 1");
872 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
873
874 let row = sqlx::query("select failed from jobs where id = 2")
875 .fetch_one(&pool)
876 .await
877 .expect("row 2");
878 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
879
880 assert_eq!(
881 failed_1, 1,
882 "row 1 should be marked failed via per-row onConsumeFailed"
883 );
884 assert_eq!(
885 failed_2, 1,
886 "row 2 should be marked failed via per-row onConsumeFailed"
887 );
888 }
889
890 #[tokio::test]
891 async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
892 let mut config = config();
893 config.bridge_error_handler = true;
894 let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
895
896 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
897 tokio::spawn(async move {
898 while let Some(env) = rx.recv().await {
899 assert!(env.exchange.error.is_some(), "exchange must carry error");
900 if let Some(reply_tx) = env.reply_tx {
901 let _ = reply_tx.send(Ok(env.exchange));
902 }
903 break;
904 }
905 });
906
907 let ctx = ConsumerContext::new(tx, CancellationToken::new());
908 consumer
909 .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
910 .await
911 .expect("bridging should succeed");
912 }
913
914 #[tokio::test]
915 async fn stream_list_consumer_emits_ndjson_body() {
916 let pool = sqlite_pool().await;
917 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
918 .execute(&pool)
919 .await
920 .expect("create table");
921 sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
922 .execute(&pool)
923 .await
924 .expect("seed rows");
925
926 let mut config = SqlEndpointConfig::from_uri(
927 "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
928 )
929 .unwrap();
930 config.resolve_defaults();
931
932 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
933 let template = parse_query_template(&config.query, config.placeholder).unwrap();
934
935 let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
936 let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
937 tokio::spawn(async move {
938 let mut rx = rx;
939 if let Some(env) = rx.recv().await {
940 if let Some(reply_tx) = env.reply_tx {
941 let _ = reply_tx.send(Ok(env.exchange.clone()));
942 }
943 let _ = result_tx.send(env.exchange);
944 }
945 });
946 let ctx = ConsumerContext::new(tx, CancellationToken::new());
947
948 consumer
949 .poll_database(&pool, &ctx, &template)
950 .await
951 .expect("poll must succeed");
952
953 let exchange = result_rx.await.expect("should have received one exchange");
954
955 match exchange.input.body {
956 Body::Stream(ref stream_body) => {
957 let stream = stream_body.stream.clone();
958 let mut guard = stream.lock().await;
959 let stream_opt = guard.take();
960 assert!(stream_opt.is_some(), "stream should be present");
961
962 use futures::StreamExt;
963 let mut collected = Vec::new();
964 let mut stream = stream_opt.unwrap();
965 while let Some(chunk) = stream.next().await {
966 let chunk = chunk.expect("stream chunk should not error");
967 collected.extend_from_slice(&chunk);
968 }
969
970 let ndjson = String::from_utf8(collected).expect("valid utf8");
971 let lines: Vec<&str> = ndjson.trim().lines().collect();
972 assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
973
974 let row0: serde_json::Value =
975 serde_json::from_str(lines[0]).expect("valid json line 0");
976 assert_eq!(row0["id"], 1);
977 assert_eq!(row0["name"], "alpha");
978
979 let row1: serde_json::Value =
980 serde_json::from_str(lines[1]).expect("valid json line 1");
981 assert_eq!(row1["id"], 2);
982 assert_eq!(row1["name"], "beta");
983
984 let row2: serde_json::Value =
985 serde_json::from_str(lines[2]).expect("valid json line 2");
986 assert_eq!(row2["id"], 3);
987 assert_eq!(row2["name"], "gamma");
988 }
989 ref other => panic!("expected Body::Stream, got {:?}", other),
990 }
991 }
992
993 #[tokio::test]
994 async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
995 let pool = sqlite_pool().await;
996 sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
997 .execute(&pool)
998 .await
999 .expect("create table");
1000
1001 let mut config = SqlEndpointConfig::from_uri(
1002 "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1003 )
1004 .unwrap();
1005 config.resolve_defaults();
1006
1007 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
1008 let template = parse_query_template(&config.query, config.placeholder).unwrap();
1009
1010 let (tx, rx) = tokio::sync::oneshot::channel();
1011 let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1012 tokio::spawn(async move {
1013 while let Some(env) = mpsc_rx.recv().await {
1014 if let Some(reply_tx) = env.reply_tx {
1015 let _ = reply_tx.send(Ok(env.exchange.clone()));
1016 }
1017 let _ = tx.send(env.exchange);
1018 break;
1019 }
1020 });
1021 let ctx = ConsumerContext::new(mpsc_tx, CancellationToken::new());
1022
1023 consumer
1024 .poll_database(&pool, &ctx, &template)
1025 .await
1026 .expect("poll must succeed");
1027
1028 let exchange = rx
1029 .await
1030 .expect("StreamList should emit exchange even for empty results");
1031
1032 match exchange.input.body {
1033 Body::Stream(ref stream_body) => {
1034 let stream = stream_body.stream.clone();
1035 let mut guard = stream.lock().await;
1036 let stream_opt = guard.take();
1037
1038 use futures::StreamExt;
1039 let mut count = 0;
1040 if let Some(mut stream) = stream_opt {
1041 while let Some(chunk) = stream.next().await {
1042 let chunk = chunk.expect("stream chunk should not error");
1043 count += chunk.len();
1044 }
1045 }
1046 assert_eq!(count, 0, "empty table should produce zero stream bytes");
1047 }
1048 ref other => panic!("expected Body::Stream, got {:?}", other),
1049 }
1050 }
1051}