use crate::consumer::{Consumer, FetchParams, PartitionOffsets, TopicPartitions};
use crate::metadata::ClusterMetadata;
use crate::{
error::{Error, KafkaCode, Result},
metadata::{self},
network::BrokerConnection,
protocol, DEFAULT_CLIENT_ID, DEFAULT_CORRELATION_ID,
};
use nom::AsBytes;
use std::collections::HashMap;
use std::fmt::Debug;
#[derive(Clone)]
pub struct ConsumerBuilder<T: BrokerConnection> {
    // Cluster topology and per-topic broker connections used to route requests.
    pub(crate) cluster_metadata: ClusterMetadata<T>,
    // Correlation id / client id plus fetch tuning knobs (max_wait_ms, min/max
    // bytes, isolation level) applied to every fetch issued by the consumer.
    pub(crate) fetch_params: FetchParams,
    // The (topic -> partitions) assignment this consumer is responsible for.
    pub(crate) assigned_topic_partitions: TopicPartitions,
    // Next offset to read, keyed by (topic name, partition index); populated
    // by the seek* methods before `build`.
    pub(crate) offsets: PartitionOffsets,
}
impl<T: BrokerConnection + Clone + Debug> ConsumerBuilder<T> {
pub async fn new(
broker_config: T::ConnConfig,
assigned_topic_partitions: TopicPartitions,
) -> Result<Self> {
let topics = Self::extract_topics_from_assignments(&assigned_topic_partitions);
let cluster_metadata = metadata::ClusterMetadata::new(
broker_config,
DEFAULT_CORRELATION_ID,
DEFAULT_CLIENT_ID.to_owned(),
topics,
)
.await?;
Ok(Self {
cluster_metadata,
fetch_params: FetchParams::create(DEFAULT_CORRELATION_ID, DEFAULT_CLIENT_ID.to_owned()),
assigned_topic_partitions,
offsets: HashMap::new(),
})
}
pub async fn seek_to_timestamp(mut self, timestamp: i64) -> Result<Self> {
tracing::debug!("Seeking offsets to timestamp {}", timestamp);
let brokers_and_their_topic_partitions = self
.cluster_metadata
.get_connections_for_topic_partitions(&self.assigned_topic_partitions)?;
self.offsets = HashMap::new();
for (broker_conn, topic_partitions) in brokers_and_their_topic_partitions.into_iter() {
let offsets_list = list_offsets(
broker_conn,
self.fetch_params.correlation_id,
&self.fetch_params.client_id,
&topic_partitions,
timestamp,
)
.await?;
let partition_offsets = offsets_list.into_box_iter();
for (topic_name, partition) in partition_offsets {
if partition.error_code != KafkaCode::None {
return Err(Error::KafkaError(partition.error_code));
}
let topic_name = std::str::from_utf8(topic_name.as_bytes()).map_err(|err| {
tracing::error!("Error converting from UTF8 {:?}", err);
Error::DecodingUtf8Error
})?;
let topic_name = self
.cluster_metadata
.topic_names
.iter()
.find(|my_topic| **my_topic == topic_name)
.ok_or(Error::MetadataNeedsSync)?;
self.offsets.insert(
(topic_name.to_owned(), partition.partition_index),
partition.offset,
);
}
}
tracing::trace!("Offsets set to {:?}", self.offsets);
Ok(self)
}
pub async fn seek_to_group(
mut self,
coordinator_conn: impl BrokerConnection + Clone,
group_id: &str,
) -> Result<Self> {
tracing::debug!("Seeking offsets to group {}", group_id);
let fetch_params = &self.fetch_params;
let offset_response = fetch_offset(
fetch_params.correlation_id,
&fetch_params.client_id,
group_id,
coordinator_conn,
&self.assigned_topic_partitions,
)
.await?;
if offset_response.error_code != KafkaCode::None {
return Err(Error::KafkaError(offset_response.error_code));
}
let partition_offsets = offset_response.into_box_iter();
for (topic_name, partition) in partition_offsets {
if partition.error_code != KafkaCode::None {
return Err(Error::KafkaError(partition.error_code));
}
let topic_name = std::str::from_utf8(topic_name.as_bytes()).map_err(|err| {
tracing::error!("Error converting from UTF8 {:?}", err);
Error::DecodingUtf8Error
})?;
let topic_name = self
.cluster_metadata
.topic_names
.iter()
.find(|my_topic| **my_topic == topic_name)
.ok_or(Error::MetadataNeedsSync)?;
let offset = if partition.committed_offset == -1 {
tracing::debug!(
"No offset found for topic {} partition {}, initializing to 0",
topic_name,
partition.partition_index
);
0
} else {
partition.committed_offset
};
self.offsets
.insert((topic_name.to_owned(), partition.partition_index), offset);
}
tracing::trace!("Offsets set to {:?}", self.offsets);
Ok(self)
}
pub fn seek(mut self, offsets: &PartitionOffsets) -> Self {
tracing::debug!("Seeking offsets to given values");
self.offsets = offsets.clone();
tracing::trace!("Offsets set to {:?}", self.offsets);
self
}
pub fn correlation_id(mut self, correlation_id: i32) -> Self {
self.fetch_params.correlation_id = correlation_id;
self
}
pub fn client_id(mut self, client_id: String) -> Self {
self.fetch_params.client_id = client_id;
self
}
pub fn max_wait_ms(mut self, max_wait_ms: i32) -> Self {
self.fetch_params.max_wait_ms = max_wait_ms;
self
}
pub fn min_bytes(mut self, min_bytes: i32) -> Self {
self.fetch_params.min_bytes = min_bytes;
self
}
pub fn max_bytes(mut self, max_bytes: i32) -> Self {
self.fetch_params.max_bytes = max_bytes;
self
}
pub fn max_partition_bytes(mut self, max_partition_bytes: i32) -> Self {
self.fetch_params.max_partition_bytes = max_partition_bytes;
self
}
pub fn isolation_level(mut self, isolation_level: i8) -> Self {
self.fetch_params.isolation_level = isolation_level;
self
}
pub fn build(self) -> Consumer<T> {
let fetch_params = self.fetch_params;
Consumer {
cluster_metadata: self.cluster_metadata,
fetch_params: fetch_params.clone(),
assigned_topic_partitions: self.assigned_topic_partitions,
offsets: self.offsets,
}
}
fn extract_topics_from_assignments(assigned_partitions: &TopicPartitions) -> Vec<String> {
assigned_partitions
.keys()
.map(|topic_name| topic_name.to_owned())
.collect()
}
}
/// Fetches the committed offsets of consumer group `group_id` for every
/// (topic, partition) pair in `topic_partitions`, via the group coordinator.
///
/// # Errors
/// Returns connection errors from `coordinator_conn` or a protocol error if
/// the response cannot be parsed.
pub async fn fetch_offset(
    correlation_id: i32,
    client_id: &str,
    group_id: &str,
    mut coordinator_conn: impl BrokerConnection,
    topic_partitions: &TopicPartitions,
) -> Result<protocol::OffsetFetchResponse> {
    tracing::debug!(
        "Fetching offset for group {} for {:?}",
        group_id,
        topic_partitions
    );
    // Build one request covering the full assignment.
    let mut request = protocol::OffsetFetchRequest::new(correlation_id, client_id, group_id);
    for (topic, partitions) in topic_partitions.iter() {
        for &partition in partitions.iter() {
            request.add(topic, partition);
        }
    }
    coordinator_conn.send_request(&request).await?;
    let raw_response = coordinator_conn.receive_response().await?;
    protocol::OffsetFetchResponse::try_from(raw_response.freeze())
}
/// Asks a broker for the offsets at `timestamp` for every (topic, partition)
/// pair in `topic_partitions`.
///
/// # Errors
/// Returns connection errors from `broker_conn` or a protocol error if the
/// response cannot be parsed.
pub async fn list_offsets(
    mut broker_conn: impl BrokerConnection,
    correlation_id: i32,
    client_id: &str,
    topic_partitions: &TopicPartitions,
    timestamp: i64,
) -> Result<protocol::ListOffsetsResponse> {
    tracing::debug!(
        "Listing offset for time {} for {:?}",
        timestamp,
        topic_partitions
    );
    // Build one request covering the full assignment (-1 = no specific replica).
    let mut request = protocol::ListOffsetsRequest::new(correlation_id, client_id, -1);
    for (topic, partitions) in topic_partitions.iter() {
        for &partition in partitions.iter() {
            request.add(topic, partition, timestamp);
        }
    }
    broker_conn.send_request(&request).await?;
    let raw_response = broker_conn.receive_response().await?;
    protocol::ListOffsetsResponse::try_from(raw_response.freeze())
}