use bytes::Bytes;
use nom::AsBytes;
use crate::{
consumer::{FetchParams, TopicPartitions},
consumer_group::ConsumerGroup,
error::{Error, KafkaCode, Result},
network::{BrokerAddress, BrokerConnection},
protocol, DEFAULT_CLIENT_ID, DEFAULT_CORRELATION_ID,
};
/// Initial value of `ConsumerGroupBuilder::retention_time_ms` (milliseconds, per the name).
const DEFAULT_RETENTION_TIME_MS: i64 = 100000;
/// Initial value of `ConsumerGroupBuilder::session_timeout_ms` (milliseconds, per the name).
const DEFAULT_SESSION_TIMEOUT_MS: i32 = 10000;
/// Initial value of `ConsumerGroupBuilder::rebalance_timeout_ms` (milliseconds, per the name).
const DEFAULT_REBALANCE_TIMEOUT_MS: i32 = 10000;
/// Collects the settings needed to create a [`ConsumerGroup`].
///
/// Construct it with `new`, adjust individual settings through the chained
/// setter methods, then call `build` to obtain the group.
#[derive(Clone)]
pub struct ConsumerGroupBuilder<T>
where
    T: BrokerConnection,
{
    /// Connection configuration used to open broker connections in `build`
    /// and stored in the resulting group.
    pub connection_params: T::ConnConfig,
    /// Correlation id sent with the coordinator lookup in `build`.
    pub correlation_id: i32,
    /// Client id sent with the coordinator lookup in `build`.
    pub client_id: String,
    /// Session timeout handed to the built group (milliseconds, per the name).
    pub session_timeout_ms: i32,
    /// Rebalance timeout handed to the built group (milliseconds, per the name).
    pub rebalance_timeout_ms: i32,
    /// Id of the consumer group; also used to look up its coordinator.
    pub group_id: String,
    /// Retention time handed to the built group (milliseconds, per the name).
    pub retention_time_ms: i64,
    /// Topic/partition set handed to the built group.
    pub group_topic_partitions: TopicPartitions,
    /// Fetch settings handed to the built group; the group's own
    /// `correlation_id` and `client_id` are taken from here as well.
    pub fetch_params: FetchParams,
}
impl<T: BrokerConnection> ConsumerGroupBuilder<T> {
    /// Creates a builder populated with the crate defaults.
    ///
    /// Only the connection configuration, the group id and the
    /// topic/partition set are caller-supplied; every other setting starts at
    /// its `DEFAULT_*` value and can be changed with the chained setters.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `Result` return type is
    /// part of the existing interface.
    pub async fn new(
        connection_params: T::ConnConfig,
        group_id: String,
        group_topic_partitions: TopicPartitions,
    ) -> Result<Self> {
        Ok(Self {
            connection_params,
            correlation_id: DEFAULT_CORRELATION_ID,
            client_id: DEFAULT_CLIENT_ID.to_owned(),
            session_timeout_ms: DEFAULT_SESSION_TIMEOUT_MS,
            rebalance_timeout_ms: DEFAULT_REBALANCE_TIMEOUT_MS,
            group_id,
            retention_time_ms: DEFAULT_RETENTION_TIME_MS,
            group_topic_partitions,
            fetch_params: FetchParams::create(DEFAULT_CORRELATION_ID, DEFAULT_CLIENT_ID.to_owned()),
        })
    }

    /// Sets the correlation id.
    ///
    /// Both the builder's own field (used for the coordinator lookup in
    /// [`build`](Self::build)) and the copy inside `fetch_params` are
    /// updated, so every request uses the same id.
    pub fn correlation_id(mut self, correlation_id: i32) -> Self {
        // Keep both copies in sync; previously only `fetch_params` was
        // updated and the coordinator lookup silently used the default.
        self.correlation_id = correlation_id;
        self.fetch_params.correlation_id = correlation_id;
        self
    }

    /// Sets the client id.
    ///
    /// Both the builder's own field (used for the coordinator lookup in
    /// [`build`](Self::build)) and the copy inside `fetch_params` are
    /// updated, so every request uses the same id.
    pub fn client_id(mut self, client_id: String) -> Self {
        // Keep both copies in sync; previously only `fetch_params` was
        // updated and the coordinator lookup silently used the default.
        self.client_id.clone_from(&client_id);
        self.fetch_params.client_id = client_id;
        self
    }

    /// Sets the retention time passed on to the built group.
    pub fn retention_time_ms(mut self, retention_time_ms: i64) -> Self {
        self.retention_time_ms = retention_time_ms;
        self
    }

    /// Sets the session timeout passed on to the built group.
    pub fn session_timeout_ms(mut self, session_timeout_ms: i32) -> Self {
        self.session_timeout_ms = session_timeout_ms;
        self
    }

    /// Sets the rebalance timeout passed on to the built group.
    pub fn rebalance_timeout_ms(mut self, rebalance_timeout_ms: i32) -> Self {
        self.rebalance_timeout_ms = rebalance_timeout_ms;
        self
    }

    /// Sets `max_wait_ms` on the fetch parameters.
    pub fn max_wait_ms(mut self, max_wait_ms: i32) -> Self {
        self.fetch_params.max_wait_ms = max_wait_ms;
        self
    }

    /// Sets `min_bytes` on the fetch parameters.
    pub fn min_bytes(mut self, min_bytes: i32) -> Self {
        self.fetch_params.min_bytes = min_bytes;
        self
    }

    /// Sets `max_bytes` on the fetch parameters.
    pub fn max_bytes(mut self, max_bytes: i32) -> Self {
        self.fetch_params.max_bytes = max_bytes;
        self
    }

    /// Sets `max_partition_bytes` on the fetch parameters.
    pub fn max_partition_bytes(mut self, max_partition_bytes: i32) -> Self {
        self.fetch_params.max_partition_bytes = max_partition_bytes;
        self
    }

    /// Sets `isolation_level` on the fetch parameters.
    pub fn isolation_level(mut self, isolation_level: i8) -> Self {
        self.fetch_params.isolation_level = isolation_level;
        self
    }

    /// Locates the group's coordinator, connects to it and returns the
    /// configured [`ConsumerGroup`].
    ///
    /// A first connection is opened from `connection_params` and used for the
    /// coordinator lookup; a second connection is then opened to the
    /// host/port the lookup returned and stored in the group.
    ///
    /// # Errors
    ///
    /// * any error from opening either connection or from the lookup itself;
    /// * `Error::KafkaError` when the lookup response carries an error code;
    /// * `Error::DecodingUtf8Error` when the coordinator host is not valid UTF-8;
    /// * `Error::MetadataNeedsSync` when the coordinator port does not fit the
    ///   address's port type.
    pub async fn build(self) -> Result<ConsumerGroup<T>> {
        let conn = T::new(self.connection_params.clone()).await?;
        let coordinator =
            find_coordinator(conn, self.correlation_id, &self.client_id, &self.group_id).await?;
        if coordinator.error_code != KafkaCode::None {
            return Err(Error::KafkaError(coordinator.error_code));
        }

        let host = std::str::from_utf8(coordinator.host.as_bytes()).map_err(|err| {
            tracing::error!("Error converting from UTF8 {:?}", err);
            Error::DecodingUtf8Error
        })?;
        let port = coordinator.port.try_into().map_err(|err| {
            tracing::error!(
                "Error decoding Broker connection port from metadata {:?}",
                err
            );
            Error::MetadataNeedsSync
        })?;

        let coordinator_conn = T::from_addr(
            self.connection_params.clone(),
            BrokerAddress {
                host: host.to_string(),
                port,
            },
        )
        .await?;

        // Only these two values are needed besides the params themselves, so
        // copy them out instead of cloning the whole `FetchParams`.
        let correlation_id = self.fetch_params.correlation_id;
        let client_id = self.fetch_params.client_id.clone();

        Ok(ConsumerGroup {
            connection_params: self.connection_params,
            coordinator_conn,
            correlation_id,
            client_id,
            session_timeout_ms: self.session_timeout_ms,
            rebalance_timeout_ms: self.rebalance_timeout_ms,
            group_id: self.group_id,
            retention_time_ms: self.retention_time_ms,
            group_topic_partitions: self.group_topic_partitions,
            fetch_params: self.fetch_params,
            member_id: Bytes::from_static(b""),
            generation_id: 0,
            assignment: None,
        })
    }
}
/// Sends a `FindCoordinatorRequest` for `group_id` over `conn` and decodes
/// the broker's reply.
///
/// The connection is taken by value and dropped when the function returns.
///
/// # Errors
///
/// Propagates any error from sending the request, receiving the response, or
/// converting the received bytes into a `FindCoordinatorResponse`.
pub async fn find_coordinator(
    mut conn: impl BrokerConnection,
    correlation_id: i32,
    client_id: &str,
    group_id: &str,
) -> Result<protocol::FindCoordinatorResponse> {
    let request = protocol::FindCoordinatorRequest::new(correlation_id, client_id, group_id);
    conn.send_request(&request).await?;

    let raw_response = conn.receive_response().await?;
    // Freeze the received buffer before handing it to the decoder.
    let frozen = raw_response.freeze();
    protocol::FindCoordinatorResponse::try_from(frozen)
}