camel-component-kafka
Kafka component for rust-camel.
URI Format
kafka:topic[?param=value&...]
Parameters
| Parameter | Default | Description |
|---|---|---|
| brokers | localhost:9092 | Bootstrap servers |
| groupId | camel | Consumer group ID |
| autoOffsetReset | latest | earliest/latest/none |
| sessionTimeoutMs | 45000 | Consumer session timeout (ms) |
| pollTimeoutMs | 5000 | Poll timeout (ms) |
| maxPollRecords | 500 | Max records per poll |
| acks | all | Producer acknowledgment: 0/1/all |
| requestTimeoutMs | 30000 | Producer delivery timeout (ms) |
| allowManualCommit | false | Enable manual offset commit via KafkaManualCommit |
| securityProtocol | - | PLAINTEXT/SSL/SASL_PLAINTEXT/SASL_SSL |
| saslAuthType | - | PLAIN/SCRAM_SHA_256/SCRAM_SHA_512/SSL |
| saslUsername | - | SASL username (required when saslAuthType is PLAIN/SCRAM) |
| saslPassword | - | SASL password (required when saslAuthType is PLAIN/SCRAM) |
| sslKeystoreLocation | - | Path to client keystore (PEM/PKCS12) |
| sslTruststoreLocation | - | Path to CA truststore |
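To make the URI format and the defaults above concrete, here is a minimal sketch of how a `kafka:topic?param=value&...` string could be split into a topic and its parameters. The helpers `parse_kafka_uri` and `param_or_default` are hypothetical names used for illustration only and are not part of the component's API; the sketch also assumes values need no percent-decoding.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not the component's API): splits
/// `kafka:topic?k=v&...` into the topic and its query parameters.
pub fn parse_kafka_uri(uri: &str) -> Result<(String, HashMap<String, String>), String> {
    let rest = uri
        .strip_prefix("kafka:")
        .ok_or_else(|| format!("not a kafka URI: {}", uri))?;

    // Everything before the first '?' is the topic; the rest is the query.
    let (topic, query) = match rest.split_once('?') {
        Some((t, q)) => (t, q),
        None => (rest, ""),
    };
    if topic.is_empty() {
        return Err("missing topic".to_string());
    }

    let mut params = HashMap::new();
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        let (key, value) = pair
            .split_once('=')
            .ok_or_else(|| format!("malformed parameter: {}", pair))?;
        params.insert(key.to_string(), value.to_string());
    }
    Ok((topic.to_string(), params))
}

/// Returns the parameter value, or the documented default when it is absent.
pub fn param_or_default<'a>(
    params: &'a HashMap<String, String>,
    key: &str,
    default: &'a str,
) -> &'a str {
    params.get(key).map(String::as_str).unwrap_or(default)
}
```

For example, parsing `kafka:orders?groupId=billing` yields the topic `orders`, and `param_or_default(&params, "brokers", "localhost:9092")` falls back to the default listed in the table.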
Headers
| Header | Direction | Description |
|---|---|---|
| CamelKafkaTopic | In/Out | Topic name |
| CamelKafkaPartition | In/Out | Partition number |
| CamelKafkaOffset | Out (consumer) | Message offset |
| CamelKafkaKey | In/Out | Message key (UTF-8 only; binary keys are dropped and a warning is logged) |
| CamelKafkaTimestamp | Out (consumer) | Timestamp in epoch milliseconds |
| CamelKafkaGroupId | Out (consumer) | Consumer group ID |
| CamelKafkaRecordMetadata | Out (producer) | {topic, partition, offset} delivery ack |
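The UTF-8 rule for CamelKafkaKey can be sketched as follows. This is an illustration of the documented behavior under assumptions, not the component's actual code: `key_header_value` is a hypothetical name, and the warning is written to stderr here rather than through whatever logging facility the component uses.

```rust
/// Hypothetical helper (not the component's API): converts a raw record key
/// into the string that would be stored in the CamelKafkaKey header.
/// Returns None when the record has no key or the key is not valid UTF-8.
pub fn key_header_value(raw_key: Option<&[u8]>) -> Option<String> {
    let bytes = raw_key?;
    match std::str::from_utf8(bytes) {
        Ok(text) => Some(text.to_owned()),
        Err(err) => {
            // Binary keys are dropped; only a warning is emitted.
            eprintln!("warning: dropping non-UTF-8 Kafka key: {}", err);
            None
        }
    }
}
```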
Exchange Properties
| Key | Description |
|---|---|
| kafka.manual.commit | JSON object: {topic, partition, offset} of the consumed record |
| kafka.manual_commit | KafkaManualCommit handle (present when allowManualCommit=true); call .commit() or .commit_async() to ack the offset |
Security
```
# SASL/SCRAM over TLS
kafka:orders?securityProtocol=SASL_SSL&saslAuthType=SCRAM_SHA_512&saslUsername=user&saslPassword=pass

# TLS client certificate
kafka:orders?securityProtocol=SSL&sslKeystoreLocation=/certs/client.pem&sslTruststoreLocation=/certs/ca.pem
```
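The parameter table states that saslUsername and saslPassword are required when saslAuthType is PLAIN or SCRAM. A minimal sketch of such a check is shown below; `validate_security` is a hypothetical function written for illustration, and how (or whether) the component reports this error is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical check (not the component's API): enforces that SASL
/// credentials are present when saslAuthType is PLAIN or one of the SCRAM types.
pub fn validate_security(params: &HashMap<String, String>) -> Result<(), String> {
    let needs_credentials = matches!(
        params.get("saslAuthType").map(String::as_str),
        Some("PLAIN") | Some("SCRAM_SHA_256") | Some("SCRAM_SHA_512")
    );

    if needs_credentials {
        for key in ["saslUsername", "saslPassword"] {
            // A missing or empty value counts as "not provided".
            if params.get(key).map_or(true, |value| value.is_empty()) {
                return Err(format!(
                    "{} is required when saslAuthType is PLAIN/SCRAM",
                    key
                ));
            }
        }
    }
    Ok(())
}
```

With the first URI above, the check passes because both credentials are present; removing `saslPassword` from it would produce an error.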
Manual Offset Commit
```rust
let route = from
    .process
    .build?;
```
Quick Start
```rust
use KafkaComponent;

ctx.register_component;

// Consumer
let route = from
    .to
    .build?;

// Producer
let route = from
    .set_body
    .to
    .build?;
```
Running the Example
# Start Kafka
# Run the example
Integration Tests
KAFKA_BROKERS=localhost:9092
Global Configuration
Configure default Kafka settings in Camel.toml that apply to all Kafka endpoints:
[]
= "localhost:9092" # Bootstrap servers (default: localhost:9092)
= "camel" # Consumer group ID (default: camel)
= 45000 # Consumer session timeout (default: 45000)
= 30000 # Producer request timeout (default: 30000)
= "latest" # Offset reset policy (default: latest)
= "plaintext" # Security protocol (default: plaintext)
URI parameters always override global defaults:
// Uses global brokers (localhost:9092) but overrides groupId
.to
// Overrides both global settings
.to
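The precedence described here (URI parameter first, then the global default, then the built-in default) can be sketched as a simple lookup chain. `resolve` is a hypothetical function for illustration; the sketch also assumes, for simplicity, that global settings are keyed by the same names as the URI parameters, which may not match the actual Camel.toml key names.

```rust
use std::collections::HashMap;

/// Hypothetical resolver (not the component's API): a URI parameter wins,
/// then the global default from Camel.toml, then the built-in default.
pub fn resolve<'a>(
    key: &str,
    uri_params: &'a HashMap<String, String>,
    global: &'a HashMap<String, String>,
    builtin: &'a str,
) -> &'a str {
    uri_params
        .get(key)
        .or_else(|| global.get(key))
        .map(String::as_str)
        .unwrap_or(builtin)
}
```

An endpoint that sets only `groupId` in its URI would therefore take `groupId` from the URI, `brokers` from the global configuration, and any remaining setting from the built-in defaults.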
Profile-Specific Configuration
[]
= "localhost:9092"
= "camel-dev"
[]
= "kafka-prod-1:9092,kafka-prod-2:9092"
= "camel-prod"
= "SASL_SSL"
Known Limitations
- Batch consumption mode not implemented
- Multiple parallel consumers (consumersCount) not implemented