pub use crate::compression::Compression;
pub use crate::protocol::create_topics::{CreateTopicsResponseData, TopicConfig, TopicResult};
pub use crate::protocol::delete_topics::{DeleteTopicResult, DeleteTopicsResponseData};
#[cfg(feature = "producer_timestamp")]
pub use crate::protocol::produce::ProducerTimestamp;
pub use crate::utils::PartitionOffset;
use crate::utils::TimestampedPartitionOffset;
use std::collections::hash_map::HashMap;
use std::time::Duration;
#[cfg(feature = "security")]
pub use crate::network::{SaslConfig, SecurityConfig};
#[cfg(feature = "security")]
pub use crate::tls::TlsConfig;
use crate::error::{Error, KafkaCode, Result};
use crate::protocol;
pub mod builder;
pub mod config;
pub(crate) mod fetch_ops;
mod internals;
pub mod metadata;
pub(crate) mod metadata_ops;
pub(crate) mod offset_ops;
pub(crate) mod produce_ops;
mod state;
pub(crate) mod transport;
use crate::network;
#[allow(clippy::wildcard_imports)]
pub use config::*;
pub(crate) use internals::KafkaClientInternals;
/// Owned fetch-response types re-exported under their original protocol
/// names (`Owned*`). The sibling `fetch` module exposes the same types
/// with a shorter alias for the response struct.
pub mod fetch_kp {
    pub use crate::protocol::fetch::{
        OwnedData, OwnedFetchResponse, OwnedMessage, OwnedPartition, OwnedTopic,
    };
}
/// Fetch-response types for consumers, with `OwnedFetchResponse`
/// re-exported under the shorter name `Response`.
pub mod fetch {
    pub use crate::protocol::fetch::OwnedFetchResponse as Response;
    pub use crate::protocol::fetch::{OwnedData, OwnedMessage, OwnedPartition, OwnedTopic};
}
use config::ClientConfig;
/// Starting position to use when requesting partition offsets.
#[derive(Debug, Copy, Clone)]
pub enum FetchOffset {
    /// The oldest offset still available on the broker.
    Earliest,
    /// The offset of the next message to be written.
    Latest,
    /// The offset closest to the given Unix timestamp in milliseconds.
    ByTime(i64),
}

impl FetchOffset {
    /// Maps this variant onto the sentinel encoding used by the Kafka
    /// ListOffsets request: -2 selects the earliest offset, -1 the
    /// latest, and any other value is taken as a millisecond timestamp.
    fn to_kafka_value(self) -> i64 {
        match self {
            Self::ByTime(timestamp_ms) => timestamp_ms,
            Self::Latest => -1,
            Self::Earliest => -2,
        }
    }
}
/// Where consumer-group offsets are stored on the cluster side.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GroupOffsetStorage {
    /// Store group offsets in ZooKeeper.
    Zookeeper,
    /// Store group offsets in Kafka itself.
    Kafka,
}
/// Identifies one (topic, partition) pair whose committed consumer-group
/// offset should be fetched.
#[derive(Debug)]
pub struct FetchGroupOffset<'a> {
    /// Topic to fetch the committed offset for.
    pub topic: &'a str,
    /// Partition within `topic`.
    pub partition: i32,
}

impl<'a> FetchGroupOffset<'a> {
    /// Pairs a topic with a partition.
    #[inline]
    #[must_use]
    pub fn new(topic: &'a str, partition: i32) -> Self {
        Self { topic, partition }
    }
}

impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
    // Identity conversion so both owned values and references are
    // accepted uniformly by the iterator-based client APIs.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// One offset to commit on behalf of a consumer group for a single
/// topic partition.
#[derive(Debug)]
pub struct CommitOffset<'a> {
    /// The offset value to commit.
    pub offset: i64,
    /// Topic the offset belongs to.
    pub topic: &'a str,
    /// Partition within `topic`.
    pub partition: i32,
}

impl<'a> CommitOffset<'a> {
    /// Builds a commit entry; note the argument order is
    /// (topic, partition, offset) even though `offset` is declared first
    /// in the struct.
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            partition,
            offset,
        }
    }
}

impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
    // Identity conversion so owned values and references are accepted
    // interchangeably by the commit APIs.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Acknowledgement level a broker must satisfy before a produce request
/// completes. Discriminants match the Kafka wire-protocol `acks` values.
#[derive(Debug, Copy, Clone)]
pub enum RequiredAcks {
    /// Do not wait for any acknowledgement (wire value 0).
    None = 0,
    /// Wait for the partition leader's acknowledgement (wire value 1).
    One = 1,
    /// Wait for the full in-sync replica set (wire value -1).
    All = -1,
}
/// A single message destined for a topic partition; both `key` and
/// `value` may be `None`.
#[derive(Debug)]
pub struct ProduceMessage<'a, 'b> {
    /// Optional message key.
    pub key: Option<&'b [u8]>,
    /// Optional message payload.
    pub value: Option<&'b [u8]>,
    /// Destination topic.
    pub topic: &'a str,
    /// Destination partition.
    pub partition: i32,
    /// Record headers as (name, value) pairs.
    pub headers: &'b [(String, bytes::Bytes)],
}

impl<'a, 'b> ProduceMessage<'a, 'b> {
    /// Builds a message with an empty header list.
    #[must_use]
    pub fn new(
        topic: &'a str,
        partition: i32,
        key: Option<&'b [u8]>,
        value: Option<&'b [u8]>,
    ) -> Self {
        Self {
            topic,
            partition,
            key,
            value,
            headers: &[],
        }
    }
}

impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
    // Identity conversion so owned values and references are accepted
    // uniformly by the produce APIs.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Describes one partition to read from in a fetch request.
#[derive(Debug)]
pub struct FetchPartition<'a> {
    /// Topic to fetch from.
    pub topic: &'a str,
    /// Offset to begin reading at.
    pub offset: i64,
    /// Partition within `topic`.
    pub partition: i32,
    /// Per-partition byte cap for the response; defaults to -1
    /// (presumably "use the client-wide default" — confirm in fetch_ops).
    pub max_bytes: i32,
}

impl<'a> FetchPartition<'a> {
    /// Creates a fetch request for `topic`/`partition` starting at
    /// `offset`, leaving `max_bytes` at its -1 default.
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            partition,
            offset,
            max_bytes: -1,
        }
    }

    /// Builder-style override of the per-partition byte limit.
    #[must_use]
    pub fn with_max_bytes(self, max_bytes: i32) -> Self {
        Self { max_bytes, ..self }
    }
}

impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
    // Identity conversion so owned values and references are accepted
    // uniformly by the fetch APIs.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Per-topic result of a produce request.
#[derive(Debug)]
pub struct ProduceConfirm {
    /// Topic the messages were sent to.
    pub topic: String,
    /// One confirmation per partition that was produced to.
    pub partition_confirms: Vec<ProducePartitionConfirm>,
}
/// Per-partition outcome of a produce request.
#[derive(Debug)]
pub struct ProducePartitionConfirm {
    /// Offset reported by the broker on success, or the broker error
    /// code on failure.
    pub offset: std::result::Result<i64, KafkaCode>,
    /// Partition this confirmation applies to.
    pub partition: i32,
}
/// Client for talking to a Kafka cluster.
///
/// Owns the broker connection pool, mutable client state (metadata,
/// correlation-id counter), and a per-broker API-version cache; most
/// operations therefore take `&mut self`.
#[derive(Debug)]
pub struct KafkaClient {
    // User-supplied configuration: hosts, client id, compression,
    // fetch/retry/offset-storage settings.
    config: ClientConfig,
    // Pooled broker connections, reused across requests.
    conn_pool: network::Connections,
    // Mutable client state; provides correlation ids, partition
    // metadata, and group-coordinator lookups (see methods below).
    state: state::ClientState,
    // Cached ApiVersions responses.
    api_versions: crate::protocol::api_versions::ApiVersionCache,
}
impl KafkaClient {
    /// Returns a builder for constructing a client with custom settings.
    pub fn builder() -> builder::KafkaClientBuilder {
        builder::KafkaClientBuilder::new()
    }

    /// Creates a client for the given bootstrap hosts with default
    /// configuration.
    #[must_use]
    pub fn new(hosts: Vec<String>) -> KafkaClient {
        Self::builder().with_hosts(hosts).build()
    }

    /// Creates a client for the given bootstrap hosts using the supplied
    /// security configuration.
    #[cfg(feature = "security")]
    #[must_use]
    pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
        Self::builder()
            .with_hosts(hosts)
            .with_security(security)
            .build()
    }

    /// The bootstrap hosts this client was configured with.
    #[inline]
    #[must_use]
    pub fn hosts(&self) -> &[String] {
        &self.config.hosts
    }

    /// Sets the client id sent to brokers with every request.
    pub fn set_client_id(&mut self, client_id: String) {
        self.config.client_id = client_id;
    }

    /// The client id sent to brokers with every request.
    #[must_use]
    pub fn client_id(&self) -> &str {
        &self.config.client_id
    }

    /// Sets the compression codec used when producing messages.
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.config.compression = compression;
    }

    /// The compression codec used when producing messages.
    #[inline]
    #[must_use]
    pub fn compression(&self) -> Compression {
        self.config.compression
    }

    /// Sets the maximum time a broker may wait before answering a fetch
    /// request.
    ///
    /// # Errors
    /// Fails if `max_wait_time` cannot be represented as an `i32`
    /// millisecond count (see `protocol::to_millis_i32`).
    #[inline]
    pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()> {
        self.config.fetch.max_wait_time = protocol::to_millis_i32(max_wait_time)?;
        Ok(())
    }

    /// The configured fetch max-wait time. A stored value that is
    /// negative (and thus unrepresentable as `u64` millis) is reported
    /// as a zero duration.
    #[inline]
    #[must_use]
    pub fn fetch_max_wait_time(&self) -> Duration {
        let millis = u64::try_from(self.config.fetch.max_wait_time).unwrap_or_default();
        Duration::from_millis(millis)
    }

    /// Sets the minimum number of bytes a broker should accumulate
    /// before answering a fetch request.
    #[inline]
    pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
        self.config.fetch.min_bytes = min_bytes;
    }

    /// The configured fetch minimum-bytes threshold.
    #[inline]
    #[must_use]
    pub fn fetch_min_bytes(&self) -> i32 {
        self.config.fetch.min_bytes
    }

    /// Sets the default per-partition byte cap for fetch responses.
    #[inline]
    pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
        self.config.fetch.max_bytes_per_partition = max_bytes;
    }

    /// The default per-partition byte cap for fetch responses.
    #[inline]
    #[must_use]
    pub fn fetch_max_bytes_per_partition(&self) -> i32 {
        self.config.fetch.max_bytes_per_partition
    }

    /// Enables or disables CRC validation of fetched messages.
    #[inline]
    pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
        self.config.fetch.crc_validation = validate_crc;
    }

    /// Whether CRC validation of fetched messages is enabled.
    #[inline]
    #[must_use]
    pub fn fetch_crc_validation(&self) -> bool {
        self.config.fetch.crc_validation
    }

    /// Selects where consumer-group offsets are stored; `None`
    /// presumably falls back to a default chosen elsewhere — confirm
    /// against offset_ops.
    #[inline]
    pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>) {
        self.config.offset_storage = storage;
    }

    /// The configured group-offset storage backend, if any.
    #[must_use]
    pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage> {
        self.config.offset_storage
    }

    /// Adjusts the backoff of the active retry policy: the initial delay
    /// for an exponential policy, the interval for a fixed policy; a
    /// no-op when retries are disabled.
    #[inline]
    pub fn set_retry_backoff_time(&mut self, time: Duration) {
        match &mut self.config.retry.policy {
            config::RetryPolicy::Exponential { initial, .. } => *initial = time,
            config::RetryPolicy::Fixed { interval, .. } => *interval = time,
            config::RetryPolicy::None => {}
        }
    }

    /// Maximum number of attempts permitted by the active retry policy.
    #[inline]
    #[must_use]
    pub fn retry_max_attempts(&self) -> u32 {
        self.config.retry.policy.max_attempts()
    }

    /// Sets the idle timeout applied to pooled broker connections.
    #[inline]
    pub fn set_connection_idle_timeout(&mut self, timeout: Duration) {
        self.conn_pool.set_idle_timeout(timeout);
    }

    /// The idle timeout applied to pooled broker connections.
    #[inline]
    #[must_use]
    pub fn connection_idle_timeout(&self) -> Duration {
        self.conn_pool.idle_timeout()
    }

    /// Sets the timestamp mode attached to produced messages
    /// (feature `producer_timestamp`).
    #[cfg(feature = "producer_timestamp")]
    #[inline]
    pub fn set_producer_timestamp(&mut self, producer_timestamp: Option<ProducerTimestamp>) {
        self.config.producer_timestamp = producer_timestamp;
    }

    /// The timestamp mode attached to produced messages, if configured.
    #[cfg(feature = "producer_timestamp")]
    #[inline]
    #[must_use]
    pub fn producer_timestamp(&self) -> Option<ProducerTimestamp> {
        self.config.producer_timestamp
    }

    /// A view over the currently loaded topic metadata.
    #[inline]
    #[must_use]
    pub fn topics(&self) -> metadata::Topics<'_> {
        metadata::Topics::new(self)
    }

    /// Loads metadata for all topics from the cluster.
    ///
    /// # Errors
    /// Propagates connection and protocol failures from the metadata
    /// operation.
    #[inline]
    pub fn load_metadata_all(&mut self) -> Result<()> {
        metadata_ops::load_metadata_all(self)
    }

    /// Loads metadata for the given topics.
    ///
    /// # Errors
    /// Propagates connection and protocol failures from the metadata
    /// operation.
    #[inline]
    pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
        metadata_ops::load_metadata(self, topics)
    }

    /// `_kp` variant of [`KafkaClient::load_metadata`]; how it differs is
    /// decided inside `metadata_ops` and is not visible here.
    ///
    /// # Errors
    /// Propagates connection and protocol failures from the metadata
    /// operation.
    pub fn load_metadata_kp<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
        metadata_ops::load_metadata_kp(self, topics)
    }

    /// Clears all loaded cluster metadata.
    #[inline]
    pub fn reset_metadata(&mut self) {
        metadata_ops::reset_metadata(self);
    }

    /// Fetches offsets for every partition of the given topics according
    /// to the `offset` policy, keyed by topic name.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_offsets<T: AsRef<str>>(
        &mut self,
        topics: &[T],
        offset: FetchOffset,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>> {
        metadata_ops::fetch_offsets(self, topics, offset)
    }

    /// Like [`KafkaClient::fetch_offsets`], but each entry also carries
    /// a timestamp.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn list_offsets<T: AsRef<str>>(
        &mut self,
        topics: &[T],
        offset: FetchOffset,
    ) -> Result<HashMap<String, Vec<TimestampedPartitionOffset>>> {
        metadata_ops::list_offsets(self, topics, offset)
    }

    /// Fetches offsets for all partitions of a single topic.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_topic_offsets<T: AsRef<str>>(
        &mut self,
        topic: T,
        offset: FetchOffset,
    ) -> Result<Vec<PartitionOffset>> {
        metadata_ops::fetch_topic_offsets(self, topic, offset)
    }

    /// `_kp` variant of [`KafkaClient::fetch_offsets`]; the difference is
    /// internal to `metadata_ops`.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_offsets_kp<T: AsRef<str>>(
        &mut self,
        topics: &[T],
        offset: FetchOffset,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>> {
        metadata_ops::fetch_offsets_kp(self, topics, offset)
    }

    /// Creates the given topics, waiting up to `timeout` for the broker
    /// to act.
    ///
    /// Tries each configured host in order and returns the first
    /// successful response. NOTE(review): one correlation id is drawn up
    /// front and reused for every host attempted — confirm brokers are
    /// fine with this across distinct connections.
    ///
    /// # Errors
    /// Fails if `timeout` does not fit an `i32` millisecond count, or if
    /// every host fails (the last error, annotated with broker context,
    /// is returned; `no_host_reachable` if the host list is empty).
    pub fn create_topics(
        &mut self,
        topics: &[TopicConfig],
        timeout: Duration,
    ) -> Result<CreateTopicsResponseData> {
        let correlation_id = self.state.next_correlation_id();
        let timeout_ms = protocol::to_millis_i32(timeout)?;
        let now = std::time::Instant::now();
        // Clone so we can iterate hosts while mutably borrowing the pool.
        let hosts = self.config.hosts.clone();
        let mut last_err: Option<Error> = None;
        for host in hosts {
            let conn = match self.conn_pool.get_conn(&host, now) {
                Ok(conn) => conn,
                Err(e) => {
                    // Remember the failure and try the next host.
                    last_err = Some(e.with_broker_context(&host, "CreateTopics"));
                    continue;
                }
            };
            match crate::protocol::create_topics::fetch_create_topics(
                conn,
                correlation_id,
                &self.config.client_id,
                topics,
                timeout_ms,
            ) {
                Ok(resp) => return Ok(resp),
                Err(e) => last_err = Some(e.with_broker_context(&host, "CreateTopics")),
            }
        }
        Err(last_err.unwrap_or_else(Error::no_host_reachable))
    }

    /// Deletes the named topics, waiting up to `timeout` for the broker
    /// to act.
    ///
    /// Same host-failover strategy as [`KafkaClient::create_topics`]:
    /// each configured host is tried in order and the first successful
    /// response is returned.
    ///
    /// # Errors
    /// Fails if `timeout` does not fit an `i32` millisecond count, or if
    /// every host fails (last error with broker context, or
    /// `no_host_reachable` for an empty host list).
    pub fn delete_topics(
        &mut self,
        topic_names: &[&str],
        timeout: Duration,
    ) -> Result<DeleteTopicsResponseData> {
        let correlation_id = self.state.next_correlation_id();
        let timeout_ms = protocol::to_millis_i32(timeout)?;
        let now = std::time::Instant::now();
        // Clone so we can iterate hosts while mutably borrowing the pool.
        let hosts = self.config.hosts.clone();
        let mut last_err: Option<Error> = None;
        for host in hosts {
            let conn = match self.conn_pool.get_conn(&host, now) {
                Ok(conn) => conn,
                Err(e) => {
                    // Remember the failure and try the next host.
                    last_err = Some(e.with_broker_context(&host, "DeleteTopics"));
                    continue;
                }
            };
            match crate::protocol::delete_topics::fetch_delete_topics(
                conn,
                correlation_id,
                &self.config.client_id,
                topic_names,
                timeout_ms,
            ) {
                Ok(resp) => return Ok(resp),
                Err(e) => last_err = Some(e.with_broker_context(&host, "DeleteTopics")),
            }
        }
        Err(last_err.unwrap_or_else(Error::no_host_reachable))
    }

    /// Fetches messages for the given partitions; delegates to
    /// [`KafkaClient::fetch_messages_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_messages<'a, I, J>(
        &mut self,
        input: I,
    ) -> Result<Vec<fetch_kp::OwnedFetchResponse>>
    where
        J: AsRef<FetchPartition<'a>>,
        I: IntoIterator<Item = J>,
    {
        self.fetch_messages_kp(input)
    }

    /// Convenience wrapper fetching messages for a single partition.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_messages_for_partition(
        &mut self,
        req: &FetchPartition<'_>,
    ) -> Result<Vec<fetch_kp::OwnedFetchResponse>> {
        self.fetch_messages_kp([req])
    }

    /// Core fetch entry point: draws a correlation id and hands the
    /// request set to `fetch_ops`.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_messages_kp<'a, I, J>(
        &mut self,
        input: I,
    ) -> Result<Vec<fetch_kp::OwnedFetchResponse>>
    where
        J: AsRef<FetchPartition<'a>>,
        I: IntoIterator<Item = J>,
    {
        let correlation = self.state.next_correlation_id();
        fetch_ops::fetch_messages_kp(
            &mut self.conn_pool,
            &mut self.state,
            &self.config,
            correlation,
            input,
        )
    }

    /// Produces the given messages with the requested acknowledgement
    /// level; delegates to [`KafkaClient::produce_messages_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn produce_messages<'a, 'b, I, J>(
        &mut self,
        acks: RequiredAcks,
        ack_timeout: Duration,
        messages: I,
    ) -> Result<Vec<ProduceConfirm>>
    where
        J: AsRef<ProduceMessage<'a, 'b>>,
        I: IntoIterator<Item = J>,
    {
        self.produce_messages_kp(acks, ack_timeout, messages)
    }

    /// Core produce entry point; hands the messages to `produce_ops`.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn produce_messages_kp<'a, 'b, I, J>(
        &mut self,
        acks: RequiredAcks,
        ack_timeout: Duration,
        messages: I,
    ) -> Result<Vec<ProduceConfirm>>
    where
        J: AsRef<ProduceMessage<'a, 'b>>,
        I: IntoIterator<Item = J>,
    {
        produce_ops::internal_produce_messages_kp(
            &mut self.conn_pool,
            &mut self.state,
            &self.config,
            acks,
            ack_timeout,
            messages,
        )
    }

    /// Commits the given offsets for `group`; delegates to
    /// [`KafkaClient::commit_offsets_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
    where
        J: AsRef<CommitOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        self.commit_offsets_kp(group, offsets)
    }

    /// Commits a single offset for `group`; delegates to
    /// [`KafkaClient::commit_offset_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn commit_offset(
        &mut self,
        group: &str,
        topic: &str,
        partition: i32,
        offset: i64,
    ) -> Result<()> {
        self.commit_offset_kp(group, topic, partition, offset)
    }

    /// Fetches committed offsets for the given partitions of `group`;
    /// delegates to [`KafkaClient::fetch_group_offsets_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_group_offsets<'a, J, I>(
        &mut self,
        group: &str,
        partitions: I,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>>
    where
        J: AsRef<FetchGroupOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        self.fetch_group_offsets_kp(group, partitions)
    }

    /// Fetches committed offsets for every known partition of `topic`;
    /// delegates to [`KafkaClient::fetch_group_topic_offset_kp`].
    ///
    /// # Errors
    /// Propagates connection and protocol failures, and fails if the
    /// topic is not present in loaded metadata.
    pub fn fetch_group_topic_offset(
        &mut self,
        group: &str,
        topic: &str,
    ) -> Result<Vec<PartitionOffset>> {
        self.fetch_group_topic_offset_kp(group, topic)
    }

    /// Core offset-commit entry point: draws a correlation id and hands
    /// the offsets to `offset_ops`.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn commit_offsets_kp<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
    where
        J: AsRef<CommitOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        let correlation_id = self.state.next_correlation_id();
        offset_ops::commit_offsets_kp(
            offsets,
            group,
            correlation_id,
            &self.config.client_id,
            &mut self.state,
            &mut self.conn_pool,
            &self.config,
        )
    }

    /// Commits a single offset by wrapping it in a one-element slice.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn commit_offset_kp(
        &mut self,
        group: &str,
        topic: &str,
        partition: i32,
        offset: i64,
    ) -> Result<()> {
        self.commit_offsets_kp(group, &[CommitOffset::new(topic, partition, offset)])
    }

    /// Core group-offset fetch entry point: draws a correlation id and
    /// hands the partition list to `offset_ops`.
    ///
    /// # Errors
    /// Propagates connection and protocol failures.
    pub fn fetch_group_offsets_kp<'a, J, I>(
        &mut self,
        group: &str,
        partitions: I,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>>
    where
        J: AsRef<FetchGroupOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        let correlation_id = self.state.next_correlation_id();
        offset_ops::fetch_group_offsets_kp(
            partitions,
            group,
            correlation_id,
            &self.config.client_id,
            &mut self.state,
            &mut self.conn_pool,
            &self.config,
        )
    }

    /// Fetches committed group offsets for every partition of `topic`
    /// known to the loaded metadata, returning only that topic's entries.
    ///
    /// # Errors
    /// Returns `KafkaCode::UnknownTopicOrPartition` if the topic is not
    /// in loaded metadata; otherwise propagates connection and protocol
    /// failures.
    pub fn fetch_group_topic_offset_kp(
        &mut self,
        group: &str,
        topic: &str,
    ) -> Result<Vec<PartitionOffset>> {
        let correlation_id = self.state.next_correlation_id();
        let mut partition_vec: Vec<FetchGroupOffset<'_>> = Vec::new();
        // Expand the topic into one request per known partition.
        match self.state.partitions_for(topic) {
            None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
            Some(tp) => {
                for (id, _) in tp {
                    partition_vec.push(FetchGroupOffset::new(topic, id));
                }
            }
        }
        offset_ops::fetch_group_offsets_kp(
            partition_vec,
            group,
            correlation_id,
            &self.config.client_id,
            &mut self.state,
            &mut self.conn_pool,
            &self.config,
        )
        // The response map is keyed by topic; extract just ours.
        .map(|mut m| m.remove(topic).unwrap_or_default())
    }

    /// Host of the cached coordinator for `group`, if one is known.
    #[must_use]
    pub fn group_coordinator_host(&self, group: &str) -> Option<String> {
        self.state
            .group_coordinator(group)
            .map(std::borrow::ToOwned::to_owned)
    }

    /// Allocates the next request correlation id from client state.
    pub fn next_correlation_id(&mut self) -> i32 {
        self.state.next_correlation_id()
    }

    /// Borrows the pooled connection for `host`, using the current time
    /// for idle-timeout bookkeeping.
    ///
    /// # Errors
    /// Fails if a connection to `host` cannot be obtained.
    pub fn get_conn_mut(&mut self, host: &str) -> Result<&mut network::KafkaConnection> {
        self.conn_pool.get_conn(host, std::time::Instant::now())
    }
}
impl KafkaClientInternals for KafkaClient {
    /// Internal produce entry point taking wire-level arguments.
    ///
    /// Maps the raw `required_acks` value back onto [`RequiredAcks`].
    /// NOTE(review): any value other than 0, 1, or -1 silently degrades
    /// to `RequiredAcks::None` — confirm this is intended rather than an
    /// error. A negative `ack_timeout` is clamped to 0 ms via
    /// `try_from(..).unwrap_or_default()`.
    fn internal_produce_messages<'a, 'b, I, J>(
        &mut self,
        required_acks: i16,
        ack_timeout: i32,
        messages: I,
    ) -> Result<Vec<ProduceConfirm>>
    where
        J: AsRef<ProduceMessage<'a, 'b>>,
        I: IntoIterator<Item = J>,
    {
        let acks = match required_acks {
            0 => RequiredAcks::None,
            1 => RequiredAcks::One,
            -1 => RequiredAcks::All,
            // Unknown ack levels fall back to no acknowledgement.
            _ => RequiredAcks::None,
        };
        produce_ops::internal_produce_messages_kp(
            &mut self.conn_pool,
            &mut self.state,
            &self.config,
            acks,
            Duration::from_millis(u64::try_from(ack_timeout).unwrap_or_default()),
            messages,
        )
    }
}