PICOKAFKA
Kafka driver for distributed systems built with tarantool-module.
Consumer
Create a new consumer:
use consumer;
let consumer = new
.with_group
.append_topic
.start;
You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opts method:
let consumer = new
.with_opt
.with_session_timeout
.start;
To handle consumer output, use the Consumer::output method:
let consumed = consumer.output.;
consumed.iter.for_each;
Note that the consumer prepares Kafka records for output in separate tokio threads.
Producer
Create a new producer:
use producer;
let producer = new
.with_message_timeout
.build
.unwrap;
You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opts method.
Send message:
producer.send;
Note that the sent callback is executed in separate tokio threads. If you want to use
the tarantool API, use TarantoolProducer instead of Producer.
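The note above is about which thread a callback runs on. As a rough illustration of the general pattern (not picokafka's implementation, and using only the Rust standard library rather than tokio or tarantool fibers), a callback on a foreign thread can simply forward its result through a channel, leaving the owning thread to process it. The names `Delivery`, `spawn_worker` and `drain` are hypothetical and exist only for this sketch.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical delivery outcome; not a picokafka type.
#[derive(Debug, PartialEq)]
pub enum Delivery {
    Ok { offset: i64 },
    Failed { reason: String },
}

/// Spawns a worker thread that stands in for the runtime threads where
/// callbacks fire, and returns the receiving end of a channel that the
/// owning thread can drain at its own pace.
pub fn spawn_worker(offsets: Vec<i64>) -> Receiver<Delivery> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for offset in offsets {
            // The "callback" only forwards the outcome; it never touches
            // state that belongs to the owning thread.
            let outcome = if offset >= 0 {
                Delivery::Ok { offset }
            } else {
                Delivery::Failed {
                    reason: format!("bad offset {}", offset),
                }
            };
            if tx.send(outcome).is_err() {
                break; // receiver was dropped, nothing left to do
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    rx
}

/// Collects every outcome on the calling thread, in the order they were sent.
pub fn drain(rx: Receiver<Delivery>) -> Vec<Delivery> {
    rx.iter().collect()
}
```

Calling `drain(spawn_worker(vec![1, -1]))` from the owning thread yields one `Delivery::Ok` followed by one `Delivery::Failed`, and all handling happens on the caller's side.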
TarantoolProducer
Create:
use producer;
let producer = new
.with_message_timeout
.build
.unwrap
.tarantool;
Send:
producer.send;
Sent result handling:
producer.output.for_each;
TarantoolProducer uses IdentifiedRecord instead of Record because messages in the output need to be distinguishable from each other.
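To show why an identifier helps, here is a minimal sketch of matching send results back to the records that produced them. It does not use picokafka's types: `TaggedRecord`, `TaggedResult` and `Pending` are hypothetical stand-ins, and the real IdentifiedRecord may carry different fields.

```rust
use std::collections::HashMap;

/// Hypothetical record tagged with a caller-chosen id
/// (a stand-in, not picokafka's IdentifiedRecord).
#[derive(Debug, Clone, PartialEq)]
pub struct TaggedRecord {
    pub id: u64,
    pub payload: String,
}

/// Hypothetical send result that carries the id of the record it belongs to.
#[derive(Debug, PartialEq)]
pub struct TaggedResult {
    pub id: u64,
    pub delivered: bool,
}

/// Tracks records that were sent but whose result has not been seen yet.
#[derive(Default)]
pub struct Pending {
    records: HashMap<u64, TaggedRecord>,
}

impl Pending {
    /// Remembers a record under its id before it is sent.
    pub fn track(&mut self, record: TaggedRecord) {
        self.records.insert(record.id, record);
    }

    /// Removes and returns the record that the result refers to, if it is known.
    pub fn resolve(&mut self, result: &TaggedResult) -> Option<TaggedRecord> {
        self.records.remove(&result.id)
    }

    /// Number of records still waiting for a result.
    pub fn len(&self) -> usize {
        self.records.len()
    }
}
```

Without the id, two results arriving out of order could not be attributed to the right record; with it, `resolve` is a single map lookup.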
SSL and SASL
To enable the SSL and SASL protocols, use the "ssl" feature. See the auth test for an example.
Statistics
Picokafka supports statistics callbacks. To receive statistics, use your own context implementation on the producer or consumer. Note that all callbacks implemented in the context are executed in librdkafka threads, not in the TX thread. The statistics format is described at https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See the tests for examples:
- producer - test_producer_statistic
- consumer - test_consumer_statistic
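Because statistics callbacks run outside the TX thread, one common approach is to have the callback store the payload in shared state and read it later from the application's own thread. The sketch below illustrates that idea with the standard library only. `LatestStats` and `simulate_callback` are hypothetical names, the JSON string is a placeholder, and none of this reflects picokafka's context trait.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical holder for the most recent statistics payload.
#[derive(Clone, Default)]
pub struct LatestStats {
    inner: Arc<Mutex<Option<String>>>,
}

impl LatestStats {
    /// Meant to be called from the callback thread: store the payload and return.
    pub fn publish(&self, json: String) {
        *self.inner.lock().unwrap() = Some(json);
    }

    /// Meant to be called from the application thread: take the payload
    /// if one has arrived since the previous call.
    pub fn take(&self) -> Option<String> {
        self.inner.lock().unwrap().take()
    }
}

/// Simulates a statistics callback firing on a foreign thread and waits
/// for it to finish so the result is observable by the caller.
pub fn simulate_callback(stats: &LatestStats, json: &str) {
    let stats = stats.clone();
    let json = json.to_string();
    thread::spawn(move || stats.publish(json)).join().unwrap();
}
```

The callback does no work besides storing the string, so any parsing or tarantool interaction stays on the thread that calls `take`.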
Tests
Kafka must be running before you run the tests. You can start it with the tests/docker-compose.yml file, or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).
Then run cargo test, or cargo test --features "ssl" if you need the ssl feature.
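If you write your own tests against a custom environment, the two variables can be read with a fallback. This is a sketch only: the helper names are hypothetical, and the fallback addresses `localhost:9092` and `localhost:8082` are assumptions for illustration, not defaults documented by picokafka.

```rust
use std::env;

/// Returns `value` unless it is missing or blank, in which case `default` is used.
pub fn or_default(value: Option<String>, default: &str) -> String {
    match value {
        Some(v) if !v.trim().is_empty() => v,
        _ => default.to_string(),
    }
}

/// Broker address for tests, read from KAFKA_ADDR.
/// The fallback value is an assumption made for this sketch.
pub fn kafka_addr() -> String {
    or_default(env::var("KAFKA_ADDR").ok(), "localhost:9092")
}

/// REST endpoint address for tests, read from KAFKA_REST_ADDR.
/// The fallback value is an assumption made for this sketch.
pub fn kafka_rest_addr() -> String {
    or_default(env::var("KAFKA_REST_ADDR").ok(), "localhost:8082")
}
```

Keeping the fallback logic in `or_default` makes it testable without modifying the process environment.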
Benchmarks
After starting the Kafka environment, run cargo bench.