Skip to main content

camel_component_sql/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use serde_json::Value as JsonValue;
8use sqlx::AnyPool;
9use sqlx::any::AnyPoolOptions;
10use sqlx::any::AnyRow;
11use tokio::sync::OnceCell;
12use tracing::{debug, error, info, warn};
13
14use camel_component_api::{Body, CamelError, Exchange, Message, StreamBody, StreamMetadata};
15use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
16
17use crate::config::{
18    PollStrategy, ProcessingStrategy, SqlEndpointConfig, SqlOutputType, TransactionMode,
19    enrich_db_url_with_ssl, redact_db_url,
20};
21use crate::headers;
22use crate::query::{QueryTemplate, parse_query_template, resolve_params};
23use crate::utils::{bind_json_values, row_to_json};
24
/// Polling consumer that periodically runs a SQL query and routes the
/// fetched rows downstream as exchanges.
pub struct SqlConsumer {
    /// Endpoint configuration: the query, poll timing, post-processing
    /// queries and connection-pool sizing read by the methods below.
    pub(crate) config: SqlEndpointConfig,
    /// Connection pool cell; filled on first `start()` if it is still empty.
    pub(crate) pool: Arc<OnceCell<AnyPool>>,
    /// Set by `stop()`. Once `true`, `start()` refuses to run again.
    stopped: bool,
}
30
31impl SqlConsumer {
32    pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
33        Self {
34            config,
35            pool,
36            stopped: false,
37        }
38    }
39
40    /// Poll the database for new rows and process them.
41    async fn poll_database(
42        &self,
43        pool: &AnyPool,
44        context: &ConsumerContext,
45        template: &QueryTemplate,
46    ) -> Result<(), CamelError> {
47        // Create an empty exchange for parameter resolution (consumer has no input)
48        let empty_exchange = Exchange::new(Message::default());
49
50        // Resolve parameters
51        let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
52
53        debug!(query = %prepared.sql, "executing SQL consumer poll");
54
55        if self.config.output_type == SqlOutputType::StreamList {
56            return self.poll_database_stream(pool, context, &prepared).await;
57        }
58
59        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
60        let rows: Vec<AnyRow> = query.fetch_all(pool).await.map_err(|e| {
61            warn!(error = %e, "SQL consumer poll query failed");
62            CamelError::ProcessorError(format!("Query execution failed: {}", e))
63        })?;
64
65        debug!(rows = rows.len(), "SQL consumer poll completed");
66
67        if rows.is_empty() && !self.config.route_empty_result_set {
68            return Ok(());
69        }
70
71        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
72            if max > 0 {
73                rows.into_iter().take(max as usize).collect()
74            } else {
75                rows
76            }
77        } else {
78            rows
79        };
80
81        if self.config.use_iterator {
82            // Process each row individually
83            for row in rows_to_process {
84                let row_json = row_to_json(&row)?;
85
86                // Create exchange with the row as JSON body
87                let mut msg = Message::new(Body::Json(row_json.clone()));
88
89                // Set individual column headers with CamelSql. prefix per Apache Camel convention
90                if let Some(obj) = row_json.as_object() {
91                    for (key, value) in obj {
92                        msg.set_header(format!("CamelSql.{}", key), value.clone());
93                    }
94                }
95
96                let exchange = Exchange::new(msg);
97
98                // Send and wait for processing
99                let result = context.send_and_wait(exchange).await;
100
101                // Handle post-processing (onConsume/onConsumeFailed)
102                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
103                    error!(error = %e, "Post-processing failed");
104                    if self.config.break_batch_on_consume_fail {
105                        return Err(e);
106                    }
107                }
108
109                // If downstream processing itself failed, honour break_batch_on_consume_fail
110                if let Err(ref consume_err) = result
111                    && self.config.break_batch_on_consume_fail
112                {
113                    return Err(consume_err.clone());
114                }
115            }
116        } else {
117            // Process all rows as a single batch
118            let rows_json: Vec<JsonValue> = rows_to_process
119                .iter()
120                .map(row_to_json)
121                .collect::<Result<Vec<_>, CamelError>>()?;
122
123            let row_count = rows_json.len();
124
125            // Create exchange with array of rows
126            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
127            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
128
129            let exchange = Exchange::new(msg);
130
131            // Send and wait for result
132            let result = context.send_and_wait(exchange).await;
133
134            // SQL-021: Run per-row post-processing even in batch mode so that
135            // onConsume/onConsumeFailed queries can reference row-specific parameters
136            // (e.g. `:#id`). Each row gets its own post-processing query execution.
137            for row_json in rows_json.iter() {
138                if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
139                    error!(error = %e, "Post-processing failed for batch row");
140                    if self.config.break_batch_on_consume_fail {
141                        return Err(e);
142                    }
143                }
144            }
145
146            // If downstream processing itself failed, honour break_batch_on_consume_fail
147            if let Err(ref consume_err) = result
148                && self.config.break_batch_on_consume_fail
149            {
150                return Err(consume_err.clone());
151            }
152        }
153
154        // Execute on_consume_batch_complete if configured
155        if let Some(ref batch_query) = self.config.on_consume_batch_complete
156            && let Err(e) = self
157                .execute_post_query(pool, batch_query, &JsonValue::Null)
158                .await
159        {
160            error!(error = %e, "onConsumeBatchComplete query failed");
161        }
162
163        Ok(())
164    }
165
166    async fn poll_database_stream(
167        &self,
168        pool: &AnyPool,
169        context: &ConsumerContext,
170        prepared: &crate::query::PreparedQuery,
171    ) -> Result<(), CamelError> {
172        let pool_clone = pool.clone();
173        let sql_str = prepared.sql.clone();
174        let bindings = prepared.bindings.clone();
175
176        let byte_stream = async_stream::try_stream! {
177            let mut q = sqlx::query(&sql_str);
178            q = bind_json_values(q, &bindings);
179            let mut rows = q.fetch(&pool_clone);
180            while let Some(row) = rows.try_next().await.map_err(|e| {
181                CamelError::ProcessorError(format!("Query execution failed: {}", e))
182            })? {
183                let json_val = row_to_json(&row).map_err(|e| {
184                    CamelError::ProcessorError(format!("JSON serialization failed: {}", e))
185                })?;
186                let mut bytes = serde_json::to_vec(&json_val)
187                    .map_err(|e| CamelError::ProcessorError(format!("JSON serialization failed: {}", e)))?;
188                bytes.push(b'\n');
189                yield Bytes::from(bytes);
190            }
191        };
192
193        let msg = Message::new(Body::Stream(StreamBody {
194            stream: Arc::new(tokio::sync::Mutex::new(Some(Box::pin(byte_stream)))),
195            metadata: StreamMetadata {
196                content_type: Some("application/x-ndjson".to_string()),
197                size_hint: None,
198                origin: None,
199            },
200        }));
201
202        let exchange = Exchange::new(msg);
203        let result = context.send_and_wait(exchange).await;
204        if let Err(e) = result {
205            error!(error = %e, "StreamList consumer downstream processing failed");
206            return Err(e);
207        }
208
209        debug!("StreamList: consumer poll completed (lazy stream emitted)");
210        Ok(())
211    }
212
213    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
214    async fn handle_post_processing(
215        &self,
216        pool: &AnyPool,
217        result: &Result<Exchange, CamelError>,
218        row_json: &JsonValue,
219    ) -> Result<(), CamelError> {
220        match result {
221            Ok(_) => {
222                // Success - execute onConsume if configured
223                if let Some(ref on_consume) = self.config.on_consume {
224                    self.execute_post_query(pool, on_consume, row_json).await?;
225                }
226            }
227            Err(_) => {
228                // Failure - execute onConsumeFailed if configured
229                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
230                    self.execute_post_query(pool, on_consume_failed, row_json)
231                        .await?;
232                }
233            }
234        }
235        Ok(())
236    }
237
238    /// Execute a post-processing query with the row data as parameters.
239    async fn execute_post_query(
240        &self,
241        pool: &AnyPool,
242        query_str: &str,
243        row_json: &JsonValue,
244    ) -> Result<(), CamelError> {
245        // Parse the query template
246        let template = parse_query_template(query_str, self.config.placeholder)?;
247
248        // Create a temporary exchange with the row as body for parameter resolution
249        // Populate CamelSql.* headers so named params can reference them
250        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
251        if let Some(obj) = row_json.as_object() {
252            for (key, value) in obj {
253                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
254            }
255        }
256        let temp_exchange = Exchange::new(temp_msg);
257
258        // Resolve parameters
259        let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
260
261        // Build and execute the query
262        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
263        let result = query.execute(pool).await.map_err(|e| {
264            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
265        })?;
266
267        // Warn if 0 rows affected (the row may not have been marked correctly)
268        if result.rows_affected() == 0 {
269            warn!(
270                query = query_str,
271                "Post-processing query affected 0 rows — the row may not have been marked correctly"
272            );
273        }
274
275        Ok(())
276    }
277
278    async fn bridge_poll_error(
279        &self,
280        context: &ConsumerContext,
281        error: CamelError,
282    ) -> Result<(), CamelError> {
283        if !self.config.bridge_error_handler {
284            return Ok(());
285        }
286        let mut exchange = Exchange::new(Message::default());
287        exchange.set_error(error);
288        context.send_and_wait(exchange).await.map(|_| ())
289    }
290}
291
292#[async_trait]
293impl Consumer for SqlConsumer {
294    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
295        // Reject double-start
296        if self.stopped {
297            return Err(CamelError::Config(
298                "SQL consumer cannot be restarted after stop".into(),
299            ));
300        }
301
302        // Step 1: Initialize the connection pool
303        let pool = self
304            .pool
305            .get_or_try_init(|| async {
306                // Defensive: ensure config is resolved even if caller didn't use create_endpoint
307                self.config.resolve_defaults();
308                // SQL-014: resolve file-based query asynchronously (not blocking)
309                self.config.resolve_file_query().await?;
310
311                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
312                // This is idempotent; safe to call multiple times.
313                sqlx::any::install_default_drivers();
314                let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
315
316                let max_conn = self.config.max_connections.ok_or_else(|| {
317                    CamelError::Config("max_connections not resolved for SQL consumer pool".into())
318                })?;
319                let min_conn = self.config.min_connections.ok_or_else(|| {
320                    CamelError::Config("min_connections not resolved for SQL consumer pool".into())
321                })?;
322                let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
323                    CamelError::Config(
324                        "idle_timeout_secs not resolved for SQL consumer pool".into(),
325                    )
326                })?;
327                let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
328                    CamelError::Config(
329                        "max_lifetime_secs not resolved for SQL consumer pool".into(),
330                    )
331                })?;
332
333                info!(
334                    db_url = %redact_db_url(&self.config.db_url),
335                    "SQL consumer pool initializing"
336                );
337                AnyPoolOptions::new()
338                    .max_connections(max_conn)
339                    .min_connections(min_conn)
340                    .idle_timeout(Duration::from_secs(idle_timeout))
341                    .max_lifetime(Duration::from_secs(max_lifetime))
342                    .connect(&db_url)
343                    .await
344                    .map_err(|e| {
345                        CamelError::EndpointCreationFailed(format!(
346                            "Failed to connect to database: {}",
347                            e
348                        ))
349                    })
350            })
351            .await?;
352
353        // SQL-002: warn if Managed transaction mode requested
354        if self.config.transaction_mode == TransactionMode::Managed {
355            warn!("transactionManager not yet implemented; using Auto mode");
356        }
357
358        // SQL-017/SQL-018: log processing and poll strategies
359        if self.config.processing_strategy == ProcessingStrategy::Scheduled {
360            debug!(
361                "Processing strategy: Scheduled (rows dispatched individually via send_and_wait)"
362            );
363        }
364        if self.config.poll_strategy == PollStrategy::Burst {
365            debug!("Poll strategy: Burst (rapid successive polls)");
366        }
367
368        if self.config.output_type == SqlOutputType::StreamList
369            && (self.config.on_consume.is_some()
370                || self.config.on_consume_failed.is_some()
371                || self.config.on_consume_batch_complete.is_some())
372        {
373            warn!(
374                "onConsume/onConsumeFailed/onConsumeBatchComplete are not executed in StreamList mode \
375                 (rows are consumed lazily downstream)"
376            );
377        }
378
379        // Warn if no onConsume configured
380        if self.config.on_consume.is_none() {
381            warn!(
382                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
383            );
384        }
385
386        info!(
387            db_url = %redact_db_url(&self.config.db_url),
388            query_len = self.config.query.len(),
389            "SQL consumer started"
390        );
391
392        // Step 2: Parse query template once (avoid re-parsing every poll)
393        let template = parse_query_template(&self.config.query, self.config.placeholder)
394            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
395
396        // Step 3: Initial delay before starting polling
397        if self.config.initial_delay_ms > 0 {
398            tokio::select! {
399                _ = context.cancelled() => {
400                    info!("SQL consumer stopped during initial delay");
401                    return Ok(());
402                }
403                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
404            }
405        }
406
407        // Step 4: Polling loop
408        let mut poll_count: u32 = 0;
409        loop {
410            // SQL-015: check repeat_count limit
411            if let Some(max_repeats) = self.config.repeat_count
412                && poll_count >= max_repeats
413            {
414                info!(
415                    repeat_count = max_repeats,
416                    "SQL consumer reached repeat_count limit, stopping"
417                );
418                break;
419            }
420
421            tokio::select! {
422                _ = context.cancelled() => {
423                    info!("SQL consumer stopped");
424                    break;
425                }
426                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
427                    poll_count += 1;
428                    if let Err(e) = self.poll_database(pool, &context, &template).await {
429                        error!(error = %e, "SQL consumer poll failed");
430                        if let Err(route_err) = self.bridge_poll_error(&context, e).await {
431                            error!(error = %route_err, "Failed to bridge SQL consumer error to route");
432                        }
433                    }
434                }
435            }
436        }
437
438        Ok(())
439    }
440
441    async fn stop(&mut self) -> Result<(), CamelError> {
442        // Double-stop is safe — no-op after first stop
443        if self.stopped {
444            debug!("SQL consumer stop called on already-stopped consumer");
445            return Ok(());
446        }
447
448        // Close the connection pool if it was initialized
449        if let Some(pool) = self.pool.get() {
450            debug!("SQL consumer closing connection pool");
451            pool.close().await;
452            debug!("SQL consumer pool closed");
453        }
454
455        self.stopped = true;
456        info!("SQL consumer stopped");
457        Ok(())
458    }
459
    /// Reports the concurrency model of this consumer, which is always
    /// `ConcurrencyModel::Sequential`.
    fn concurrency_model(&self) -> ConcurrencyModel {
        // Sequential is the right choice for SQL consumers: polls running
        // concurrently would fetch the same rows twice. The design doc
        // mentioned SharedState, which does not exist in this runtime;
        // Sequential is the equivalent here.
        ConcurrencyModel::Sequential
    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use crate::config::SqlEndpointConfig;
472    use camel_component_api::ExchangeEnvelope;
473    use camel_component_api::UriConfig;
474    use sqlx::any::AnyPoolOptions;
475    use std::sync::Arc;
476    use std::time::Duration;
477    use tokio::sync::mpsc;
478    use tokio_util::sync::CancellationToken;
479
480    async fn sqlite_pool() -> AnyPool {
481        sqlx::any::install_default_drivers();
482        AnyPoolOptions::new()
483            .max_connections(1)
484            .connect("sqlite::memory:")
485            .await
486            .expect("sqlite pool")
487    }
488
489    async fn seed_consumer_table(pool: &AnyPool) {
490        sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
491            .execute(pool)
492            .await
493            .expect("create table");
494        sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
495            .execute(pool)
496            .await
497            .expect("seed rows");
498    }
499
500    fn config() -> SqlEndpointConfig {
501        let mut c =
502            SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
503                .unwrap();
504        c.resolve_defaults();
505        c
506    }
507
508    #[test]
509    fn consumer_concurrency_model() {
510        let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
511        assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
512    }
513
514    #[test]
515    fn consumer_stores_config() {
516        let mut config = SqlEndpointConfig::from_uri(
517            "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
518        ).unwrap();
519        config.resolve_defaults();
520        let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
521        assert_eq!(c.config.delay_ms, 2000);
522        assert!(c.config.on_consume.is_some());
523    }
524
525    #[tokio::test]
526    async fn poll_database_runs_on_consume_for_successful_rows() {
527        let pool = sqlite_pool().await;
528        seed_consumer_table(&pool).await;
529
530        let mut config = SqlEndpointConfig::from_uri(
531            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
532        )
533        .unwrap();
534        config.resolve_defaults();
535
536        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
537        let template = parse_query_template(&config.query, config.placeholder).unwrap();
538
539        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
540        tokio::spawn(async move {
541            while let Some(env) = rx.recv().await {
542                if let Some(reply_tx) = env.reply_tx {
543                    let _ = reply_tx.send(Ok(env.exchange));
544                }
545            }
546        });
547        let ctx = ConsumerContext::new(tx, CancellationToken::new());
548
549        consumer
550            .poll_database(&pool, &ctx, &template)
551            .await
552            .expect("poll must succeed");
553
554        let row = sqlx::query("select processed from jobs where id = 1")
555            .fetch_one(&pool)
556            .await
557            .expect("row 1");
558        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
559
560        let row = sqlx::query("select processed from jobs where id = 2")
561            .fetch_one(&pool)
562            .await
563            .expect("row 2");
564        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
565
566        assert_eq!(processed_1, 1);
567        assert_eq!(processed_2, 1);
568    }
569
570    #[tokio::test]
571    async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
572        let pool = sqlite_pool().await;
573        seed_consumer_table(&pool).await;
574
575        let mut config = SqlEndpointConfig::from_uri(
576            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
577        )
578        .unwrap();
579        config.resolve_defaults();
580
581        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
582        let template = parse_query_template(&config.query, config.placeholder).unwrap();
583
584        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
585        tokio::spawn(async move {
586            while let Some(env) = rx.recv().await {
587                if let Some(reply_tx) = env.reply_tx {
588                    let _ =
589                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
590                }
591            }
592        });
593        let ctx = ConsumerContext::new(tx, CancellationToken::new());
594
595        consumer
596            .poll_database(&pool, &ctx, &template)
597            .await
598            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
599
600        let row = sqlx::query("select failed from jobs where id = 1")
601            .fetch_one(&pool)
602            .await
603            .expect("row 1");
604        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
605
606        let row = sqlx::query("select failed from jobs where id = 2")
607            .fetch_one(&pool)
608            .await
609            .expect("row 2");
610        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
611
612        assert_eq!(failed_1, 1);
613        assert_eq!(failed_2, 1);
614    }
615
616    #[tokio::test]
617    async fn poll_database_breaks_batch_on_consume_fail() {
618        let pool = sqlite_pool().await;
619        seed_consumer_table(&pool).await;
620
621        let mut config = SqlEndpointConfig::from_uri(
622            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
623        )
624        .unwrap();
625        config.resolve_defaults();
626
627        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
628        let template = parse_query_template(&config.query, config.placeholder).unwrap();
629
630        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
631        tokio::spawn(async move {
632            while let Some(env) = rx.recv().await {
633                if let Some(reply_tx) = env.reply_tx {
634                    let _ =
635                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
636                }
637            }
638        });
639        let ctx = ConsumerContext::new(tx, CancellationToken::new());
640
641        let err = consumer
642            .poll_database(&pool, &ctx, &template)
643            .await
644            .expect_err("must stop on first downstream failure");
645        assert!(err.to_string().contains("downstream boom"));
646
647        let row = sqlx::query("select failed from jobs where id = 1")
648            .fetch_one(&pool)
649            .await
650            .expect("row 1");
651        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
652
653        let row = sqlx::query("select failed from jobs where id = 2")
654            .fetch_one(&pool)
655            .await
656            .expect("row 2");
657        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
658
659        assert_eq!(failed_1, 1);
660        assert_eq!(failed_2, 0, "second row must not be processed");
661    }
662
663    // --- Phase B hardening tests ---
664
665    // SQL-001: Direct consumer construction without resolve_defaults does not panic.
666    // The consumer defensively calls resolve_defaults() during pool init, so the pool
667    // fields get resolved. This test verifies no panic occurs.
668    #[tokio::test]
669    async fn consumer_no_panic_without_prior_resolve_defaults() {
670        let config = SqlEndpointConfig::from_uri(
671            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
672        )
673        .unwrap();
674        // Deliberately NOT calling resolve_defaults() — pool fields remain None
675        assert!(config.max_connections.is_none());
676
677        let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
678        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
679        tokio::spawn(async move {
680            while let Some(env) = rx.recv().await {
681                if let Some(reply_tx) = env.reply_tx {
682                    let _ = reply_tx.send(Ok(env.exchange));
683                }
684            }
685        });
686        let token = CancellationToken::new();
687        let ctx = ConsumerContext::new(tx, token.clone());
688
689        // Spawn the consumer and cancel it quickly — it should not panic
690        let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
691
692        // Cancel after a short delay
693        tokio::time::sleep(Duration::from_millis(50)).await;
694        token.cancel();
695
696        let result = consumer_handle.await.expect("task should not panic");
697        // Should complete without panic (may be Ok or Err depending on timing)
698        let _ = result;
699    }
700
701    // SQL-008: stop() closes the pool
702    #[tokio::test]
703    async fn stop_closes_pool() {
704        let pool = sqlite_pool().await;
705        seed_consumer_table(&pool).await;
706
707        let mut config = SqlEndpointConfig::from_uri(
708            "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
709        )
710        .unwrap();
711        config.resolve_defaults();
712
713        let pool_cell = Arc::new(OnceCell::new());
714        pool_cell.set(pool.clone()).unwrap();
715
716        let mut consumer = SqlConsumer::new(config, pool_cell);
717        consumer.stop().await.expect("stop should succeed");
718
719        // After stop, the pool should be closed
720        assert!(
721            pool.is_closed(),
722            "Pool should be closed after consumer.stop()"
723        );
724    }
725
726    // SQL-008: double-stop is safe
727    #[tokio::test]
728    async fn double_stop_is_safe() {
729        let pool = sqlite_pool().await;
730        let mut config = SqlEndpointConfig::from_uri(
731            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
732        )
733        .unwrap();
734        config.resolve_defaults();
735
736        let pool_cell = Arc::new(OnceCell::new());
737        pool_cell.set(pool.clone()).unwrap();
738
739        let mut consumer = SqlConsumer::new(config, pool_cell);
740        consumer.stop().await.expect("first stop should succeed");
741        consumer
742            .stop()
743            .await
744            .expect("second stop should also succeed");
745    }
746
747    // SQL-008: start after stop is rejected
748    #[tokio::test]
749    async fn start_after_stop_rejected() {
750        let pool = sqlite_pool().await;
751        let mut config = SqlEndpointConfig::from_uri(
752            "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
753        )
754        .unwrap();
755        config.resolve_defaults();
756
757        let pool_cell = Arc::new(OnceCell::new());
758        pool_cell.set(pool.clone()).unwrap();
759
760        let mut consumer = SqlConsumer::new(config, pool_cell);
761        consumer.stop().await.expect("stop should succeed");
762
763        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
764        tokio::spawn(async move {
765            while let Some(env) = rx.recv().await {
766                if let Some(reply_tx) = env.reply_tx {
767                    let _ = reply_tx.send(Ok(env.exchange));
768                }
769            }
770        });
771        let ctx = ConsumerContext::new(tx, CancellationToken::new());
772
773        let result = consumer.start(ctx).await;
774        assert!(result.is_err());
775        let err_msg = result.unwrap_err().to_string();
776        assert!(
777            err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
778            "Expected restart error, got: {}",
779            err_msg
780        );
781    }
782
783    // SQL-021: batch mode per-row post-processing
784    #[tokio::test]
785    async fn batch_mode_per_row_post_processing() {
786        let pool = sqlite_pool().await;
787        seed_consumer_table(&pool).await;
788
789        let mut config = SqlEndpointConfig::from_uri(
790            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
791        )
792        .unwrap();
793        config.resolve_defaults();
794
795        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
796        let template = parse_query_template(&config.query, config.placeholder).unwrap();
797
798        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
799        tokio::spawn(async move {
800            while let Some(env) = rx.recv().await {
801                if let Some(reply_tx) = env.reply_tx {
802                    let _ = reply_tx.send(Ok(env.exchange));
803                }
804            }
805        });
806        let ctx = ConsumerContext::new(tx, CancellationToken::new());
807
808        consumer
809            .poll_database(&pool, &ctx, &template)
810            .await
811            .expect("poll must succeed");
812
813        // SQL-021: Each row should have been processed individually via onConsume
814        let row = sqlx::query("select processed from jobs where id = 1")
815            .fetch_one(&pool)
816            .await
817            .expect("row 1");
818        let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
819
820        let row = sqlx::query("select processed from jobs where id = 2")
821            .fetch_one(&pool)
822            .await
823            .expect("row 2");
824        let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
825
826        assert_eq!(
827            processed_1, 1,
828            "row 1 should be marked processed via per-row onConsume"
829        );
830        assert_eq!(
831            processed_2, 1,
832            "row 2 should be marked processed via per-row onConsume"
833        );
834    }
835
836    // SQL-021: batch mode per-row onConsumeFailed when downstream fails
837    #[tokio::test]
838    async fn batch_mode_per_row_post_processing_on_failure() {
839        let pool = sqlite_pool().await;
840        seed_consumer_table(&pool).await;
841
842        let mut config = SqlEndpointConfig::from_uri(
843            "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
844        )
845        .unwrap();
846        config.resolve_defaults();
847
848        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
849        let template = parse_query_template(&config.query, config.placeholder).unwrap();
850
851        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
852        tokio::spawn(async move {
853            while let Some(env) = rx.recv().await {
854                if let Some(reply_tx) = env.reply_tx {
855                    let _ =
856                        reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
857                }
858            }
859        });
860        let ctx = ConsumerContext::new(tx, CancellationToken::new());
861
862        consumer
863            .poll_database(&pool, &ctx, &template)
864            .await
865            .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
866
867        // SQL-021: Each row should have onConsumeFailed executed individually
868        let row = sqlx::query("select failed from jobs where id = 1")
869            .fetch_one(&pool)
870            .await
871            .expect("row 1");
872        let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
873
874        let row = sqlx::query("select failed from jobs where id = 2")
875            .fetch_one(&pool)
876            .await
877            .expect("row 2");
878        let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
879
880        assert_eq!(
881            failed_1, 1,
882            "row 1 should be marked failed via per-row onConsumeFailed"
883        );
884        assert_eq!(
885            failed_2, 1,
886            "row 2 should be marked failed via per-row onConsumeFailed"
887        );
888    }
889
890    #[tokio::test]
891    async fn bridge_error_handler_routes_poll_errors_to_exchange_error() {
892        let mut config = config();
893        config.bridge_error_handler = true;
894        let consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
895
896        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(4);
897        tokio::spawn(async move {
898            while let Some(env) = rx.recv().await {
899                assert!(env.exchange.error.is_some(), "exchange must carry error");
900                if let Some(reply_tx) = env.reply_tx {
901                    let _ = reply_tx.send(Ok(env.exchange));
902                }
903                break;
904            }
905        });
906
907        let ctx = ConsumerContext::new(tx, CancellationToken::new());
908        consumer
909            .bridge_poll_error(&ctx, CamelError::ProcessorError("poll failed".into()))
910            .await
911            .expect("bridging should succeed");
912    }
913
914    #[tokio::test]
915    async fn stream_list_consumer_emits_ndjson_body() {
916        let pool = sqlite_pool().await;
917        sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
918            .execute(&pool)
919            .await
920            .expect("create table");
921        sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
922            .execute(&pool)
923            .await
924            .expect("seed rows");
925
926        let mut config = SqlEndpointConfig::from_uri(
927            "sql:select id, name from items order by id?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
928        )
929        .unwrap();
930        config.resolve_defaults();
931
932        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
933        let template = parse_query_template(&config.query, config.placeholder).unwrap();
934
935        let (tx, rx) = mpsc::channel::<ExchangeEnvelope>(8);
936        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<Exchange>();
937        tokio::spawn(async move {
938            let mut rx = rx;
939            if let Some(env) = rx.recv().await {
940                if let Some(reply_tx) = env.reply_tx {
941                    let _ = reply_tx.send(Ok(env.exchange.clone()));
942                }
943                let _ = result_tx.send(env.exchange);
944            }
945        });
946        let ctx = ConsumerContext::new(tx, CancellationToken::new());
947
948        consumer
949            .poll_database(&pool, &ctx, &template)
950            .await
951            .expect("poll must succeed");
952
953        let exchange = result_rx.await.expect("should have received one exchange");
954
955        match exchange.input.body {
956            Body::Stream(ref stream_body) => {
957                let stream = stream_body.stream.clone();
958                let mut guard = stream.lock().await;
959                let stream_opt = guard.take();
960                assert!(stream_opt.is_some(), "stream should be present");
961
962                use futures::StreamExt;
963                let mut collected = Vec::new();
964                let mut stream = stream_opt.unwrap();
965                while let Some(chunk) = stream.next().await {
966                    let chunk = chunk.expect("stream chunk should not error");
967                    collected.extend_from_slice(&chunk);
968                }
969
970                let ndjson = String::from_utf8(collected).expect("valid utf8");
971                let lines: Vec<&str> = ndjson.trim().lines().collect();
972                assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");
973
974                let row0: serde_json::Value =
975                    serde_json::from_str(lines[0]).expect("valid json line 0");
976                assert_eq!(row0["id"], 1);
977                assert_eq!(row0["name"], "alpha");
978
979                let row1: serde_json::Value =
980                    serde_json::from_str(lines[1]).expect("valid json line 1");
981                assert_eq!(row1["id"], 2);
982                assert_eq!(row1["name"], "beta");
983
984                let row2: serde_json::Value =
985                    serde_json::from_str(lines[2]).expect("valid json line 2");
986                assert_eq!(row2["id"], 3);
987                assert_eq!(row2["name"], "gamma");
988            }
989            ref other => panic!("expected Body::Stream, got {:?}", other),
990        }
991    }
992
993    #[tokio::test]
994    async fn stream_list_consumer_empty_result_set_emits_empty_stream() {
995        let pool = sqlite_pool().await;
996        sqlx::query("CREATE TABLE empty_items (id INTEGER PRIMARY KEY, name TEXT)")
997            .execute(&pool)
998            .await
999            .expect("create table");
1000
1001        let mut config = SqlEndpointConfig::from_uri(
1002            "sql:select id, name from empty_items?db_url=sqlite::memory:&outputType=StreamList&initialDelay=0&delay=1",
1003        )
1004        .unwrap();
1005        config.resolve_defaults();
1006
1007        let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
1008        let template = parse_query_template(&config.query, config.placeholder).unwrap();
1009
1010        let (tx, rx) = tokio::sync::oneshot::channel();
1011        let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<ExchangeEnvelope>(8);
1012        tokio::spawn(async move {
1013            while let Some(env) = mpsc_rx.recv().await {
1014                if let Some(reply_tx) = env.reply_tx {
1015                    let _ = reply_tx.send(Ok(env.exchange.clone()));
1016                }
1017                let _ = tx.send(env.exchange);
1018                break;
1019            }
1020        });
1021        let ctx = ConsumerContext::new(mpsc_tx, CancellationToken::new());
1022
1023        consumer
1024            .poll_database(&pool, &ctx, &template)
1025            .await
1026            .expect("poll must succeed");
1027
1028        let exchange = rx
1029            .await
1030            .expect("StreamList should emit exchange even for empty results");
1031
1032        match exchange.input.body {
1033            Body::Stream(ref stream_body) => {
1034                let stream = stream_body.stream.clone();
1035                let mut guard = stream.lock().await;
1036                let stream_opt = guard.take();
1037
1038                use futures::StreamExt;
1039                let mut count = 0;
1040                if let Some(mut stream) = stream_opt {
1041                    while let Some(chunk) = stream.next().await {
1042                        let chunk = chunk.expect("stream chunk should not error");
1043                        count += chunk.len();
1044                    }
1045                }
1046                assert_eq!(count, 0, "empty table should produce zero stream bytes");
1047            }
1048            ref other => panic!("expected Body::Stream, got {:?}", other),
1049        }
1050    }
1051}