§Samsa
Rust-native Kafka/Redpanda protocol and client implementation.
This crate provides Rust-native consumers and producers as well as low-level bindings for the Apache Kafka protocol. Unlike crates that wrap librdkafka through FFI, this crate does not need the C library, so users benefit from Rust all the way down: memory safety, safe concurrency, low resource usage, and of course blazing speed.
§Goals
- Easy to understand code
- Leverage best-in-class libraries such as Tokio and Nom to do the heavy lifting
- Start with a robust foundation and add more advanced features over time
- Provide a pure Rust implementation of the Kafka protocol
- Be a good building block for future work based around Kafka
§Getting started
Install samsa into your Rust project with cargo add samsa, or include the following snippet in your Cargo.toml dependencies:
samsa = "0.1"
This project includes Docker Compose files to help set up Redpanda and Kafka clusters for testing. The easiest way to start is to run docker-compose up, which spins up a 2 broker Redpanda cluster. If you want to use different versions of Kafka, check out DockerCompose.README.md.
§Producer
A Producer sends messages to the given topic and partition.
It is buffered, with both a timeout and a volume threshold that clear the buffer when reached. These two settings are how latency and throughput can be tuned to achieve the desired rates.
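The buffering idea can be sketched with the standard library alone. This is a minimal illustration, not samsa's implementation: `BatchBuffer`, `push`, and `poll_timeout` are hypothetical names, and the sketch assumes the buffer is flushed when either threshold is reached first.

```rust
use std::time::{Duration, Instant};

/// Hypothetical buffer for illustration; not part of samsa's API.
struct BatchBuffer<T> {
    items: Vec<T>,
    max_batch_size: usize,
    batch_timeout: Duration,
    /// When the oldest buffered item arrived (None while the buffer is empty).
    started: Option<Instant>,
}

impl<T> BatchBuffer<T> {
    fn new(max_batch_size: usize, batch_timeout: Duration) -> Self {
        Self { items: Vec::new(), max_batch_size, batch_timeout, started: None }
    }

    /// Buffers an item and returns a batch once the volume threshold is hit.
    fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        if self.items.is_empty() {
            self.started = Some(now);
        }
        self.items.push(item);
        if self.items.len() >= self.max_batch_size {
            return Some(self.flush());
        }
        None
    }

    /// Returns a batch if the oldest item has waited at least `batch_timeout`.
    fn poll_timeout(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.started {
            Some(start) if now.saturating_duration_since(start) >= self.batch_timeout => {
                Some(self.flush())
            }
            _ => None,
        }
    }

    /// Empties the buffer and resets the timer.
    fn flush(&mut self) -> Vec<T> {
        self.started = None;
        std::mem::take(&mut self.items)
    }
}
```

A larger size threshold favors throughput because more messages share one request; a shorter timeout favors latency because a partially filled batch is sent sooner.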
To instantiate one, it is easiest to use a Stream and the ProducerBuilder.
use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
    host: "127.0.0.1".to_owned(),
    port: 9092,
}];
let topic_name = "my-topic".to_string();
let partition_id = 0;

// create a stream of 5k messages in batches of 100
let stream = iter(0..5000).map(|_| ProduceMessage {
    topic: topic_name.to_string(),
    partition_id,
    key: Some(bytes::Bytes::from_static(b"Tester")),
    value: Some(bytes::Bytes::from_static(b"Value")),
    headers: vec![
        Header::new(String::from("Key"), bytes::Bytes::from("Value"))
    ],
}).chunks(100);

let output_stream =
    ProducerBuilder::<TcpConnection>::new(bootstrap_addrs, vec![topic_name.to_string()])
        .await?
        .batch_timeout_ms(1000)
        .max_batch_size(100)
        .clone()
        .build_from_stream(stream)
        .await;

tokio::pin!(output_stream);
while (output_stream.next().await).is_some() {}
§Consumer
A Consumer is used to fetch messages from the broker. It is an asynchronous iterator that can be configured to auto-commit. To instantiate one, start with a ConsumerBuilder.
use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
    host: "127.0.0.1".to_owned(),
    port: 9092,
}];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();

let consumer = ConsumerBuilder::<TcpConnection>::new(
        bootstrap_addrs,
        assignment,
    )
    .await?
    .build();

let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);

// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
    println!("{:?} messages read", batch.unwrap().count());
}
§Consumer group
You can set up a consumer group with a group id and an assignment. Offsets are committed automatically for each member of the group.
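To make "committing an offset" concrete, here is a minimal sketch of the bookkeeping involved. It is an illustration only: `OffsetTracker` is a hypothetical helper and says nothing about how samsa implements auto-commit internally. The one fixed point is the Kafka convention that a committed offset names the next message to read, which is the last processed offset plus one.

```rust
use std::collections::HashMap;

/// Hypothetical helper for illustration; not part of samsa's API.
#[derive(Default)]
struct OffsetTracker {
    /// (topic, partition) -> highest offset processed so far.
    processed: HashMap<(String, i32), i64>,
}

impl OffsetTracker {
    /// Records that a message at `offset` has been processed.
    fn record(&mut self, topic: &str, partition: i32, offset: i64) {
        let entry = self
            .processed
            .entry((topic.to_owned(), partition))
            .or_insert(offset);
        if offset > *entry {
            *entry = offset;
        }
    }

    /// Offsets to commit, sorted by topic and partition.
    /// Each value is the last processed offset plus one.
    fn offsets_to_commit(&self) -> Vec<((String, i32), i64)> {
        let mut out: Vec<_> = self
            .processed
            .iter()
            .map(|(key, last)| (key.clone(), last + 1))
            .collect();
        out.sort();
        out
    }
}
```

After a restart or rebalance, a group member that resumes from the committed offset picks up at the first message it has not yet processed.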
use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
    host: "127.0.0.1".to_owned(),
    port: 9092,
}];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();
let group_id = "The Data Engineering Team".to_string();

let consumer_group_member = ConsumerGroupBuilder::<TcpConnection>::new(
        bootstrap_addrs,
        group_id,
        assignment,
    ).await?
    .build()
    .await?;

let stream = consumer_group_member.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);

// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
    println!("{:?} messages read", batch.unwrap().count());
}
§TLS support
You can add TLS support to your consumer or producer for secured communication. To enable this, start by specifying the TlsConnectionOptions, then pass it to a ProducerBuilder or ConsumerBuilder.
Example for Consumer with TLS support:
use samsa::prelude::*;

let tls_option = TlsConnectionOptions {
    broker_options: vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }],
    key: "/path_to_key_file".into(),
    cert: "/path_to_cert_file".into(),
    cafile: Some("/path_to_ca_file".into()),
};
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();

let consumer = ConsumerBuilder::<TlsConnection>::new(
        tls_option,
        assignment,
    )
    .await?
    .build();

let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);

// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
    println!("{:?} messages read", batch.unwrap().count());
}
§Compression support
We provide support for compression in the producer through the Compression enum, which specifies the type of compression to use. The Consumer automatically detects compressed messages and decompresses them.
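A consumer can detect compression without extra configuration because the Kafka record batch format stores the codec in the lowest three bits of the batch's 16-bit attributes field (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd). The sketch below illustrates that encoding; `Codec` is a hypothetical enum written for this example and is not samsa's `Compression` type, whose variants may differ.

```rust
/// Hypothetical enum mirroring the Kafka codec ids; not samsa's `Compression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Codec {
    Uncompressed,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

/// The lowest three bits of the record batch attributes hold the codec id.
const COMPRESSION_MASK: i16 = 0x07;

impl Codec {
    /// Writes this codec into `attributes`, leaving the other bits untouched.
    fn apply_to(self, attributes: i16) -> i16 {
        let id = match self {
            Codec::Uncompressed => 0,
            Codec::Gzip => 1,
            Codec::Snappy => 2,
            Codec::Lz4 => 3,
            Codec::Zstd => 4,
        };
        (attributes & !COMPRESSION_MASK) | id
    }

    /// Reads the codec back out; returns None for ids this sketch does not know.
    fn from_attributes(attributes: i16) -> Option<Codec> {
        match attributes & COMPRESSION_MASK {
            0 => Some(Codec::Uncompressed),
            1 => Some(Codec::Gzip),
            2 => Some(Codec::Snappy),
            3 => Some(Codec::Lz4),
            4 => Some(Codec::Zstd),
            _ => None,
        }
    }
}
```

The reading side only needs `from_attributes` to choose a decompressor, which is why no compression setting appears on the consumer.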
Example for Producer with GZIP compression enabled:
use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
    host: "127.0.0.1".to_owned(),
    port: 9092,
}];
let topic_name = "my-topic".to_string();
let partition_id = 0;

// create a stream of 5k messages in batches of 100
let stream = iter(0..5000).map(|_| ProduceMessage {
    topic: topic_name.to_string(),
    partition_id,
    key: Some(bytes::Bytes::from_static(b"Tester")),
    value: Some(bytes::Bytes::from_static(b"Value")),
    headers: vec![
        Header::new(String::from("Key"), bytes::Bytes::from("Value"))
    ],
}).chunks(100);

let output_stream =
    ProducerBuilder::<TcpConnection>::new(bootstrap_addrs, vec![topic_name.to_string()])
        .await?
        .batch_timeout_ms(1000)
        .max_batch_size(100)
        .compression(Compression::Gzip)
        .clone()
        .build_from_stream(stream)
        .await;

tokio::pin!(output_stream);
while (output_stream.next().await).is_some() {}
§SASL support
We include support for SASL using the typical mechanisms: PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512. This is represented as another type of BrokerConnection that our Consumers and Producers receive as a generic parameter. All that is needed is to provide the credentials.
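For a sense of what the simplest mechanism puts on the wire, the sketch below builds a SASL/PLAIN client message as defined in RFC 4616: an optional authorization identity, a NUL byte, the username, another NUL byte, and the password. `sasl_plain_response` is a hypothetical standalone function written for this example; it does not show samsa's internal encoding.

```rust
/// Builds a SASL/PLAIN message per RFC 4616: [authzid] NUL authcid NUL passwd.
/// Hypothetical helper for illustration; not part of samsa's API.
fn sasl_plain_response(authzid: Option<&str>, username: &str, password: &str) -> Vec<u8> {
    let mut out = Vec::new();
    // The authorization identity is optional and is usually left empty.
    if let Some(id) = authzid {
        out.extend_from_slice(id.as_bytes());
    }
    out.push(0);
    out.extend_from_slice(username.as_bytes());
    out.push(0);
    out.extend_from_slice(password.as_bytes());
    out
}
```

Because PLAIN sends the password without hashing it, it is normally combined with TLS, as in the example that follows. The SCRAM mechanisms use a challenge-response exchange instead.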
Example for Consumer using both TLS and SASL:
use samsa::prelude::*;

let tls_config = TlsConnectionOptions {
    broker_options: vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }],
    key: "/path_to_key_file".into(),
    cert: "/path_to_cert_file".into(),
    cafile: Some("/path_to_ca_file".into()),
};
let sasl_config = SaslConfig::new(String::from("myuser"), String::from("pass1234"), None, None);
let options = SaslTlsConfig {
    tls_config,
    sasl_config,
};

let topic_name = "atopic";
let s = ConsumerBuilder::<SaslTlsConnection>::new(
        options,
        TopicPartitionsBuilder::new()
            .assign(topic_name.to_owned(), vec![0])
            .build(),
    )
    .await
    .unwrap()
    .build()
    .into_stream();

tokio::pin!(s);

while let Some(m) = s.next().await {
    tracing::info!("{:?} messages read", m.unwrap().count());
}
§Resources
§Modules
- prelude: Main export of various structures and methods