PICOKAFKA
Kafka driver for distributed systems built with tarantool-module. This driver use cbus channels for communication between tokio and tarantool threads. Please familiarize with it first.
Consumer
Create new consumer:
use Rc;
use consumer;
use ;
use Endpoint;
You can pass additional configuration parameters for librdkafka, see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method:
use consumer;
let consumer = new
.with_opt
.with_session_timeout
.start;
Note that the callback executed in tarantool TX thread, in special fiber.
Producer
Create new producer:
use producer;
let producer = new
.with_message_timeout
.build
.unwrap;
You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method.
Send a message:
static SEEN_RESULT: AtomicBool = new;
producer.send;
Note that sent callback executed in tarantool TX thread, in special fiber.
SSL and SASL
For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.
Statistic
Picokafka supports a statistic callbacks, use an own context implementation on producer/receiver for acquire it. Note that all callbacks implemented in context will be executed in librdkafka threads, not in TX thread. About statistic format: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See test as example:
- producer -
test_producer_statistic
- consumer -
test_consumer_statistic
Tests
You need start kafka before testing. You can use tests/docker-compose.yml file:
Or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).
Then use tarantool-test utilit:
cargo build
tarantool-test -p ./target/debug/libtests.so
You may want to skip auth tests if SASL is not configured in your kafka instance:
cargo build --features skip_ssl_test
tarantool-test -p ./target/debug/libtests.so
Benchmarks
Run benchmarks (using tarantool-runner util):
cargo build
tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint
Result of produce 1000 messages:
producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)
Result of consume 1_000_000 messages:
consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)