// camel_component_sql/consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use sqlx::AnyPool;
7use sqlx::any::AnyPoolOptions;
8use sqlx::any::AnyRow;
9use tokio::sync::OnceCell;
10use tracing::{error, info, warn};
11
12use camel_api::{Body, CamelError, Exchange, Message};
13use camel_component::{ConcurrencyModel, Consumer, ConsumerContext};
14
15use crate::config::SqlConfig;
16use crate::headers;
17use crate::query::{QueryTemplate, parse_query_template, resolve_params};
18use crate::utils::{bind_json_values, row_to_json};
19
20pub struct SqlConsumer {
21    pub(crate) config: SqlConfig,
22    pub(crate) pool: Arc<OnceCell<AnyPool>>,
23}
24
25impl SqlConsumer {
26    pub fn new(config: SqlConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
27        Self { config, pool }
28    }
29
30    /// Poll the database for new rows and process them.
31    async fn poll_database(
32        &self,
33        pool: &AnyPool,
34        context: &ConsumerContext,
35        template: &QueryTemplate,
36    ) -> Result<(), CamelError> {
37        // Create an empty exchange for parameter resolution (consumer has no input)
38        let empty_exchange = Exchange::new(Message::default());
39
40        // Resolve parameters
41        let prepared = resolve_params(template, &empty_exchange)?;
42
43        // Build and execute the query
44        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
45        let rows: Vec<AnyRow> = query
46            .fetch_all(pool)
47            .await
48            .map_err(|e| CamelError::ProcessorError(format!("Query execution failed: {}", e)))?;
49
50        // Check for empty result set
51        if rows.is_empty() && !self.config.route_empty_result_set {
52            return Ok(());
53        }
54
55        // Apply max_messages_per_poll limit
56        let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
57            if max > 0 {
58                rows.into_iter().take(max as usize).collect()
59            } else {
60                rows
61            }
62        } else {
63            rows
64        };
65
66        if self.config.use_iterator {
67            // Process each row individually
68            for row in rows_to_process {
69                let row_json = row_to_json(&row)?;
70
71                // Create exchange with the row as JSON body
72                let mut msg = Message::new(Body::Json(row_json.clone()));
73
74                // Set individual column headers with CamelSql. prefix per Apache Camel convention
75                if let Some(obj) = row_json.as_object() {
76                    for (key, value) in obj {
77                        msg.set_header(format!("CamelSql.{}", key), value.clone());
78                    }
79                }
80
81                let exchange = Exchange::new(msg);
82
83                // Send and wait for processing
84                let result = context.send_and_wait(exchange).await;
85
86                // Handle post-processing (onConsume/onConsumeFailed)
87                if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
88                    error!(error = %e, "Post-processing failed");
89                    if self.config.break_batch_on_consume_fail {
90                        return Err(e);
91                    }
92                }
93
94                // If downstream processing itself failed, honour break_batch_on_consume_fail
95                if let Err(ref consume_err) = result
96                    && self.config.break_batch_on_consume_fail
97                {
98                    return Err(consume_err.clone());
99                }
100            }
101        } else {
102            // Process all rows as a single batch
103            let rows_json: Vec<JsonValue> = rows_to_process
104                .iter()
105                .map(row_to_json)
106                .collect::<Result<Vec<_>, CamelError>>()?;
107
108            let row_count = rows_json.len();
109
110            // Create exchange with array of rows
111            let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json)));
112            msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
113
114            let exchange = Exchange::new(msg);
115
116            // Send and wait for result, then run post-processing with Null row
117            let result = context.send_and_wait(exchange).await;
118            if let Err(e) = self
119                .handle_post_processing(pool, &result, &JsonValue::Null)
120                .await
121            {
122                error!(error = %e, "Post-processing failed for batch");
123                if self.config.break_batch_on_consume_fail {
124                    return Err(e);
125                }
126            }
127            // If downstream processing itself failed, honour break_batch_on_consume_fail
128            if let Err(ref consume_err) = result
129                && self.config.break_batch_on_consume_fail
130            {
131                return Err(consume_err.clone());
132            }
133        }
134
135        // Execute on_consume_batch_complete if configured
136        if let Some(ref batch_query) = self.config.on_consume_batch_complete
137            && let Err(e) = self
138                .execute_post_query(pool, batch_query, &JsonValue::Null)
139                .await
140        {
141            error!(error = %e, "onConsumeBatchComplete query failed");
142        }
143
144        Ok(())
145    }
146
147    /// Handle post-processing after a row is processed (onConsume/onConsumeFailed).
148    async fn handle_post_processing(
149        &self,
150        pool: &AnyPool,
151        result: &Result<Exchange, CamelError>,
152        row_json: &JsonValue,
153    ) -> Result<(), CamelError> {
154        match result {
155            Ok(_) => {
156                // Success - execute onConsume if configured
157                if let Some(ref on_consume) = self.config.on_consume {
158                    self.execute_post_query(pool, on_consume, row_json).await?;
159                }
160            }
161            Err(_) => {
162                // Failure - execute onConsumeFailed if configured
163                if let Some(ref on_consume_failed) = self.config.on_consume_failed {
164                    self.execute_post_query(pool, on_consume_failed, row_json)
165                        .await?;
166                }
167            }
168        }
169        Ok(())
170    }
171
172    /// Execute a post-processing query with the row data as parameters.
173    async fn execute_post_query(
174        &self,
175        pool: &AnyPool,
176        query_str: &str,
177        row_json: &JsonValue,
178    ) -> Result<(), CamelError> {
179        // Parse the query template
180        let template = parse_query_template(query_str, self.config.placeholder)?;
181
182        // Create a temporary exchange with the row as body for parameter resolution
183        // Populate CamelSql.* headers so named params can reference them
184        let mut temp_msg = Message::new(Body::Json(row_json.clone()));
185        if let Some(obj) = row_json.as_object() {
186            for (key, value) in obj {
187                temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
188            }
189        }
190        let temp_exchange = Exchange::new(temp_msg);
191
192        // Resolve parameters
193        let prepared = resolve_params(&template, &temp_exchange)?;
194
195        // Build and execute the query
196        let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
197        let result = query.execute(pool).await.map_err(|e| {
198            CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
199        })?;
200
201        // Warn if 0 rows affected (the row may not have been marked correctly)
202        if result.rows_affected() == 0 {
203            warn!(
204                query = query_str,
205                "Post-processing query affected 0 rows — the row may not have been marked correctly"
206            );
207        }
208
209        Ok(())
210    }
211}
212
213#[async_trait]
214impl Consumer for SqlConsumer {
215    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
216        // Step 1: Initialize the connection pool
217        let pool = self
218            .pool
219            .get_or_try_init(|| async {
220                // Install all compiled-in sqlx drivers so AnyPool can resolve them.
221                // This is idempotent; safe to call multiple times.
222                sqlx::any::install_default_drivers();
223                AnyPoolOptions::new()
224                    .max_connections(self.config.max_connections)
225                    .min_connections(self.config.min_connections)
226                    .idle_timeout(Duration::from_secs(self.config.idle_timeout_secs))
227                    .max_lifetime(Duration::from_secs(self.config.max_lifetime_secs))
228                    .connect(&self.config.db_url)
229                    .await
230                    .map_err(|e| {
231                        CamelError::EndpointCreationFailed(format!(
232                            "Failed to connect to database: {}",
233                            e
234                        ))
235                    })
236            })
237            .await?;
238
239        // Warn if no onConsume configured
240        if self.config.on_consume.is_none() {
241            warn!(
242                "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
243            );
244        }
245
246        // Step 2: Parse query template once (avoid re-parsing every poll)
247        let template = parse_query_template(&self.config.query, self.config.placeholder)
248            .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
249
250        // Step 3: Initial delay before starting polling
251        if self.config.initial_delay_ms > 0 {
252            tokio::select! {
253                _ = context.cancelled() => {
254                    info!("SQL consumer stopped during initial delay");
255                    return Ok(());
256                }
257                _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
258            }
259        }
260
261        // Step 4: Polling loop
262        loop {
263            tokio::select! {
264                _ = context.cancelled() => {
265                    info!("SQL consumer stopped");
266                    break;
267                }
268                _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
269                    if let Err(e) = self.poll_database(pool, &context, &template).await {
270                        error!(error = %e, "SQL consumer poll failed");
271                    }
272                }
273            }
274        }
275
276        Ok(())
277    }
278
279    async fn stop(&mut self) -> Result<(), CamelError> {
280        Ok(())
281    }
282
283    fn concurrency_model(&self) -> ConcurrencyModel {
284        // Sequential is correct for SQL consumers: concurrent polls would fetch
285        // duplicate rows. The design doc mentioned SharedState (which doesn't exist
286        // in this runtime) — Sequential is the correct equivalent.
287        ConcurrencyModel::Sequential
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SqlConfig;

    /// Minimal configuration parsed from a bare endpoint URI.
    fn test_config() -> SqlConfig {
        SqlConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test").unwrap()
    }

    /// Pool cell that has not been initialised; these tests never connect.
    fn empty_pool() -> Arc<OnceCell<AnyPool>> {
        Arc::new(OnceCell::new())
    }

    #[test]
    fn test_consumer_concurrency_model() {
        let consumer = SqlConsumer::new(test_config(), empty_pool());
        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
    }

    #[test]
    fn test_consumer_stores_config() {
        let uri = "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true";
        let parsed = SqlConfig::from_uri(uri).unwrap();

        let consumer = SqlConsumer::new(parsed, empty_pool());

        assert_eq!(consumer.config.delay_ms, 2000);
        assert!(consumer.config.on_consume.is_some());
    }
}