camel-component-sql
SQL component for rust-camel integration framework
Overview
The SQL component provides comprehensive database integration for rust-camel, supporting both producer (query execution) and consumer (polling) patterns. It enables seamless interaction with SQL databases using parameterized queries, batch operations, and streaming results.
Features
- Producer Mode: Execute SQL queries (SELECT, INSERT, UPDATE, DELETE)
- Consumer Mode: Poll database tables for new rows
- Parameter Binding: Named (
:#name), positional (#), IN clause (:#in:ids), and expressions (:#${body.field}) - Batch Operations: Execute multiple inserts/updates in a single transaction
- Streaming Results: Stream large result sets as NDJSON without loading all rows into memory
- Post-Processing Hooks:
onConsume,onConsumeFailed,onConsumeBatchCompletefor consumer workflows - Connection Pooling: Configurable pool with min/max connections, idle timeout, and max lifetime
- Async Native: Built on
tokioandsqlx
Installation
Add to your Cargo.toml:
[]
= "*"
URI Format
sql:<query>?db_url=<database-url>&options
sql:file:<path-to-sql-file>?db_url=<database-url>&options
<query>: SQL query with optional parameter placeholders<path-to-sql-file>: Path to a file containing the SQL query<database-url>: Database connection URL (required)
URI Options
Connection Options
| Option | Default | Description |
|---|---|---|
db_url |
(required) | Database connection URL |
maxConnections |
5 |
Maximum connections in pool |
minConnections |
1 |
Minimum connections in pool |
idleTimeoutSecs |
300 |
Idle connection timeout in seconds |
maxLifetimeSecs |
1800 |
Maximum connection lifetime in seconds |
Query Options
| Option | Default | Description |
|---|---|---|
outputType |
SelectList |
Output format: SelectList, SelectOne, StreamList |
placeholder |
# |
Character used for parameter placeholders |
separator |
; |
Statement separator for multi-statement queries |
noop |
false |
If true, preserve original body after DML operations |
Consumer Options
| Option | Default | Description |
|---|---|---|
delay |
500 |
Polling delay in milliseconds |
initialDelay |
1000 |
Initial delay before first poll (ms) |
maxMessagesPerPoll |
- | Maximum rows to process per poll |
onConsume |
- | SQL to execute after successful row processing |
onConsumeFailed |
- | SQL to execute after failed row processing |
onConsumeBatchComplete |
- | SQL to execute after batch completes |
routeEmptyResultSet |
false |
Process empty result sets |
useIterator |
true |
Process rows individually (true) or as batch (false) |
expectedUpdateCount |
- | Expected rows affected (error if mismatch) |
breakBatchOnConsumeFail |
false |
Stop batch processing on failure |
Producer Options
| Option | Default | Description |
|---|---|---|
batch |
false |
Enable batch mode (body must be array of arrays) |
useMessageBodyForSql |
false |
Use message body as SQL query |
Parameter Binding
The SQL component supports multiple parameter placeholder styles:
Positional Parameters (#)
INSERT INTO users (name, age) VALUES (#, #)
Body must be a JSON array: ["Alice", 30]
Named Parameters (:#name)
SELECT * FROM users WHERE id = :#id AND status = :#status
Values resolved from body (if JSON object), headers, or properties.
IN Clause (:#in:name)
SELECT * FROM users WHERE id IN (:#in:ids)
Value must be a JSON array: [1, 2, 3] → expands to IN ($1, $2, $3)
Expression Parameters (:#${expr})
SELECT * FROM users WHERE id = :#${body.user_id} AND name = :#${header.userName}
Supported expressions: body.field, header.name, property.key
Usage
SELECT Query
use RouteBuilder;
use SqlComponent;
let mut ctx = new;
ctx.register_component;
let route = from
.to
.build?;
ctx.add_route_definition.await?;
Parameterized Query
// Named parameters from body
let route = from
.set_body
.to
.build?;
// Positional parameters from array body
let route = from
.set_body
.to
.build?;
Streaming Large Results
// Stream results as NDJSON (memory efficient for large datasets)
let route = from
.to
.to
.build?;
Batch Insert
// Batch insert with transaction
let route = from
.set_body
.to
.build?;
Dynamic Query from Body
// Use message body as SQL
let route = from
.set_body
.to
.build?;
Polling Consumer
// Poll for pending orders and mark as processed
let route = from
.process
.build?;
Load Query from File
// Load SQL from external file
let route = from
.to
.build?;
Exchange Headers
Input Headers
| Header | Description |
|---|---|
CamelSql.Query |
Override the SQL query from URI |
CamelSql.Parameters |
Override parameters as JSON array |
Output Headers
| Header | Direction | Description |
|---|---|---|
CamelSql.RowCount |
out | Number of rows returned by SELECT |
CamelSql.UpdateCount |
out | Number of rows affected by INSERT/UPDATE/DELETE |
Consumer Row Headers
When useIterator=true, each row's columns are also set as headers:
| Header Pattern | Description |
|---|---|
CamelSql.<column> |
Column value from current row (e.g., CamelSql.id, CamelSql.name) |
Output Types
| Type | Description | Body Format |
|---|---|---|
SelectList |
All rows as array | [{...}, {...}] |
SelectOne |
First row only | {...} or empty |
StreamList |
Stream rows on demand | NDJSON stream ({...}\n{...}\n) |
Example: Order Processing Pipeline
use RouteBuilder;
use SqlComponent;
use CamelContext;
async
Global Configuration
Configure default connection pool settings in Camel.toml that apply to all SQL endpoints:
[]
= 5 # Maximum pool connections (default: 5)
= 1 # Minimum pool connections (default: 1)
= 300 # Idle connection timeout (default: 300)
= 1800 # Max connection lifetime (default: 1800)
URI parameters always override global defaults:
// Uses global pool settings
.to
// Overrides maxConnections from global config
.to
Profile-Specific Configuration
[]
= 5
= 1
[]
= 20
= 5
= 600
Documentation
License
Apache-2.0
Contributing
Contributions are welcome! Please see the main repository for details.