# PICOKAFKA
[![Latest Version]][crates.io] [![Docs badge]][docs.rs]
[Latest Version]: https://img.shields.io/crates/v/picokafka.svg
[crates.io]: https://crates.io/crates/picokafka
[Docs badge]: https://img.shields.io/badge/docs.rs-rustdoc-green
[docs.rs]: https://docs.rs/picokafka/
Kafka driver for distributed systems built with [tarantool-module](https://github.com/picodata/tarantool-module).
### Consumer
Create a new consumer:
```rust
use picokafka::consumer;

// Connect to the broker, join a consumer group, subscribe to a topic, and start consuming.
let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group_1")
    .append_topic("topic_1")
    .start();
```
You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opt` method:
```rust
use std::time::Duration;

let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start();
```
To handle consumer output, use the `Consumer::output` method:
```rust
let consumed = consumer.output().collect::<Vec<_>>();
consumed.iter().for_each(|received| {
    assert!(received.is_ok());
});
```
Note that the consumer prepares Kafka records for output in separate tokio threads.
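The example above asserts that every received item is `Ok`. In application code you will usually want to separate successful records from errors instead of panicking. The sketch below illustrates that pattern using only the standard library: a background thread stands in for the consumer's worker threads and a channel stands in for the output. `FakeRecord`, `FakeError`, `start_fake_consumer` and `split_output` are hypothetical names made up for this illustration; they are not part of picokafka, and the real output item types may differ.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for a consumed Kafka record (not a picokafka type).
#[derive(Debug, PartialEq)]
struct FakeRecord {
    topic: String,
    payload: String,
}

/// Hypothetical stand-in for a consume error (not a picokafka type).
#[derive(Debug, PartialEq)]
struct FakeError(String);

/// Spawns a background thread that "prepares" three items (the second one
/// is an error) and returns the receiving end of the channel.
fn start_fake_consumer() -> Receiver<Result<FakeRecord, FakeError>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for i in 0..3 {
            let item = if i == 1 {
                Err(FakeError(format!("record {} failed", i)))
            } else {
                Ok(FakeRecord {
                    topic: "topic_1".to_string(),
                    payload: format!("payload_{}", i),
                })
            };
            // Stop early if the receiving side has gone away.
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel.
    });
    rx
}

/// Drains the output until the channel closes, splitting successes from errors.
fn split_output(
    rx: Receiver<Result<FakeRecord, FakeError>>,
) -> (Vec<FakeRecord>, Vec<FakeError>) {
    let mut records = Vec::new();
    let mut errors = Vec::new();
    for item in rx.iter() {
        match item {
            Ok(record) => records.push(record),
            Err(error) => errors.push(error),
        }
    }
    (records, errors)
}
```

With a real consumer, the same `match` on each item could be applied to the values yielded by `consumer.output()`.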
### Producer
Create a new producer:
```rust
use picokafka::producer;
use std::time::Duration;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
```
You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opts` method.
Send a message:
```rust
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
```
Note that the send callback is executed in separate tokio threads. If you want to use
the Tarantool API, use `TarantoolProducer` instead of `Producer`.
#### TarantoolProducer
Create:
```rust
use picokafka::producer;
use std::time::Duration;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
```
Send:
```rust
producer.send(
    IdentifiedRecord::new(
        2,
        Record::new("topic_1")
            .key(String::from("key_1"))
            .payload(String::from("payload_1")),
    ),
    Duration::from_secs(1),
);
```
Handle send results:
```rust
producer.output().for_each(|(descriptor, _)| {
    let descriptor = descriptor.downcast::<i32>().unwrap();
    assert!(*descriptor == 2);
});
```
`TarantoolProducer` uses `IdentifiedRecord` instead of `Record` because the messages in the output need to be distinguishable from each other.
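The `downcast::<i32>()` call in the result-handling example suggests that the descriptor comes back type-erased. The following standard-library sketch shows one way to match such descriptors against a table of in-flight messages, assuming the descriptor behaves like a `Box<dyn Any>`. `SendStatus`, `resolve` and the `pending` map are hypothetical names for this illustration, not picokafka API.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Hypothetical stand-in for a send outcome (not a picokafka type).
#[derive(Debug, Clone, PartialEq)]
enum SendStatus {
    Delivered,
    Failed,
}

/// Matches type-erased descriptors back to the messages they were sent with.
/// `pending` maps a descriptor id to an application-level message name.
fn resolve(
    pending: &mut HashMap<i32, &'static str>,
    output: Vec<(Box<dyn Any>, SendStatus)>,
) -> Vec<(&'static str, SendStatus)> {
    let mut resolved = Vec::new();
    for (descriptor, status) in output {
        // `downcast` returns `Err` when the boxed value is not an `i32`,
        // so descriptors of an unexpected type are skipped, not unwrapped.
        if let Ok(id) = descriptor.downcast::<i32>() {
            if let Some(name) = pending.remove(&*id) {
                resolved.push((name, status));
            }
        }
    }
    resolved
}
```

Using `if let Ok(..)` instead of `unwrap()` keeps the handler from panicking if different descriptor types are ever mixed in one producer.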
## SSL and SASL
To enable the SSL and SASL protocols, use the `ssl` feature. See the auth test for a usage example.
## Statistics
Picokafka supports statistics callbacks. To receive statistics, provide your own context implementation for the producer or consumer.
Note that all callbacks implemented in the context are executed in librdkafka threads, not in the TX thread.
The statistics format is described in the [librdkafka documentation](https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md).
See the tests for examples:
* producer - `test_producer_statistic`
* consumer - `test_consumer_statistic`
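Because the callbacks run outside the TX thread, any state they touch has to be safe to share between threads. As a rough illustration, the sketch below keeps only the most recent statistics payload behind an `Arc<Mutex<..>>` so that another thread can pick it up later. `LatestStats` and `simulate_callback_thread` are hypothetical names, the JSON payloads are made-up samples, and how this would be wired into a picokafka context is not shown here; see the tests listed above for the real API.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical thread-safe holder for the most recent statistics payload.
#[derive(Clone, Default)]
struct LatestStats {
    inner: Arc<Mutex<Option<String>>>,
}

impl LatestStats {
    /// Intended to be called from the callback thread: overwrites the snapshot.
    fn store(&self, json: &str) {
        *self.inner.lock().unwrap() = Some(json.to_owned());
    }

    /// Intended to be called from the application thread: takes the snapshot
    /// and leaves `None` behind, so each payload is observed at most once.
    fn take(&self) -> Option<String> {
        self.inner.lock().unwrap().take()
    }
}

/// Simulates a library thread delivering two statistics payloads in a row.
fn simulate_callback_thread(stats: &LatestStats) {
    let writer = stats.clone();
    thread::spawn(move || {
        writer.store(r#"{"name":"producer-1","txmsgs":1}"#);
        writer.store(r#"{"name":"producer-1","txmsgs":5}"#);
    })
    .join()
    .expect("callback thread panicked");
}
```

Keeping only the latest snapshot is a deliberate simplification; a channel would be a better fit if every payload must be processed.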
## Tests
Start Kafka before running the tests. You can use the `tests/docker-compose.yml` file:
```bash
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
```
Or create your own environment (set `KAFKA_ADDR` and `KAFKA_REST_ADDR` if you do that).
Then run `cargo test`, or `cargo test --features "ssl"` if you need the `ssl` feature.
## Benchmarks
After starting the Kafka environment, run `cargo bench`.