# camel-component-sql
> SQL component for rust-camel integration framework
## Overview
The SQL component provides comprehensive database integration for rust-camel, supporting both producer (query execution) and consumer (polling) patterns. It enables seamless interaction with SQL databases using parameterized queries, batch operations, and streaming results.
## Features
- **Producer Mode**: Execute SQL queries (SELECT, INSERT, UPDATE, DELETE)
- **Consumer Mode**: Poll database tables for new rows
- **Parameter Binding**: Named (`:#name`), positional (`#`), IN clause (`:#in:ids`), and expressions (`:#${body.field}`)
- **Batch Operations**: Execute multiple inserts/updates in a single transaction
- **Streaming Results**: Stream large result sets as NDJSON without loading all rows into memory
- **Post-Processing Hooks**: `onConsume`, `onConsumeFailed`, `onConsumeBatchComplete` for consumer workflows
- **Connection Pooling**: Configurable pool with min/max connections, idle timeout, and max lifetime
- **Async Native**: Built on `tokio` and `sqlx`
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
camel-component-sql = "*"
```
## URI Format
```
sql:<query>?db_url=<database-url>&options
sql:file:<path-to-sql-file>?db_url=<database-url>&options
```
- `<query>`: SQL query with optional parameter placeholders
- `<path-to-sql-file>`: Path to a file containing the SQL query
- `<database-url>`: Database connection URL (required)
## URI Options
### Connection Options
| `db_url` | (required) | Database connection URL |
| `maxConnections` | `5` | Maximum connections in pool |
| `minConnections` | `1` | Minimum connections in pool |
| `idleTimeoutSecs` | `300` | Idle connection timeout in seconds |
| `maxLifetimeSecs` | `1800` | Maximum connection lifetime in seconds |
### Query Options
| `outputType` | `SelectList` | Output format: `SelectList`, `SelectOne`, `StreamList` |
| `placeholder` | `#` | Character used for parameter placeholders |
| `separator` | `;` | Statement separator for multi-statement queries |
| `noop` | `false` | If true, preserve original body after DML operations |
### Consumer Options
| `delay` | `500` | Polling delay in milliseconds |
| `initialDelay` | `1000` | Initial delay before first poll (ms) |
| `maxMessagesPerPoll` | - | Maximum rows to process per poll |
| `onConsume` | - | SQL to execute after successful row processing |
| `onConsumeFailed` | - | SQL to execute after failed row processing |
| `onConsumeBatchComplete` | - | SQL to execute after batch completes |
| `routeEmptyResultSet` | `false` | Process empty result sets |
| `useIterator` | `true` | Process rows individually (true) or as batch (false) |
| `expectedUpdateCount` | - | Expected rows affected (error if mismatch) |
| `breakBatchOnConsumeFail` | `false` | Stop batch processing on failure |
### Producer Options
| `batch` | `false` | Enable batch mode (body must be array of arrays) |
| `useMessageBodyForSql` | `false` | Use message body as SQL query |
## Parameter Binding
The SQL component supports multiple parameter placeholder styles:
### Positional Parameters (`#`)
```sql
INSERT INTO users (name, age) VALUES (#, #)
```
Body must be a JSON array: `["Alice", 30]`
### Named Parameters (`:#name`)
```sql
SELECT * FROM users WHERE id = :#id AND status = :#status
```
Values resolved from body (if JSON object), headers, or properties.
### IN Clause (`:#in:name`)
```sql
SELECT * FROM users WHERE id IN (:#in:ids)
```
Value must be a JSON array: `[1, 2, 3]` → expands to `IN ($1, $2, $3)`
### Expression Parameters (`:#${expr}`)
```sql
SELECT * FROM users WHERE id = :#${body.user_id} AND name = :#${header.userName}
```
Supported expressions: `body.field`, `header.name`, `property.key`
## Usage
### SELECT Query
```rust
use camel_builder::RouteBuilder;
use camel_component_sql::SqlComponent;
let mut ctx = CamelContext::new();
ctx.register_component(SqlComponent::new());
let route = RouteBuilder::from("direct:get-users")
.to("sql:SELECT * FROM users?db_url=postgres://localhost/mydb")
.build()?;
ctx.add_route_definition(route)?;
```
### Parameterized Query
```rust
// Named parameters from body
let route = RouteBuilder::from("direct:get-user")
.set_body(Body::Json(serde_json::json!({"id": 42})))
.to("sql:SELECT * FROM users WHERE id = :#id?db_url=postgres://localhost/mydb")
.build()?;
// Positional parameters from array body
let route = RouteBuilder::from("direct:insert")
.set_body(Body::Json(serde_json::json!(["Alice", 30])))
.to("sql:INSERT INTO users (name, age) VALUES (#, #)?db_url=postgres://localhost/mydb")
.build()?;
```
### Streaming Large Results
```rust
// Stream results as NDJSON (memory efficient for large datasets)
let route = RouteBuilder::from("direct:export")
.to("sql:SELECT * FROM large_table?db_url=postgres://localhost/mydb&outputType=StreamList")
.to("file:/tmp/export.ndjson")
.build()?;
```
### Batch Insert
```rust
// Batch insert with transaction
let route = RouteBuilder::from("direct:batch-insert")
.set_body(Body::Json(serde_json::json!([
["Alice", 30],
["Bob", 25],
["Charlie", 35]
])))
.to("sql:INSERT INTO users (name, age) VALUES (#, #)?db_url=postgres://localhost/mydb&batch=true")
.build()?;
```
### Dynamic Query from Body
```rust
// Use message body as SQL
let route = RouteBuilder::from("direct:dynamic")
.set_body(Body::Text("SELECT COUNT(*) FROM users".to_string()))
.to("sql:placeholder?db_url=postgres://localhost/mydb&useMessageBodyForSql=true")
.build()?;
```
### Polling Consumer
```rust
// Poll for pending orders and mark as processed
let route = RouteBuilder::from(
"sql:SELECT * FROM orders WHERE status = 'pending'?\
db_url=postgres://localhost/mydb\
&delay=5000\
&onConsume=UPDATE orders SET status = 'processed' WHERE id = :#id\
&onConsumeFailed=UPDATE orders SET status = 'failed' WHERE id = :#id"
)
.process(|ex| async move {
// Process each order
Ok(ex)
})
.build()?;
```
### Load Query from File
```rust
// Load SQL from external file
let route = RouteBuilder::from("direct:query")
.to("sql:file:/etc/queries/get-users.sql?db_url=postgres://localhost/mydb")
.build()?;
```
## Exchange Headers
### Input Headers
| `CamelSql.Query` | Override the SQL query from URI |
| `CamelSql.Parameters` | Override parameters as JSON array |
### Output Headers
| `CamelSql.RowCount` | out | Number of rows returned by SELECT |
| `CamelSql.UpdateCount` | out | Number of rows affected by INSERT/UPDATE/DELETE |
### Consumer Row Headers
When `useIterator=true`, each row's columns are also set as headers:
| `CamelSql.<column>` | Column value from current row (e.g., `CamelSql.id`, `CamelSql.name`) |
## Output Types
| `SelectList` | All rows as array | `[{...}, {...}]` |
| `SelectOne` | First row only | `{...}` or empty |
| `StreamList` | Stream rows on demand | NDJSON stream (`{...}\n{...}\n`) |
## Example: Order Processing Pipeline
```rust
use camel_builder::RouteBuilder;
use camel_component_sql::SqlComponent;
use camel_core::CamelContext;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut ctx = CamelContext::new();
ctx.register_component(SqlComponent::new());
// Producer: Insert new orders
let producer = RouteBuilder::from("direct:create-order")
.process(|ex| async move {
// Transform input to order record
Ok(ex)
})
.to("sql:INSERT INTO orders (customer_id, total) VALUES (#, #)?db_url=postgres://localhost/mydb&expectedUpdateCount=1")
.build()?;
// Consumer: Process pending orders
let consumer = RouteBuilder::from(
"sql:SELECT * FROM orders WHERE status = 'pending' ORDER BY created_at?\
db_url=postgres://localhost/mydb\
&delay=2000\
&maxMessagesPerPoll=10\
&onConsume=UPDATE orders SET status = 'completed', processed_at = NOW() WHERE id = :#id"
)
.process(|ex| async move {
let order_id = ex.input.header("CamelSql.id").and_then(|v| v.as_i64());
println!("Processing order: {:?}", order_id);
Ok(ex)
})
.build()?;
ctx.add_route_definition(producer)?;
ctx.add_route_definition(consumer)?;
ctx.start().await?;
Ok(())
}
```
## Documentation
- [API Documentation](https://docs.rs/camel-component-sql)
- [Repository](https://github.com/kennycallado/rust-camel)
## License
Apache-2.0
## Contributing
Contributions are welcome! Please see the [main repository](https://github.com/kennycallado/rust-camel) for details.