Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use sqlx::AnyPool;
7use sqlx::any::AnyPoolOptions;
8use sqlx::any::AnyRow;
9use tokio::sync::OnceCell;
10use tracing::{debug, error, info, warn};
11
12use camel_component_api::{Body, CamelError, Exchange, Message};
13use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
14
15use crate::config::{
16    PollStrategy, ProcessingStrategy, SqlEndpointConfig, TransactionMode, enrich_db_url_with_ssl,
17    redact_db_url,
18};
19use crate::headers;
20use crate::query::{QueryTemplate, parse_query_template, resolve_params};
21use crate::utils::{bind_json_values, row_to_json};
22
23pub struct SqlConsumer {
24    pub(crate) config: SqlEndpointConfig,
25    pub(crate) pool: Arc<OnceCell<AnyPool>>,
26    stopped: bool,
27}
28
29impl SqlConsumer {
30    pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
31        Self {
32            config,
33            pool,
34            stopped: false,
35        }
36    }
37
38    /// Poll the database for new rows and process them.
39    async fn poll_database(
40        &self,
41        pool: &AnyPool,
42        context: &ConsumerContext,
43        template: &QueryTemplate,
44    ) -> Result<(), CamelError> {
45        // Create an empty exchange for parameter resolution (consumer has no input)
46        let empty_exchange = Exchange::new(Message::default());
47
48        // Resolve parameters
49        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
50
51        // Build and execute the query
52        debug!(query = %prepared.sql, "executing SQL consumer poll");
53        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
54        let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
55            warn!(error = %e, "SQL consumer poll query failed");
56            CamelError::ProcessorError(format!("Query execution failed: {}", e))
57        })?;
58
59        debug!(rows = rows.len(), "SQL consumer poll completed");
60
61        // Check for empty result set
62        if rows.is_empty() && !self.config.route_empty_result_set {
63            return Ok(());
64        }
65
66        // Apply max_messages_per_poll limit
67        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
68            if max > 0 {
69                rows.into_iter().take(max as usize).collect()
70            } else {
71                rows
72            }
73        } else {
74            rows
75        };
76
77        if self.config.use_iterator {
78            // Process each row individually
79            for row in rows_to_process {
80                let row_json = row_to_json(&row)?;
81
82                // Create exchange with the row as JSON body
83                let mut msg = Message::new(Body::Json(row_json.clone()));
84
85                // Set individual column headers with CamelSql. prefix per Apache Camel convention
86                if let Some(obj) = row_json.as_object() {
87                    for (key, value) in obj {
88                        msg.set_header(format!("CamelSql.{}", key), value.clone());
89                    }
90                }
91
92                let exchange = Exchange::new(msg);
93
94                // Send and wait for processing
95                let result = context.send_and_wait(exchange).await;
96
97                // Handle post-processing (onConsume/onConsumeFailed)
98                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
99                    error!(error = %e, "Post-processing failed");
100                    if self.config.break_batch_on_consume_fail {
101                        return Err(e);
102                    }
103                }
104
105                // If downstream processing itself failed, honour break_batch_on_consume_fail
106                if let Err(ref consume_err) = result
107                    && self.config.break_batch_on_consume_fail
108                {
109                    return Err(consume_err.clone());
110                }
111            }
112        } else {
113            // Process all rows as a single batch
114            let rows_json: Vec<JsonValue> = rows_to_process
115                .iter()
116                .map(row_to_json)
117                .collect::<Result<Vec<_>, CamelError>>()?;
118
119            let row_count = rows_json.len();
120
121            // Create exchange with array of rows
122            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
123            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
124
125            let exchange = Exchange::new(msg);
126
127            // Send and wait for result
128            let result = context.send_and_wait(exchange).await;
129
130            // SQL-021: Run per-row post-processing even in batch mode so that
131            // onConsume/onConsumeFailed queries can reference row-specific parameters
132            // (e.g. `:#id`). Each row gets its own post-processing query execution.
133            for row_json in rows_json.iter() {
134                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
135                    error!(error = %e, "Post-processing failed for batch row");
136                    if self.config.break_batch_on_consume_fail {
137                        return Err(e);
138                    }
139                }
140            }
141
142            // If downstream processing itself failed, honour break_batch_on_consume_fail
143            if let Err(ref consume_err) = result
144                && self.config.break_batch_on_consume_fail
145            {
146                return Err(consume_err.clone());
147            }
148        }
149
150        // Execute on_consume_batch_complete if configured
151        if let Some(ref batch_query) = self.config.on_consume_batch_complete
152            && let Err(e) = self
153                .execute_post_query(pool, batch_query, &JsonValue::Null)
154                .await
155        {
156            error!(error = %e, "onConsumeBatchComplete query failed");
157        }
158
159        Ok(())
160    }
161
162    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
163    async fn handle_post_processing(
164        &self,
165        pool: &AnyPool,
166        result: &Result<Exchange, CamelError>,
167        row_json: &JsonValue,
168    ) -> Result<(), CamelError> {
169        match result {
170            Ok(_) => {
171                // Success - execute onConsume if configured
172                if let Some(ref on_consume) = self.config.on_consume {
173                    self.execute_post_query(pool, on_consume, row_json).await?;
174                }
175            }
176            Err(_) => {
177                // Failure - execute onConsumeFailed if configured
178                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
179                    self.execute_post_query(pool, on_consume_failed, row_json)
180                        .await?;
181                }
182            }
183        }
184        Ok(())
185    }
186
187    /// Execute a post-processing query with the row data as parameters.
188    async fn execute_post_query(
189        &self,
190        pool: &AnyPool,
191        query_str: &str,
192        row_json: &JsonValue,
193    ) -> Result<(), CamelError> {
194        // Parse the query template
195        let template = parse_query_template(query_str, self.config.placeholder)?;
196
197        // Create a temporary exchange with the row as body for parameter resolution
198        // Populate CamelSql.* headers so named params can reference them
199        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
200        if let Some(obj) = row_json.as_object() {
201            for (key, value) in obj {
202                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
203            }
204        }
205        let temp_exchange = Exchange::new(temp_msg);
206
207        // Resolve parameters
208        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
209
210        // Build and execute the query
211        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
212        let result = query.execute(pool).await.map_err(|e| {
213            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
214        })?;
215
216        // Warn if 0 rows affected (the row may not have been marked correctly)
217        if result.rows_affected() == 0 {
218            warn!(
219                query = query_str,
220                "Post-processing query affected 0 rows — the row may not have been marked correctly"
221            );
222        }
223
224        Ok(())
225    }
226
227    async fn bridge_poll_error(
228        &self,
229        context: &ConsumerContext,
230        error: CamelError,
231    ) -> Result<(), CamelError> {
232        if !self.config.bridge_error_handler {
233            return Ok(());
234        }
235        let mut exchange = Exchange::new(Message::default());
236        exchange.set_error(error);
237        context.send_and_wait(exchange).await.map(|_| ())
238    }
239}
240
241#[async_trait]
242impl Consumer for SqlConsumer {
243    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
244        // Reject double-start
245        if self.stopped {
246            return Err(CamelError::Config(
247                "SQL consumer cannot be restarted after stop".into(),
248            ));
249        }
250
251        // Step 1: Initialize the connection pool
252        let pool = self
253            .pool
254            .get_or_try_init(|| async {
255                // Defensive: ensure config is resolved even if caller didn't use create_endpoint
256                self.config.resolve_defaults();
257                // SQL-014: resolve file-based query asynchronously (not blocking)
258                self.config.resolve_file_query().await?;
259
260                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
261                // This is idempotent; safe to call multiple times.
262                sqlx::any::install_default_drivers();
263                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
264
265                let max_conn = self.config.max_connections.ok_or_else(|| {
266                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
267                })?;
268                let min_conn = self.config.min_connections.ok_or_else(|| {
269                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
270                })?;
271                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
272                    CamelError::Config(
273                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
274                    )
275                })?;
276                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
277                    CamelError::Config(
278                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
279                    )
280                })?;
281
282                info!(
283                    db_url = %redact_db_url(&self.config.db_url),
284                    "SQL consumer pool initializing"
285                );
286                AnyPoolOptions::new()
287                    .max_connections(max_conn)
288                    .min_connections(min_conn)
289                    .idle_timeout(Duration::from_secs(idle_timeout))
290                    .max_lifetime(Duration::from_secs(max_lifetime))
291                    .connect(&db_url)
292                    .await
293                    .map_err(|e| {
294                        CamelError::EndpointCreationFailed(format!(
295                            "Failed to connect to database: {}",
296                            e
297                        ))
298                    })
299            })
300            .await?;
301
302        // SQL-002: warn if Managed transaction mode requested
303        if self.config.transaction_mode == TransactionMode::Managed {
304            warn!("transactionManager not yet implemented; using Auto mode");
305        }
306
307        // SQL-017/SQL-018: log processing and poll strategies
308        if self.config.processing_strategy == ProcessingStrategy::Scheduled {
309            debug!(
310                "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
311            );
312        }
313        if self.config.poll_strategy == PollStrategy::Burst {
314            debug!("Poll strategy: Burst (rapid successive polls)");
315        }
316
317        // Warn if no onConsume configured
318        if self.config.on_consume.is_none() {
319            warn!(
320                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
321            );
322        }
323
324        info!(
325            db_url = %redact_db_url(&self.config.db_url),
326            query_len = self.config.query.len(),
327            "SQL consumer started"
328        );
329
330        // Step 2: Parse query template once (avoid re-parsing every poll)
331        let template = parse_query_template(&self.config.query, self.config.placeholder)
332            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
333
334        // Step 3: Initial delay before starting polling
335        if self.config.initial_delay_ms > 0 {
336            tokio::select! {
337                _ = context.cancelled() => {
338                    info!("SQL consumer stopped during initial delay");
339                    return Ok(());
340                }
341                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
342            }
343        }
344
345        // Step 4: Polling loop
346        let mut poll_count: u32 = 0;
347        loop {
348            // SQL-015: check repeat_count limit
349            if let Some(max_repeats) = self.config.repeat_count
350                && poll_count >= max_repeats
351            {
352                info!(
353                    repeat_count = max_repeats,
354                    "SQL consumer reached repeat_count limit, stopping"
355                );
356                break;
357            }
358
359            tokio::select! {
360                _ = context.cancelled() => {
361                    info!("SQL consumer stopped");
362                    break;
363                }
364                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
365                    poll_count += 1;
366                    if let Err(e) = self.poll_database(pool, &context, &template).await {
367                        error!(error = %e, "SQL consumer poll failed");
368                        if let Err(route_err) = self.bridge_poll_error(&context, e).await {
369                            error!(error = %route_err, "Failed to bridge SQL consumer error to route");
370                        }
371                    }
372                }
373            }
374        }
375
376        Ok(())
377    }
378
379    async fn stop(&mut self) -> Result<(), CamelError> {
380        // Double-stop is safe — no-op after first stop
381        if self.stopped {
382            debug!("SQL consumer stop called on already-stopped consumer");
383            return Ok(());
384        }
385
386        // Close the connection pool if it was initialized
387        if let Some(pool) = self.pool.get() {
388            debug!("SQL consumer closing connection pool");
389            pool.close().await;
390            debug!("SQL consumer pool closed");
391        }
392
393        self.stopped = true;
394        info!("SQL consumer stopped");
395        Ok(())
396    }
397
398    fn concurrency_model(&self) -> ConcurrencyModel {
399        // Sequential is correct for SQL consumers: concurrent polls would fetch
400        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
401        // in this runtime) — Sequential is the correct equivalent.
402        ConcurrencyModel::Sequential
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409    use crate::config::SqlEndpointConfig;
410    use camel_component_api::ExchangeEnvelope;
411    use camel_component_api::UriConfig;
412    use sqlx::any::AnyPoolOptions;
413    use std::sync::Arc;
414    use std::time::Duration;
415    use tokio::sync::mpsc;
416    use tokio_util::sync::CancellationToken;
417
418    async fn sqlite_pool() -> AnyPool {
419        sqlx::any::install_default_drivers();
420        AnyPoolOptions::new()
421            .max_connections(1)
422            .connect("sqlite::memory:")
423            .await
424            .expect("sqlite pool")
425    }
426
427    async fn seed_consumer_table(pool: &AnyPool) {
428        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
429            .execute(pool)
430            .await
431            .expect("create table");
432        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
433            .execute(pool)
434            .await
435            .expect("seed rows");
436    }
437
438    fn config() -> SqlEndpointConfig {
439        let mut c =
440            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
441                .unwrap();
442        c.resolve_defaults();
443        c
444    }
445
446    #[test]
447    fn consumer_concurrency_model() {
448        let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
449        assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
450    }
451
452    #[test]
453    fn consumer_stores_config() {
454        let mut config = SqlEndpointConfig::from_uri(
455            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
456        ).unwrap();
457        config.resolve_defaults();
458        let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
459        assert_eq!(c.config.delay_ms, 2000);
460        assert!(c.config.on_consume.is_some());
461    }
462
463    #[tokio::test]
464    async fn poll_database_runs_on_consume_for_successful_rows() {
465        let pool = sqlite_pool().await;
466        seed_consumer_table(&pool).await;
467
468        let mut config = SqlEndpointConfig::from_uri(
469            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
470        )
471        .unwrap();
472        config.resolve_defaults();
473
474        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
475        let template = parse_query_template(&config.query, config.placeholder).unwrap();
476
477        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
478        tokio::spawn(async move {
479            while let Some(env) = rx.recv().await {
480                if let Some(reply_tx) = env.reply_tx {
481                    let _ = reply_tx.send(Ok(env.exchange));
482                }
483            }
484        });
485        let ctx = ConsumerContext::new(tx, CancellationToken::new());
486
487        consumer
488            .poll_database(&pool, &ctx, &template)
489            .await
490            .expect("poll must succeed");
491
492        let row = sqlx::query("select processed from jobs where id = 1")
493            .fetch_one(&pool)
494            .await
495            .expect("row 1");
496        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
497
498        let row = sqlx::query("select processed from jobs where id = 2")
499            .fetch_one(&pool)
500            .await
501            .expect("row 2");
502        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
503
504        assert_eq!(processed_1, 1);
505        assert_eq!(processed_2, 1);
506    }
507
508    #[tokio::test]
509    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
510        let pool = sqlite_pool().await;
511        seed_consumer_table(&pool).await;
512
513        let mut config = SqlEndpointConfig::from_uri(
514            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
515        )
516        .unwrap();
517        config.resolve_defaults();
518
519        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
520        let template = parse_query_template(&config.query, config.placeholder).unwrap();
521
522        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
523        tokio::spawn(async move {
524            while let Some(env) = rx.recv().await {
525                if let Some(reply_tx) = env.reply_tx {
526                    let _ =
527                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
528                }
529            }
530        });
531        let ctx = ConsumerContext::new(tx, CancellationToken::new());
532
533        consumer
534            .poll_database(&pool, &ctx, &template)
535            .await
536            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
537
538        let row = sqlx::query("select failed from jobs where id = 1")
539            .fetch_one(&pool)
540            .await
541            .expect("row 1");
542        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
543
544        let row = sqlx::query("select failed from jobs where id = 2")
545            .fetch_one(&pool)
546            .await
547            .expect("row 2");
548        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
549
550        assert_eq!(failed_1, 1);
551        assert_eq!(failed_2, 1);
552    }
553
554    #[tokio::test]
555    async fn poll_database_breaks_batch_on_consume_fail() {
556        let pool = sqlite_pool().await;
557        seed_consumer_table(&pool).await;
558
559        let mut config = SqlEndpointConfig::from_uri(
560            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
561        )
562        .unwrap();
563        config.resolve_defaults();
564
565        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
566        let template = parse_query_template(&config.query, config.placeholder).unwrap();
567
568        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
569        tokio::spawn(async move {
570            while let Some(env) = rx.recv().await {
571                if let Some(reply_tx) = env.reply_tx {
572                    let _ =
573                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
574                }
575            }
576        });
577        let ctx = ConsumerContext::new(tx, CancellationToken::new());
578
579        let err = consumer
580            .poll_database(&pool, &ctx, &template)
581            .await
582            .expect_err("must stop on first downstream failure");
583        assert!(err.to_string().contains("downstream boom"));
584
585        let row = sqlx::query("select failed from jobs where id = 1")
586            .fetch_one(&pool)
587            .await
588            .expect("row 1");
589        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
590
591        let row = sqlx::query("select failed from jobs where id = 2")
592            .fetch_one(&pool)
593            .await
594            .expect("row 2");
595        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
596
597        assert_eq!(failed_1, 1);
598        assert_eq!(failed_2, 0, "second row must not be processed");
599    }
600
601    // --- Phase B hardening tests ---
602
603    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
604    // The consumer defensively calls resolve_defaults() during pool init, so the pool
605    // fields get resolved. This test verifies no panic occurs.
606    #[tokio::test]
607    async fn consumer_no_panic_without_prior_resolve_defaults() {
608        let config = SqlEndpointConfig::from_uri(
609            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
610        )
611        .unwrap();
612        // Deliberately NOT calling resolve_defaults() — pool fields remain None
613        assert!(config.max_connections.is_none());
614
615        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
616        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
617        tokio::spawn(async move {
618            while let Some(env) = rx.recv().await {
619                if let Some(reply_tx) = env.reply_tx {
620                    let _ = reply_tx.send(Ok(env.exchange));
621                }
622            }
623        });
624        let token = CancellationToken::new();
625        let ctx = ConsumerContext::new(tx, token.clone());
626
627        // Spawn the consumer and cancel it quickly — it should not panic
628        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
629
630        // Cancel after a short delay
631        tokio::time::sleep(Duration::from_millis(50)).await;
632        token.cancel();
633
634        let result = consumer_handle.await.expect("task should not panic");
635        // Should complete without panic (may be Ok or Err depending on timing)
636        let _ = result;
637    }
638
639    // SQL-008: stop() closes the pool
640    #[tokio::test]
641    async fn stop_closes_pool() {
642        let pool = sqlite_pool().await;
643        seed_consumer_table(&pool).await;
644
645        let mut config = SqlEndpointConfig::from_uri(
646            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
647        )
648        .unwrap();
649        config.resolve_defaults();
650
651        let pool_cell = Arc::new(OnceCell::new());
652        pool_cell.set(pool.clone()).unwrap();
653
654        let mut consumer = SqlConsumer::new(config, pool_cell);
655        consumer.stop().await.expect("stop should succeed");
656
657        // After stop, the pool should be closed
658        assert!(
659            pool.is_closed(),
660            "Pool should be closed after consumer.stop()"
661        );
662    }
663
664    // SQL-008: double-stop is safe
665    #[tokio::test]
666    async fn double_stop_is_safe() {
667        let pool = sqlite_pool().await;
668        let mut config = SqlEndpointConfig::from_uri(
669            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
670        )
671        .unwrap();
672        config.resolve_defaults();
673
674        let pool_cell = Arc::new(OnceCell::new());
675        pool_cell.set(pool.clone()).unwrap();
676
677        let mut consumer = SqlConsumer::new(config, pool_cell);
678        consumer.stop().await.expect("first stop should succeed");
679        consumer
680            .stop()
681            .await
682            .expect("second stop should also succeed");
683    }
684
685    // SQL-008: start after stop is rejected
686    #[tokio::test]
687    async fn start_after_stop_rejected() {
688        let pool = sqlite_pool().await;
689        let mut config = SqlEndpointConfig::from_uri(
690            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
691        )
692        .unwrap();
693        config.resolve_defaults();
694
695        let pool_cell = Arc::new(OnceCell::new());
696        pool_cell.set(pool.clone()).unwrap();
697
698        let mut consumer = SqlConsumer::new(config, pool_cell);
699        consumer.stop().await.expect("stop should succeed");
700
701        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
702        tokio::spawn(async move {
703            while let Some(env) = rx.recv().await {
704                if let Some(reply_tx) = env.reply_tx {
705                    let _ = reply_tx.send(Ok(env.exchange));
706                }
707            }
708        });
709        let ctx = ConsumerContext::new(tx, CancellationToken::new());
710
711        let result = consumer.start(ctx).await;
712        assert!(result.is_err());
713        let err_msg = result.unwrap_err().to_string();
714        assert!(
715            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
716            "Expected restart error, got: {}",
717            err_msg
718        );
719    }
720
721    // SQL-021: batch mode per-row post-processing
722    #[tokio::test]
723    async fn batch_mode_per_row_post_processing() {
724        let pool = sqlite_pool().await;
725        seed_consumer_table(&pool).await;
726
727        let mut config = SqlEndpointConfig::from_uri(
728            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
729        )
730        .unwrap();
731        config.resolve_defaults();
732
733        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
734        let template = parse_query_template(&config.query, config.placeholder).unwrap();
735
736        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
737        tokio::spawn(async move {
738            while let Some(env) = rx.recv().await {
739                if let Some(reply_tx) = env.reply_tx {
740                    let _ = reply_tx.send(Ok(env.exchange));
741                }
742            }
743        });
744        let ctx = ConsumerContext::new(tx, CancellationToken::new());
745
746        consumer
747            .poll_database(&pool, &ctx, &template)
748            .await
749            .expect("poll must succeed");
750
751        // SQL-021: Each row should have been processed individually via onConsume
752        let row = sqlx::query("select processed from jobs where id = 1")
753            .fetch_one(&pool)
754            .await
755            .expect("row 1");
756        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
757
758        let row = sqlx::query("select processed from jobs where id = 2")
759            .fetch_one(&pool)
760            .await
761            .expect("row 2");
762        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
763
764        assert_eq!(
765            processed_1, 1,
766            "row 1 should be marked processed via per-row onConsume"
767        );
768        assert_eq!(
769            processed_2, 1,
770            "row 2 should be marked processed via per-row onConsume"
771        );
772    }
773
774    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
775    #[tokio::test]
776    async fn batch_mode_per_row_post_processing_on_failure() {
777        let pool = sqlite_pool().await;
778        seed_consumer_table(&pool).await;
779
780        let mut config = SqlEndpointConfig::from_uri(
781            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
782        )
783        .unwrap();
784        config.resolve_defaults();
785
786        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
787        let template = parse_query_template(&config.query, config.placeholder).unwrap();
788
789        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
790        tokio::spawn(async move {
791            while let Some(env) = rx.recv().await {
792                if let Some(reply_tx) = env.reply_tx {
793                    let _ =
794                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
795                }
796            }
797        });
798        let ctx = ConsumerContext::new(tx, CancellationToken::new());
799
800        consumer
801            .poll_database(&pool, &ctx, &template)
802            .await
803            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
804
805        // SQL-021: Each row should have onConsumeFailed executed individually
806        let row = sqlx::query("select failed from jobs where id = 1")
807            .fetch_one(&pool)
808            .await
809            .expect("row 1");
810        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
811
812        let row = sqlx::query("select failed from jobs where id = 2")
813            .fetch_one(&pool)
814            .await
815            .expect("row 2");
816        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
817
818        assert_eq!(
819            failed_1, 1,
820            "row 1 should be marked failed via per-row onConsumeFailed"
821        );
822        assert_eq!(
823            failed_2, 1,
824            "row 2 should be marked failed via per-row onConsumeFailed"
825        );
826    }
827
828    #[tokio::test]
829    async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
830        let mut config = config();
831        config.bridge_error_handler = true;
832        let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
833
834        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
835        tokio::spawn(async move {
836            while let Some(env) = rx.recv().await {
837                assert!(env.exchange.error.is_some(), "exchange must carry error");
838                if let Some(reply_tx) = env.reply_tx {
839                    let _ = reply_tx.send(Ok(env.exchange));
840                }
841                break;
842            }
843        });
844
845        let ctx = ConsumerContext::new(tx, CancellationToken::new());
846        consumer
847            .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
848            .await
849            .expect("bridging should succeed");
850    }
851}