use crate::client_wrappers::client_wrapper::ClientWrapper;
use crate::prelude::{AutoCommit, AutoCommitWhen, IggyConsumer};
use iggy_common::locking::IggyRwLock;
use iggy_common::{Consumer, EncryptorKind, Identifier, IggyDuration, PollingStrategy};
use std::sync::Arc;
#[derive(Debug)]
pub struct IggyConsumerBuilder {
client: IggyRwLock<ClientWrapper>,
consumer_name: String,
consumer: Consumer,
stream: Identifier,
topic: Identifier,
partition: Option<u32>,
polling_strategy: PollingStrategy,
polling_interval: Option<IggyDuration>,
batch_length: u32,
auto_commit: AutoCommit,
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<EncryptorKind>>,
polling_retry_interval: IggyDuration,
init_retries: Option<u32>,
init_retry_interval: IggyDuration,
allow_replay: bool,
}
impl IggyConsumerBuilder {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client: IggyRwLock<ClientWrapper>,
consumer_name: String,
consumer: Consumer,
stream_id: Identifier,
topic_id: Identifier,
partition_id: Option<u32>,
encryptor: Option<Arc<EncryptorKind>>,
polling_interval: Option<IggyDuration>,
) -> Self {
Self {
client,
consumer_name,
consumer,
stream: stream_id,
topic: topic_id,
partition: partition_id,
polling_strategy: PollingStrategy::next(),
batch_length: 1000,
auto_commit: AutoCommit::IntervalOrWhen(
IggyDuration::ONE_SECOND,
AutoCommitWhen::PollingMessages,
),
auto_join_consumer_group: true,
create_consumer_group_if_not_exists: true,
encryptor,
polling_interval,
polling_retry_interval: IggyDuration::ONE_SECOND,
init_retries: None,
init_retry_interval: IggyDuration::ONE_SECOND,
allow_replay: false,
}
}
pub fn stream(self, stream: Identifier) -> Self {
Self { stream, ..self }
}
pub fn topic(self, topic: Identifier) -> Self {
Self { topic, ..self }
}
pub fn partition(self, partition: Option<u32>) -> Self {
Self { partition, ..self }
}
pub fn polling_strategy(self, polling_strategy: PollingStrategy) -> Self {
Self {
polling_strategy,
..self
}
}
pub fn batch_length(self, batch_length: u32) -> Self {
Self {
batch_length,
..self
}
}
pub fn auto_commit(self, auto_commit: AutoCommit) -> Self {
Self {
auto_commit,
..self
}
}
pub fn commit_failed_messages(self) -> Self {
Self {
auto_commit: AutoCommit::Disabled,
..self
}
}
pub fn auto_join_consumer_group(self) -> Self {
Self {
auto_join_consumer_group: true,
..self
}
}
pub fn do_not_auto_join_consumer_group(self) -> Self {
Self {
auto_join_consumer_group: false,
..self
}
}
pub fn create_consumer_group_if_not_exists(self) -> Self {
Self {
create_consumer_group_if_not_exists: true,
..self
}
}
pub fn do_not_create_consumer_group_if_not_exists(self) -> Self {
Self {
create_consumer_group_if_not_exists: false,
..self
}
}
pub fn poll_interval(self, interval: IggyDuration) -> Self {
Self {
polling_interval: Some(interval),
..self
}
}
pub fn without_poll_interval(self) -> Self {
Self {
polling_interval: None,
..self
}
}
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
encryptor: Some(encryptor),
..self
}
}
pub fn without_encryptor(self) -> Self {
Self {
encryptor: None,
..self
}
}
pub fn polling_retry_interval(self, interval: IggyDuration) -> Self {
Self {
polling_retry_interval: interval,
..self
}
}
pub fn init_retries(self, retries: u32, interval: IggyDuration) -> Self {
Self {
init_retries: Some(retries),
init_retry_interval: interval,
..self
}
}
pub fn allow_replay(self) -> Self {
Self {
allow_replay: true,
..self
}
}
pub fn build(self) -> IggyConsumer {
IggyConsumer::new(
self.client,
self.consumer_name,
self.consumer,
self.stream,
self.topic,
self.partition,
self.polling_interval,
self.polling_strategy,
self.batch_length,
self.auto_commit,
self.auto_join_consumer_group,
self.create_consumer_group_if_not_exists,
self.encryptor,
self.polling_retry_interval,
self.init_retries,
self.init_retry_interval,
self.allow_replay,
)
}
}