use std::time::Duration;
use crate::client::config::RetryPolicy;
use crate::client::state::ClientState;
use crate::client::{ClientConfig, GroupOffsetStorage, KafkaClient};
use crate::compression::Compression;
use crate::protocol::api_versions::ApiVersionCache;
#[cfg(feature = "security")]
use crate::network::SecurityConfig;
use crate::network::Connections;
/// Builder that accumulates settings for a [`KafkaClient`].
///
/// Each `with_*` method consumes the builder and returns it with one setting
/// changed; `build` turns the collected values into a `ClientConfig`, a
/// connection pool, and finally the client itself.
#[must_use]
pub struct KafkaClientBuilder {
/// Host strings forwarded unchanged to `ClientConfig::hosts`
/// (presumably broker addresses; expected format is not visible here).
hosts: Vec<String>,
/// Client identifier forwarded to `ClientConfig::client_id`.
client_id: String,
/// Optional security settings handed to the connection pool in `build`.
/// Only present when the `security` feature is enabled.
#[cfg(feature = "security")]
security: Option<SecurityConfig>,
/// Compression setting forwarded to `ClientConfig::compression`.
compression: Compression,
/// Fetch max wait time in milliseconds; `build` converts it through
/// `crate::protocol::to_millis_i32` and panics if that conversion fails.
fetch_max_wait_time_millis: u64,
/// Forwarded to `FetchConfig::min_bytes`.
fetch_min_bytes: i32,
/// Forwarded to `FetchConfig::max_bytes_per_partition`.
fetch_max_bytes_per_partition: i32,
/// Forwarded to `FetchConfig::crc_validation`.
fetch_crc_validation: bool,
/// Forwarded to `ClientConfig::offset_storage`; `None` is a valid value.
offset_storage: Option<GroupOffsetStorage>,
/// Retry policy forwarded to `RetryConfig::policy`.
retry_policy: RetryPolicy,
/// Forwarded to `ClientConfig::producer_timestamp`. The field always exists,
/// but its setter is only compiled with the `producer_timestamp` feature.
producer_timestamp: Option<crate::protocol::produce::ProducerTimestamp>,
/// Connection read/write timeout in seconds. A value of zero makes `build`
/// create the connection pool without a read/write timeout.
conn_rw_timeout_secs: u64,
/// Connection idle timeout in milliseconds.
conn_idle_timeout_millis: u64,
}
impl KafkaClientBuilder {
    /// Creates a builder with no hosts, an empty client id, the default
    /// retry policy, and every remaining option set to the matching
    /// `DEFAULT_*` constant of the parent module.
    pub(crate) fn new() -> Self {
        Self {
            // Identity and endpoints start out empty.
            hosts: Vec::new(),
            client_id: String::new(),
            #[cfg(feature = "security")]
            security: None,

            // Payload handling.
            compression: super::DEFAULT_COMPRESSION,
            producer_timestamp: super::DEFAULT_PRODUCER_TIMESTAMP,

            // Fetch tuning.
            fetch_max_wait_time_millis: super::DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
            fetch_min_bytes: super::DEFAULT_FETCH_MIN_BYTES,
            fetch_max_bytes_per_partition: super::DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
            fetch_crc_validation: super::DEFAULT_FETCH_CRC_VALIDATION,

            // Group offsets and retries.
            offset_storage: super::DEFAULT_GROUP_OFFSET_STORAGE,
            retry_policy: RetryPolicy::default(),

            // Connection timeouts.
            conn_rw_timeout_secs: super::DEFAULT_CONNECTION_RW_TIMEOUT_SECS,
            conn_idle_timeout_millis: super::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
        }
    }

    /// Replaces the list of hosts.
    pub fn with_hosts(self, hosts: Vec<String>) -> Self {
        Self { hosts, ..self }
    }

    /// Sets the client identifier.
    pub fn with_client_id(self, id: String) -> Self {
        Self {
            client_id: id,
            ..self
        }
    }

    /// Stores security settings to be passed to the connection pool.
    #[cfg(feature = "security")]
    pub fn with_security(self, config: SecurityConfig) -> Self {
        Self {
            security: Some(config),
            ..self
        }
    }

    /// Sets the compression setting.
    pub fn with_compression(self, compression: Compression) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Sets the fetch max wait time, in milliseconds.
    ///
    /// The value is only validated later, in [`build`](Self::build).
    pub fn with_fetch_max_wait_time(self, millis: u64) -> Self {
        Self {
            fetch_max_wait_time_millis: millis,
            ..self
        }
    }

    /// Sets the fetch `min_bytes` value.
    pub fn with_fetch_min_bytes(self, bytes: i32) -> Self {
        Self {
            fetch_min_bytes: bytes,
            ..self
        }
    }

    /// Sets the fetch `max_bytes_per_partition` value.
    pub fn with_fetch_max_bytes_per_partition(self, bytes: i32) -> Self {
        Self {
            fetch_max_bytes_per_partition: bytes,
            ..self
        }
    }

    /// Sets the fetch CRC validation flag.
    pub fn with_fetch_crc_validation(self, validate: bool) -> Self {
        Self {
            fetch_crc_validation: validate,
            ..self
        }
    }

    /// Sets the group offset storage; `None` is stored as-is.
    pub fn with_group_offset_storage(self, storage: Option<GroupOffsetStorage>) -> Self {
        Self {
            offset_storage: storage,
            ..self
        }
    }

    /// Replaces the whole retry policy, discarding any earlier backoff or
    /// attempt-count adjustments.
    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
        Self {
            retry_policy: policy,
            ..self
        }
    }

    /// Sets the backoff duration, in milliseconds, on the current retry policy.
    ///
    /// This writes `initial` for `RetryPolicy::Exponential` and `interval`
    /// for `RetryPolicy::Fixed`. It has no effect on `RetryPolicy::None`.
    pub fn with_retry_backoff_time(mut self, millis: u64) -> Self {
        // Pick the duration field that the active policy exposes, if any.
        let slot = match &mut self.retry_policy {
            RetryPolicy::Exponential { initial, .. } => Some(initial),
            RetryPolicy::Fixed { interval, .. } => Some(interval),
            RetryPolicy::None => None,
        };
        if let Some(duration) = slot {
            *duration = Duration::from_millis(millis);
        }
        self
    }

    /// Sets `max_attempts` on the current retry policy.
    ///
    /// It has no effect when the policy is `RetryPolicy::None`.
    pub fn with_retry_max_attempts(mut self, attempts: u32) -> Self {
        let slot = match &mut self.retry_policy {
            RetryPolicy::Exponential { max_attempts, .. }
            | RetryPolicy::Fixed { max_attempts, .. } => Some(max_attempts),
            RetryPolicy::None => None,
        };
        if let Some(limit) = slot {
            *limit = attempts;
        }
        self
    }

    /// Sets the producer timestamp setting; `None` is stored as-is.
    #[cfg(feature = "producer_timestamp")]
    pub fn with_producer_timestamp(
        self,
        timestamp: Option<crate::protocol::produce::ProducerTimestamp>,
    ) -> Self {
        Self {
            producer_timestamp: timestamp,
            ..self
        }
    }

    /// Sets the connection read/write timeout, in seconds.
    ///
    /// Zero makes [`build`](Self::build) create the connection pool without
    /// a read/write timeout.
    pub fn with_conn_rw_timeout(self, secs: u64) -> Self {
        Self {
            conn_rw_timeout_secs: secs,
            ..self
        }
    }

    /// Sets the connection idle timeout, in milliseconds.
    pub fn with_conn_idle_timeout(self, millis: u64) -> Self {
        Self {
            conn_idle_timeout_millis: millis,
            ..self
        }
    }

    /// Consumes the builder and assembles a [`KafkaClient`].
    ///
    /// The collected settings become a `ClientConfig`. A connection pool is
    /// created from the connection timeouts (and the security settings, when
    /// the `security` feature is enabled and some were supplied). The client
    /// starts with a fresh `ClientState` and `ApiVersionCache`.
    ///
    /// # Panics
    ///
    /// Panics with `"invalid fetch-max-wait-time"` if
    /// `crate::protocol::to_millis_i32` rejects the configured fetch max wait
    /// time.
    #[must_use]
    pub fn build(self) -> KafkaClient {
        // Validate the only fallible setting first.
        let wait = Duration::from_millis(self.fetch_max_wait_time_millis);
        let max_wait_time =
            crate::protocol::to_millis_i32(wait).expect("invalid fetch-max-wait-time");

        let fetch = super::config::FetchConfig {
            max_wait_time,
            min_bytes: self.fetch_min_bytes,
            max_bytes_per_partition: self.fetch_max_bytes_per_partition,
            crc_validation: self.fetch_crc_validation,
        };
        let retry = super::config::RetryConfig {
            policy: self.retry_policy,
        };
        let connection = super::config::ConnectionConfig {
            rw_timeout: Duration::from_secs(self.conn_rw_timeout_secs),
            idle_timeout: Duration::from_millis(self.conn_idle_timeout_millis),
        };

        let config = ClientConfig {
            client_id: self.client_id,
            hosts: self.hosts,
            compression: self.compression,
            fetch,
            retry,
            connection,
            offset_storage: self.offset_storage,
            producer_timestamp: self.producer_timestamp,
        };

        // A configured value of zero seconds means "no read/write timeout".
        let rw_timeout = match self.conn_rw_timeout_secs {
            0 => None,
            _ => Some(config.connection.rw_timeout),
        };
        let idle_timeout = config.connection.idle_timeout;

        #[cfg(not(feature = "security"))]
        let conn_pool = Connections::new(rw_timeout, idle_timeout);
        #[cfg(feature = "security")]
        let conn_pool = if let Some(security) = self.security {
            Connections::new_with_security(rw_timeout, idle_timeout, Some(security))
        } else {
            Connections::new(rw_timeout, idle_timeout)
        };

        KafkaClient {
            config,
            conn_pool,
            state: ClientState::new(),
            api_versions: ApiVersionCache::new(),
        }
    }
}
impl Default for KafkaClientBuilder {
fn default() -> Self {
Self::new()
}
}