picokafka 0.1.13

Kafka library for tarantool-module based on librdkafka.
Documentation
# PICOKAFKA

[![Latest Version]][crates.io] [![Docs badge]][docs.rs]

[Latest Version]: https://img.shields.io/crates/v/picokafka.svg
[crates.io]: https://crates.io/crates/picokafka

[Docs badge]: https://img.shields.io/badge/docs.rs-rustdoc-green
[docs.rs]: https://docs.rs/picokafka/

Kafka driver for distributed systems built with [tarantool-module](https://github.com/picodata/tarantool-module).

### Consumer

Create new consumer:
```rust
use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
        .with_group("group_1")
        .append_topic("topic_1")
        .start();
```

You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with `Builder::with_opts` method:

```rust
    let consumer = consumer::Builder::new("kafka:29092")
        .with_opt("enable.auto.offset.store", "false")
        .with_session_timeout(Duration::from_secs(10))
        .start();
```

For handling consumer output use `Consumer::output` method:
```rust
    let consumed = consumer.output().collect::<Vec<_>>();
    consumed.iter().for_each(|received| {
        assert!(received.is_ok());
    });
```

Note that consumer prepare kafka records for output in separate tokio threads.

### Producer

Create new producer:

```rust
use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
```

You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with `Builder::with_opts` method.

Send message:
```rust
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
```

Note that sent callback executed in separate tokio threads. If you want to use 
tarantool API - use `TarantoolProducer` instead of `Producer`.

#### TarantoolProducer
Create:
```rust
use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
```

Send:
```rust
    producer.send(
        IdentifiedRecord::new(
            2,
            Record::new("topic_1")
                .key(String::from("key_1"))
                .payload(String::from("payload_1")),
        ),
        Duration::from_secs(1),
    );
```

Sent result handling:
```rust
    producer.output().for_each(|(descriptor, _)| {
        let descriptor = descriptor.downcast::<i32>().unwrap();
        assert!(*descriptor == 2);
    });
```

`TarantoolProducer` use `IdentifiedRecord` instead of `Record` because we need a way to distinguish messages in the output from each other.

## SSL and SASL

For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.

## Statistic

Picokafka supports a statistic callbacks, use an own context implementation on producer/receiver for acquire it.
Note that all callbacks implemented in context will be executed in librdkafka threads, not in TX thread.
About statistic format: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md.
See test as example:
* producer - `test_producer_statistic`
* consumer - `test_consumer_statistic`

## Tests

You need start kafka before testing. You can use tests/docker-compose.yml file:
```bash
    docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
    docker-compose -f tests/docker-compose.yml up -d
```

Or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).

Then run `cargo test` or `cargo test --features "ssl"` if you need ssl feature.

## Benchmarks

After starting kafka environment use `cargo bench` command.