Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use sqlx::AnyPool;
7use sqlx::any::AnyPoolOptions;
8use sqlx::any::AnyRow;
9use tokio::sync::OnceCell;
10use tracing::{debug, error, info, warn};
11
12use camel_component_api::{Body, CamelError, Exchange, Message};
13use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
14
15use crate::config::{SqlEndpointConfig, enrich_db_url_with_ssl, redact_db_url};
16use crate::headers;
17use crate::query::{QueryTemplate, parse_query_template, resolve_params};
18use crate::utils::{bind_json_values, row_to_json};
19
20pub struct SqlConsumer {
21    pub(crate) config: SqlEndpointConfig,
22    pub(crate) pool: Arc<OnceCell<AnyPool>>,
23    stopped: bool,
24}
25
26impl SqlConsumer {
27    pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
28        Self {
29            config,
30            pool,
31            stopped: false,
32        }
33    }
34
35    /// Poll the database for new rows and process them.
36    async fn poll_database(
37        &self,
38        pool: &AnyPool,
39        context: &ConsumerContext,
40        template: &QueryTemplate,
41    ) -> Result<(), CamelError> {
42        // Create an empty exchange for parameter resolution (consumer has no input)
43        let empty_exchange = Exchange::new(Message::default());
44
45        // Resolve parameters
46        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
47
48        // Build and execute the query
49        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
50        let rows: Vec<AnyRow> = query
51            .fetch_all(pool)
52            .await
53            .map_err(|e| CamelError::ProcessorError(format!("Query execution failed: {}", e)))?;
54
55        // Check for empty result set
56        if rows.is_empty() && !self.config.route_empty_result_set {
57            return Ok(());
58        }
59
60        // Apply max_messages_per_poll limit
61        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
62            if max > 0 {
63                rows.into_iter().take(max as usize).collect()
64            } else {
65                rows
66            }
67        } else {
68            rows
69        };
70
71        if self.config.use_iterator {
72            // Process each row individually
73            for row in rows_to_process {
74                let row_json = row_to_json(&row)?;
75
76                // Create exchange with the row as JSON body
77                let mut msg = Message::new(Body::Json(row_json.clone()));
78
79                // Set individual column headers with CamelSql. prefix per Apache Camel convention
80                if let Some(obj) = row_json.as_object() {
81                    for (key, value) in obj {
82                        msg.set_header(format!("CamelSql.{}", key), value.clone());
83                    }
84                }
85
86                let exchange = Exchange::new(msg);
87
88                // Send and wait for processing
89                let result = context.send_and_wait(exchange).await;
90
91                // Handle post-processing (onConsume/onConsumeFailed)
92                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
93                    error!(error = %e, "Post-processing failed");
94                    if self.config.break_batch_on_consume_fail {
95                        return Err(e);
96                    }
97                }
98
99                // If downstream processing itself failed, honour break_batch_on_consume_fail
100                if let Err(ref consume_err) = result
101                    && self.config.break_batch_on_consume_fail
102                {
103                    return Err(consume_err.clone());
104                }
105            }
106        } else {
107            // Process all rows as a single batch
108            let rows_json: Vec<JsonValue> = rows_to_process
109                .iter()
110                .map(row_to_json)
111                .collect::<Result<Vec<_>, CamelError>>()?;
112
113            let row_count = rows_json.len();
114
115            // Create exchange with array of rows
116            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
117            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
118
119            let exchange = Exchange::new(msg);
120
121            // Send and wait for result
122            let result = context.send_and_wait(exchange).await;
123
124            // SQL-021: Run per-row post-processing even in batch mode so that
125            // onConsume/onConsumeFailed queries can reference row-specific parameters
126            // (e.g. `:#id`). Each row gets its own post-processing query execution.
127            for row_json in rows_json.iter() {
128                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
129                    error!(error = %e, "Post-processing failed for batch row");
130                    if self.config.break_batch_on_consume_fail {
131                        return Err(e);
132                    }
133                }
134            }
135
136            // If downstream processing itself failed, honour break_batch_on_consume_fail
137            if let Err(ref consume_err) = result
138                && self.config.break_batch_on_consume_fail
139            {
140                return Err(consume_err.clone());
141            }
142        }
143
144        // Execute on_consume_batch_complete if configured
145        if let Some(ref batch_query) = self.config.on_consume_batch_complete
146            && let Err(e) = self
147                .execute_post_query(pool, batch_query, &JsonValue::Null)
148                .await
149        {
150            error!(error = %e, "onConsumeBatchComplete query failed");
151        }
152
153        Ok(())
154    }
155
156    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
157    async fn handle_post_processing(
158        &self,
159        pool: &AnyPool,
160        result: &Result<Exchange, CamelError>,
161        row_json: &JsonValue,
162    ) -> Result<(), CamelError> {
163        match result {
164            Ok(_) => {
165                // Success - execute onConsume if configured
166                if let Some(ref on_consume) = self.config.on_consume {
167                    self.execute_post_query(pool, on_consume, row_json).await?;
168                }
169            }
170            Err(_) => {
171                // Failure - execute onConsumeFailed if configured
172                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
173                    self.execute_post_query(pool, on_consume_failed, row_json)
174                        .await?;
175                }
176            }
177        }
178        Ok(())
179    }
180
181    /// Execute a post-processing query with the row data as parameters.
182    async fn execute_post_query(
183        &self,
184        pool: &AnyPool,
185        query_str: &str,
186        row_json: &JsonValue,
187    ) -> Result<(), CamelError> {
188        // Parse the query template
189        let template = parse_query_template(query_str, self.config.placeholder)?;
190
191        // Create a temporary exchange with the row as body for parameter resolution
192        // Populate CamelSql.* headers so named params can reference them
193        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
194        if let Some(obj) = row_json.as_object() {
195            for (key, value) in obj {
196                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
197            }
198        }
199        let temp_exchange = Exchange::new(temp_msg);
200
201        // Resolve parameters
202        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
203
204        // Build and execute the query
205        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
206        let result = query.execute(pool).await.map_err(|e| {
207            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
208        })?;
209
210        // Warn if 0 rows affected (the row may not have been marked correctly)
211        if result.rows_affected() == 0 {
212            warn!(
213                query = query_str,
214                "Post-processing query affected 0 rows — the row may not have been marked correctly"
215            );
216        }
217
218        Ok(())
219    }
220}
221
222#[async_trait]
223impl Consumer for SqlConsumer {
224    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
225        // Reject double-start
226        if self.stopped {
227            return Err(CamelError::Config(
228                "SQL consumer cannot be restarted after stop".into(),
229            ));
230        }
231
232        // Step 1: Initialize the connection pool
233        let pool = self
234            .pool
235            .get_or_try_init(|| async {
236                // Defensive: ensure config is resolved even if caller didn't use create_endpoint
237                self.config.resolve_defaults();
238
239                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
240                // This is idempotent; safe to call multiple times.
241                sqlx::any::install_default_drivers();
242                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
243
244                let max_conn = self.config.max_connections.ok_or_else(|| {
245                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
246                })?;
247                let min_conn = self.config.min_connections.ok_or_else(|| {
248                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
249                })?;
250                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
251                    CamelError::Config(
252                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
253                    )
254                })?;
255                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
256                    CamelError::Config(
257                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
258                    )
259                })?;
260
261                info!(
262                    db_url = %redact_db_url(&self.config.db_url),
263                    "SQL consumer pool initializing"
264                );
265                AnyPoolOptions::new()
266                    .max_connections(max_conn)
267                    .min_connections(min_conn)
268                    .idle_timeout(Duration::from_secs(idle_timeout))
269                    .max_lifetime(Duration::from_secs(max_lifetime))
270                    .connect(&db_url)
271                    .await
272                    .map_err(|e| {
273                        CamelError::EndpointCreationFailed(format!(
274                            "Failed to connect to database: {}",
275                            e
276                        ))
277                    })
278            })
279            .await?;
280
281        // Warn if no onConsume configured
282        if self.config.on_consume.is_none() {
283            warn!(
284                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
285            );
286        }
287
288        info!(
289            db_url = %redact_db_url(&self.config.db_url),
290            query_len = self.config.query.len(),
291            "SQL consumer started"
292        );
293
294        // Step 2: Parse query template once (avoid re-parsing every poll)
295        let template = parse_query_template(&self.config.query, self.config.placeholder)
296            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
297
298        // Step 3: Initial delay before starting polling
299        if self.config.initial_delay_ms > 0 {
300            tokio::select! {
301                _ = context.cancelled() => {
302                    info!("SQL consumer stopped during initial delay");
303                    return Ok(());
304                }
305                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
306            }
307        }
308
309        // Step 4: Polling loop
310        loop {
311            tokio::select! {
312                _ = context.cancelled() => {
313                    info!("SQL consumer stopped");
314                    break;
315                }
316                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
317                    if let Err(e) = self.poll_database(pool, &context, &template).await {
318                        error!(error = %e, "SQL consumer poll failed");
319                    }
320                }
321            }
322        }
323
324        Ok(())
325    }
326
327    async fn stop(&mut self) -> Result<(), CamelError> {
328        // Double-stop is safe — no-op after first stop
329        if self.stopped {
330            debug!("SQL consumer stop called on already-stopped consumer");
331            return Ok(());
332        }
333
334        // Close the connection pool if it was initialized
335        if let Some(pool) = self.pool.get() {
336            debug!("SQL consumer closing connection pool");
337            pool.close().await;
338            debug!("SQL consumer pool closed");
339        }
340
341        self.stopped = true;
342        info!("SQL consumer stopped");
343        Ok(())
344    }
345
346    fn concurrency_model(&self) -> ConcurrencyModel {
347        // Sequential is correct for SQL consumers: concurrent polls would fetch
348        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
349        // in this runtime) — Sequential is the correct equivalent.
350        ConcurrencyModel::Sequential
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use crate::config::SqlEndpointConfig;
358    use camel_component_api::ExchangeEnvelope;
359    use camel_component_api::UriConfig;
360    use sqlx::any::AnyPoolOptions;
361    use std::sync::Arc;
362    use std::time::Duration;
363    use tokio::sync::mpsc;
364    use tokio_util::sync::CancellationToken;
365
366    async fn sqlite_pool() -> AnyPool {
367        sqlx::any::install_default_drivers();
368        AnyPoolOptions::new()
369            .max_connections(1)
370            .connect("sqlite::memory:")
371            .await
372            .expect("sqlite pool")
373    }
374
375    async fn seed_consumer_table(pool: &AnyPool) {
376        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
377            .execute(pool)
378            .await
379            .expect("create table");
380        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
381            .execute(pool)
382            .await
383            .expect("seed rows");
384    }
385
386    fn config() -> SqlEndpointConfig {
387        let mut c =
388            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
389                .unwrap();
390        c.resolve_defaults();
391        c
392    }
393
394    #[test]
395    fn consumer_concurrency_model() {
396        let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
397        assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
398    }
399
400    #[test]
401    fn consumer_stores_config() {
402        let mut config = SqlEndpointConfig::from_uri(
403            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
404        ).unwrap();
405        config.resolve_defaults();
406        let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
407        assert_eq!(c.config.delay_ms, 2000);
408        assert!(c.config.on_consume.is_some());
409    }
410
411    #[tokio::test]
412    async fn poll_database_runs_on_consume_for_successful_rows() {
413        let pool = sqlite_pool().await;
414        seed_consumer_table(&pool).await;
415
416        let mut config = SqlEndpointConfig::from_uri(
417            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
418        )
419        .unwrap();
420        config.resolve_defaults();
421
422        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
423        let template = parse_query_template(&config.query, config.placeholder).unwrap();
424
425        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
426        tokio::spawn(async move {
427            while let Some(env) = rx.recv().await {
428                if let Some(reply_tx) = env.reply_tx {
429                    let _ = reply_tx.send(Ok(env.exchange));
430                }
431            }
432        });
433        let ctx = ConsumerContext::new(tx, CancellationToken::new());
434
435        consumer
436            .poll_database(&pool, &ctx, &template)
437            .await
438            .expect("poll must succeed");
439
440        let row = sqlx::query("select processed from jobs where id = 1")
441            .fetch_one(&pool)
442            .await
443            .expect("row 1");
444        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
445
446        let row = sqlx::query("select processed from jobs where id = 2")
447            .fetch_one(&pool)
448            .await
449            .expect("row 2");
450        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
451
452        assert_eq!(processed_1, 1);
453        assert_eq!(processed_2, 1);
454    }
455
456    #[tokio::test]
457    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
458        let pool = sqlite_pool().await;
459        seed_consumer_table(&pool).await;
460
461        let mut config = SqlEndpointConfig::from_uri(
462            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
463        )
464        .unwrap();
465        config.resolve_defaults();
466
467        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
468        let template = parse_query_template(&config.query, config.placeholder).unwrap();
469
470        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
471        tokio::spawn(async move {
472            while let Some(env) = rx.recv().await {
473                if let Some(reply_tx) = env.reply_tx {
474                    let _ =
475                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
476                }
477            }
478        });
479        let ctx = ConsumerContext::new(tx, CancellationToken::new());
480
481        consumer
482            .poll_database(&pool, &ctx, &template)
483            .await
484            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
485
486        let row = sqlx::query("select failed from jobs where id = 1")
487            .fetch_one(&pool)
488            .await
489            .expect("row 1");
490        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
491
492        let row = sqlx::query("select failed from jobs where id = 2")
493            .fetch_one(&pool)
494            .await
495            .expect("row 2");
496        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
497
498        assert_eq!(failed_1, 1);
499        assert_eq!(failed_2, 1);
500    }
501
502    #[tokio::test]
503    async fn poll_database_breaks_batch_on_consume_fail() {
504        let pool = sqlite_pool().await;
505        seed_consumer_table(&pool).await;
506
507        let mut config = SqlEndpointConfig::from_uri(
508            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
509        )
510        .unwrap();
511        config.resolve_defaults();
512
513        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
514        let template = parse_query_template(&config.query, config.placeholder).unwrap();
515
516        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
517        tokio::spawn(async move {
518            while let Some(env) = rx.recv().await {
519                if let Some(reply_tx) = env.reply_tx {
520                    let _ =
521                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
522                }
523            }
524        });
525        let ctx = ConsumerContext::new(tx, CancellationToken::new());
526
527        let err = consumer
528            .poll_database(&pool, &ctx, &template)
529            .await
530            .expect_err("must stop on first downstream failure");
531        assert!(err.to_string().contains("downstream boom"));
532
533        let row = sqlx::query("select failed from jobs where id = 1")
534            .fetch_one(&pool)
535            .await
536            .expect("row 1");
537        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
538
539        let row = sqlx::query("select failed from jobs where id = 2")
540            .fetch_one(&pool)
541            .await
542            .expect("row 2");
543        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
544
545        assert_eq!(failed_1, 1);
546        assert_eq!(failed_2, 0, "second row must not be processed");
547    }
548
549    // --- Phase B hardening tests ---
550
551    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
552    // The consumer defensively calls resolve_defaults() during pool init, so the pool
553    // fields get resolved. This test verifies no panic occurs.
554    #[tokio::test]
555    async fn consumer_no_panic_without_prior_resolve_defaults() {
556        let config = SqlEndpointConfig::from_uri(
557            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
558        )
559        .unwrap();
560        // Deliberately NOT calling resolve_defaults() — pool fields remain None
561        assert!(config.max_connections.is_none());
562
563        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
564        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
565        tokio::spawn(async move {
566            while let Some(env) = rx.recv().await {
567                if let Some(reply_tx) = env.reply_tx {
568                    let _ = reply_tx.send(Ok(env.exchange));
569                }
570            }
571        });
572        let token = CancellationToken::new();
573        let ctx = ConsumerContext::new(tx, token.clone());
574
575        // Spawn the consumer and cancel it quickly — it should not panic
576        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
577
578        // Cancel after a short delay
579        tokio::time::sleep(Duration::from_millis(50)).await;
580        token.cancel();
581
582        let result = consumer_handle.await.expect("task should not panic");
583        // Should complete without panic (may be Ok or Err depending on timing)
584        let _ = result;
585    }
586
587    // SQL-008: stop() closes the pool
588    #[tokio::test]
589    async fn stop_closes_pool() {
590        let pool = sqlite_pool().await;
591        seed_consumer_table(&pool).await;
592
593        let mut config = SqlEndpointConfig::from_uri(
594            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
595        )
596        .unwrap();
597        config.resolve_defaults();
598
599        let pool_cell = Arc::new(OnceCell::new());
600        pool_cell.set(pool.clone()).unwrap();
601
602        let mut consumer = SqlConsumer::new(config, pool_cell);
603        consumer.stop().await.expect("stop should succeed");
604
605        // After stop, the pool should be closed
606        assert!(
607            pool.is_closed(),
608            "Pool should be closed after consumer.stop()"
609        );
610    }
611
612    // SQL-008: double-stop is safe
613    #[tokio::test]
614    async fn double_stop_is_safe() {
615        let pool = sqlite_pool().await;
616        let mut config = SqlEndpointConfig::from_uri(
617            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
618        )
619        .unwrap();
620        config.resolve_defaults();
621
622        let pool_cell = Arc::new(OnceCell::new());
623        pool_cell.set(pool.clone()).unwrap();
624
625        let mut consumer = SqlConsumer::new(config, pool_cell);
626        consumer.stop().await.expect("first stop should succeed");
627        consumer
628            .stop()
629            .await
630            .expect("second stop should also succeed");
631    }
632
633    // SQL-008: start after stop is rejected
634    #[tokio::test]
635    async fn start_after_stop_rejected() {
636        let pool = sqlite_pool().await;
637        let mut config = SqlEndpointConfig::from_uri(
638            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
639        )
640        .unwrap();
641        config.resolve_defaults();
642
643        let pool_cell = Arc::new(OnceCell::new());
644        pool_cell.set(pool.clone()).unwrap();
645
646        let mut consumer = SqlConsumer::new(config, pool_cell);
647        consumer.stop().await.expect("stop should succeed");
648
649        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
650        tokio::spawn(async move {
651            while let Some(env) = rx.recv().await {
652                if let Some(reply_tx) = env.reply_tx {
653                    let _ = reply_tx.send(Ok(env.exchange));
654                }
655            }
656        });
657        let ctx = ConsumerContext::new(tx, CancellationToken::new());
658
659        let result = consumer.start(ctx).await;
660        assert!(result.is_err());
661        let err_msg = result.unwrap_err().to_string();
662        assert!(
663            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
664            "Expected restart error, got: {}",
665            err_msg
666        );
667    }
668
669    // SQL-021: batch mode per-row post-processing
670    #[tokio::test]
671    async fn batch_mode_per_row_post_processing() {
672        let pool = sqlite_pool().await;
673        seed_consumer_table(&pool).await;
674
675        let mut config = SqlEndpointConfig::from_uri(
676            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
677        )
678        .unwrap();
679        config.resolve_defaults();
680
681        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
682        let template = parse_query_template(&config.query, config.placeholder).unwrap();
683
684        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
685        tokio::spawn(async move {
686            while let Some(env) = rx.recv().await {
687                if let Some(reply_tx) = env.reply_tx {
688                    let _ = reply_tx.send(Ok(env.exchange));
689                }
690            }
691        });
692        let ctx = ConsumerContext::new(tx, CancellationToken::new());
693
694        consumer
695            .poll_database(&pool, &ctx, &template)
696            .await
697            .expect("poll must succeed");
698
699        // SQL-021: Each row should have been processed individually via onConsume
700        let row = sqlx::query("select processed from jobs where id = 1")
701            .fetch_one(&pool)
702            .await
703            .expect("row 1");
704        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
705
706        let row = sqlx::query("select processed from jobs where id = 2")
707            .fetch_one(&pool)
708            .await
709            .expect("row 2");
710        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
711
712        assert_eq!(
713            processed_1, 1,
714            "row 1 should be marked processed via per-row onConsume"
715        );
716        assert_eq!(
717            processed_2, 1,
718            "row 2 should be marked processed via per-row onConsume"
719        );
720    }
721
722    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
723    #[tokio::test]
724    async fn batch_mode_per_row_post_processing_on_failure() {
725        let pool = sqlite_pool().await;
726        seed_consumer_table(&pool).await;
727
728        let mut config = SqlEndpointConfig::from_uri(
729            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
730        )
731        .unwrap();
732        config.resolve_defaults();
733
734        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
735        let template = parse_query_template(&config.query, config.placeholder).unwrap();
736
737        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
738        tokio::spawn(async move {
739            while let Some(env) = rx.recv().await {
740                if let Some(reply_tx) = env.reply_tx {
741                    let _ =
742                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
743                }
744            }
745        });
746        let ctx = ConsumerContext::new(tx, CancellationToken::new());
747
748        consumer
749            .poll_database(&pool, &ctx, &template)
750            .await
751            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
752
753        // SQL-021: Each row should have onConsumeFailed executed individually
754        let row = sqlx::query("select failed from jobs where id = 1")
755            .fetch_one(&pool)
756            .await
757            .expect("row 1");
758        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
759
760        let row = sqlx::query("select failed from jobs where id = 2")
761            .fetch_one(&pool)
762            .await
763            .expect("row 2");
764        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
765
766        assert_eq!(
767            failed_1, 1,
768            "row 1 should be marked failed via per-row onConsumeFailed"
769        );
770        assert_eq!(
771            failed_2, 1,
772            "row 2 should be marked failed via per-row onConsumeFailed"
773        );
774    }
775}