camel-component-sql 0.16.0

SQL component for rust-camel (PostgreSQL, MySQL, SQLite via sqlx)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# camel-component-sql

> SQL component for rust-camel integration framework

## Overview

The SQL component provides comprehensive database integration for rust-camel, supporting both producer (query execution) and consumer (polling) patterns. It enables seamless interaction with SQL databases using parameterized queries, batch operations, and streaming results.

## Features

- **Producer Mode**: Execute SQL queries (SELECT, INSERT, UPDATE, DELETE)
- **Consumer Mode**: Poll database tables for new rows
- **Parameter Binding**: Named (`:#name`), positional (`#`), IN clause (`:#in:ids`), and expressions (`:#${body.field}`)
- **Batch Operations**: Execute multiple inserts/updates in a single transaction
- **Streaming Results**: Stream large result sets as NDJSON without loading all rows into memory
- **Post-Processing Hooks**: `onConsume`, `onConsumeFailed`, `onConsumeBatchComplete` for consumer workflows
- **Connection Pooling**: Configurable pool with min/max connections, idle timeout, and max lifetime
- **Async Native**: Built on `tokio` and `sqlx`
- **Health Check**: Async connection validation probe for SQL databases

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
camel-component-sql = "*"
```

## URI Format

```
sql:<query>?datasource=<name>&options
sql:<query>?db_url=<database-url>&options
sql:file:<path-to-sql-file>?datasource=<name>&options
sql:file:<path-to-sql-file>?db_url=<database-url>&options
```

- `<query>`: SQL query with optional parameter placeholders
- `<path-to-sql-file>`: Path to a file containing the SQL query
- `<datasource>`: Named datasource from Camel.toml (alternative to `db_url`)
- `<database-url>`: Database connection URL

Either `datasource` or `db_url` is required (not both).

## URI Options

### Connection Options

| Option | Default | Description |
|--------|---------|-------------|
| `datasource` | - | Named datasource from Camel.toml (alternative to `db_url`) |
| `db_url` | (required if no `datasource`) | Database connection URL |
| `maxConnections` | `5` | Maximum connections in pool |
| `minConnections` | `1` | Minimum connections in pool |
| `idleTimeoutSecs` | `300` | Idle connection timeout in seconds |
| `maxLifetimeSecs` | `1800` | Maximum connection lifetime in seconds |

### Named Datasources

Instead of embedding `db_url` in every URI, define named datasources in `Camel.toml`:

```toml
[default.datasources.orders]
db_url = "postgres://user:pass@db:5432/orders"
max_connections = 20
min_connections = 2

[default.datasources.analytics]
db_url = "postgres://user:pass@db:5432/analytics"
max_connections = 5
```

Then reference by name in URIs:

```
sql:SELECT * FROM orders?datasource=orders
sql:INSERT INTO events VALUES (#, #)?datasource=analytics
```

**Benefits:**
- Shared connection pools across endpoints referencing the same datasource
- Credentials not exposed in route URIs
- Pool configuration centralized in one place
- Health checks per datasource via `PoolFactory::check()`

**Restrictions:** When using `datasource`, the following pool-affecting URI parameters are rejected (set them in `Camel.toml` instead):
`maxConnections`, `minConnections`, `idleTimeoutSecs`, `maxLifetimeSecs`, `sslMode`, `sslRootCert`, `sslCert`, `sslKey`

Non-pool parameters like `outputType`, `onConsume`, `delay` etc. work normally with named datasources.

**Backward compatible:** Inline `db_url` continues to work as before.

### Query Options

| Option | Default | Description |
|--------|---------|-------------|
| `outputType` | `SelectList` | Output format: `SelectList`, `SelectOne`, `StreamList` |
| `placeholder` | `#` | Character used for parameter placeholders |
| `inSeparator` | `", "` | Separator for IN clause expansion (e.g. `", "``1, 2, 3`) |
| `noop` | `false` | If true, preserve original body after DML operations |

### Consumer Options

| Option | Default | Description |
|--------|---------|-------------|
| `delay` | `500` | Polling delay in milliseconds |
| `initialDelay` | `1000` | Initial delay before first poll (ms) |
| `maxMessagesPerPoll` | - | Maximum rows to process per poll |
| `onConsume` | - | SQL to execute after successful row processing |
| `onConsumeFailed` | - | SQL to execute after failed row processing |
| `onConsumeBatchComplete` | - | SQL to execute after batch completes |
| `routeEmptyResultSet` | `false` | Process empty result sets |
| `useIterator` | `true` | Process rows individually (true) or as batch (false) |
| `expectedUpdateCount` | - | Expected rows affected (error if mismatch) |
| `breakBatchOnConsumeFail` | `false` | Stop batch processing on failure |

### Producer Options

| Option | Default | Description |
|--------|---------|-------------|
| `batch` | `false` | Enable batch mode (body must be array of arrays) |
| `useMessageBodyForSql` | `false` | Use message body as SQL query |

### SSL/TLS Options

| Option | Default | Description |
|--------|---------|-------------|
| `sslMode` | - | SSL mode (e.g. `require`, `verify-ca`, `verify-full`) |
| `sslRootCert` | - | Path to CA certificate for server verification |
| `sslCert` | - | Path to client certificate for mTLS |
| `sslKey` | - | Path to client private key for mTLS |

SSL parameters are appended as query string parameters to the database URL, with database-specific key names:

| URI Parameter | PostgreSQL Key | MySQL Key |
|---------------|---------------|-----------|
| `sslMode` | `sslmode` | `ssl-mode` |
| `sslRootCert` | `sslrootcert` | `ssl-ca` |
| `sslCert` | `sslcert` | `ssl-cert` |
| `sslKey` | `sslkey` | `ssl-key` |

## Parameter Binding

The SQL component supports multiple parameter placeholder styles:

### Positional Parameters (`#`)

```sql
INSERT INTO users (name, age) VALUES (#, #)
```

Body must be a JSON array: `["Alice", 30]`

### Named Parameters (`:#name`)

```sql
SELECT * FROM users WHERE id = :#id AND status = :#status
```

Values resolved from body (if JSON object), headers, or properties.

### IN Clause (`:#in:name`)

```sql
SELECT * FROM users WHERE id IN (:#in:ids)
```

Value must be a JSON array: `[1, 2, 3]` → expands to `IN ($1, $2, $3)`.

Customize the separator with `inSeparator` (default `", "`):

```
sql:SELECT * FROM users WHERE id IN (:#in:ids)?...&inSeparator=|
```

Produces: `IN ($1|$2|$3)`

### Expression Parameters (`:#${expr}`)

```sql
SELECT * FROM users WHERE id = :#${body.user_id} AND name = :#${header.userName}
```

Supported expressions: `body.field`, `header.name`, `property.key`

## Usage

### SELECT Query

```rust
use camel_builder::RouteBuilder;
use camel_component_sql::SqlComponent;

let mut ctx = CamelContext::new();
ctx.register_component(SqlComponent::new());

let route = RouteBuilder::from("direct:get-users")
    .to("sql:SELECT * FROM users?db_url=postgres://localhost/mydb")
    .build()?;
ctx.add_route_definition(route).await?;
```

### Parameterized Query

```rust
// Named parameters from body
let route = RouteBuilder::from("direct:get-user")
    .set_body(Body::Json(serde_json::json!({"id": 42})))
    .to("sql:SELECT * FROM users WHERE id = :#id?db_url=postgres://localhost/mydb")
    .build()?;

// Positional parameters from array body
let route = RouteBuilder::from("direct:insert")
    .set_body(Body::Json(serde_json::json!(["Alice", 30])))
    .to("sql:INSERT INTO users (name, age) VALUES (#, #)?db_url=postgres://localhost/mydb")
    .build()?;
```

### Streaming Large Results

```rust
// Stream results as NDJSON (memory efficient for large datasets)
let route = RouteBuilder::from("direct:export")
    .to("sql:SELECT * FROM large_table?db_url=postgres://localhost/mydb&outputType=StreamList")
    .to("file:/tmp/export.ndjson")
    .build()?;
```

### Batch Insert

```rust
// Batch insert with transaction
let route = RouteBuilder::from("direct:batch-insert")
    .set_body(Body::Json(serde_json::json!([
        ["Alice", 30],
        ["Bob", 25],
        ["Charlie", 35]
    ])))
    .to("sql:INSERT INTO users (name, age) VALUES (#, #)?db_url=postgres://localhost/mydb&batch=true")
    .build()?;
```

### Dynamic Query from Body

```rust
// Use message body as SQL
let route = RouteBuilder::from("direct:dynamic")
    .set_body(Body::Text("SELECT COUNT(*) FROM users".to_string()))
    .to("sql:placeholder?db_url=postgres://localhost/mydb&useMessageBodyForSql=true")
    .build()?;
```

### Polling Consumer

```rust
// Poll for pending orders and mark as processed
let route = RouteBuilder::from(
    "sql:SELECT * FROM orders WHERE status = 'pending'?\
     db_url=postgres://localhost/mydb\
     &delay=5000\
     &onConsume=UPDATE orders SET status = 'processed' WHERE id = :#id\
     &onConsumeFailed=UPDATE orders SET status = 'failed' WHERE id = :#id"
)
    .process(|ex| async move {
        // Process each order
        Ok(ex)
    })
    .build()?;
```

### Load Query from File

```rust
// Load SQL from external file
let route = RouteBuilder::from("direct:query")
    .to("sql:file:/etc/queries/get-users.sql?db_url=postgres://localhost/mydb")
    .build()?;
```

## Exchange Headers

### Input Headers

| Header | Description |
|--------|-------------|
| `CamelSql.Query` | Override the SQL query from URI |
| `CamelSql.Parameters` | Override parameters as JSON array |

### Output Headers

| Header | Direction | Description |
|--------|-----------|-------------|
| `CamelSql.RowCount` | out | Number of rows returned by SELECT |
| `CamelSql.UpdateCount` | out | Number of rows affected by INSERT/UPDATE/DELETE |

### Consumer Row Headers

When `useIterator=true`, each row's columns are also set as headers:

| Header Pattern | Description |
|----------------|-------------|
| `CamelSql.<column>` | Column value from current row (e.g., `CamelSql.id`, `CamelSql.name`) |

## Output Types

| Type | Description | Body Format |
|------|-------------|-------------|
| `SelectList` | All rows as array | `[{...}, {...}]` |
| `SelectOne` | First row only | `{...}` or empty |
| `StreamList` | Stream rows on demand | NDJSON stream (`{...}\n{...}\n`) |

## Example: Order Processing Pipeline

```rust
use camel_builder::RouteBuilder;
use camel_component_sql::SqlComponent;
use camel_core::CamelContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ctx = CamelContext::new();
    ctx.register_component(SqlComponent::new());

    // Producer: Insert new orders
    let producer = RouteBuilder::from("direct:create-order")
        .process(|ex| async move {
            // Transform input to order record
            Ok(ex)
        })
        .to("sql:INSERT INTO orders (customer_id, total) VALUES (#, #)?db_url=postgres://localhost/mydb&expectedUpdateCount=1")
        .build()?;

    // Consumer: Process pending orders
    let consumer = RouteBuilder::from(
        "sql:SELECT * FROM orders WHERE status = 'pending' ORDER BY created_at?\
         db_url=postgres://localhost/mydb\
         &delay=2000\
         &maxMessagesPerPoll=10\
         &onConsume=UPDATE orders SET status = 'completed', processed_at = NOW() WHERE id = :#id"
    )
        .process(|ex| async move {
            let order_id = ex.input.header("CamelSql.id").and_then(|v| v.as_i64());
            println!("Processing order: {:?}", order_id);
            Ok(ex)
        })
        .build()?;

    ctx.add_route_definition(producer).await?;
    ctx.add_route_definition(consumer).await?;
    ctx.start().await?;

    Ok(())
}
```

## Global Configuration

Configure default connection pool settings in `Camel.toml` that apply to all SQL endpoints:

```toml
[default.components.sql]
max_connections = 5          # Maximum pool connections (default: 5)
min_connections = 1          # Minimum pool connections (default: 1)
idle_timeout_secs = 300      # Idle connection timeout (default: 300)
max_lifetime_secs = 1800     # Max connection lifetime (default: 1800)
ssl_mode = "require"         # SSL mode (optional)
ssl_root_cert = "/etc/ssl/ca.pem"  # CA certificate path (optional)
ssl_cert = "/etc/ssl/client.pem"   # Client certificate path (optional)
ssl_key = "/etc/ssl/client.key"    # Client key path (optional)
```

URI parameters always override global defaults:

```rust
// Uses global pool settings
.to("sql:SELECT * FROM users?db_url=postgres://localhost/mydb")

// Overrides maxConnections from global config
.to("sql:SELECT * FROM users?db_url=postgres://localhost/mydb&maxConnections=10")
```

### Profile-Specific Configuration

```toml
[default.components.sql]
max_connections = 5
min_connections = 1

[production.components.sql]
max_connections = 20
min_connections = 5
idle_timeout_secs = 600
```

## Health Check

The `camel-component-sql` component registers an async health check via `AsyncHealthCheck`.

- **Probe**: Executes a `SELECT 1` validation query against the connection pool
- **Healthy**: Connection pool returns a valid connection and query succeeds
- **Degraded**: Connection fails or validation query times out

Health checks are exposed via the health server:

```toml
[observability.health]
enabled = true
port = 8080
```

## Documentation

- [API Documentation]https://docs.rs/camel-component-sql
- [Repository]https://github.com/kennycallado/rust-camel

## License

Apache-2.0

## Contributing

Contributions are welcome! Please see the [main repository](https://github.com/kennycallado/rust-camel) for details.