use std::time::Duration;
use crate::client::{self, KafkaClient};
use crate::error::Result;
use crate::protocol;
#[cfg(feature = "producer_timestamp")]
use crate::protocol::produce::ProducerTimestamp;
#[cfg(feature = "security")]
use crate::client::SecurityConfig;
#[cfg(not(feature = "security"))]
type SecurityConfig = ();
use super::config::{Config, DEFAULT_ACK_TIMEOUT_MILLIS, DEFAULT_REQUIRED_ACKS};
use super::partitioner::DefaultPartitioner;
use super::{Compression, Partitioner, Producer, RequiredAcks, State};
pub struct Builder<P = DefaultPartitioner> {
client: Option<KafkaClient>,
hosts: Vec<String>,
compression: Compression,
ack_timeout: Duration,
conn_idle_timeout: Duration,
required_acks: RequiredAcks,
partitioner: P,
security_config: Option<SecurityConfig>,
client_id: Option<String>,
enable_idempotence: bool,
transactional_id: Option<String>,
#[cfg(feature = "producer_timestamp")]
producer_timestamp: Option<ProducerTimestamp>,
}
impl Builder {
pub(crate) fn new(
client: Option<KafkaClient>,
hosts: Vec<String>,
) -> Builder<DefaultPartitioner> {
let mut b = Builder {
client,
hosts,
compression: client::DEFAULT_COMPRESSION,
ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
conn_idle_timeout: Duration::from_millis(
client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
),
required_acks: DEFAULT_REQUIRED_ACKS,
partitioner: DefaultPartitioner::default(),
security_config: None,
client_id: None,
enable_idempotence: false,
transactional_id: None,
#[cfg(feature = "producer_timestamp")]
producer_timestamp: None,
};
if let Some(ref c) = b.client {
b.compression = c.compression();
b.conn_idle_timeout = c.connection_idle_timeout();
}
b
}
#[cfg(feature = "security")]
#[must_use]
pub fn with_security(mut self, security: SecurityConfig) -> Self {
self.security_config = Some(security);
self
}
#[must_use]
pub fn with_compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
#[must_use]
pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
self.ack_timeout = timeout;
self
}
#[must_use]
pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
self.conn_idle_timeout = timeout;
self
}
#[must_use]
pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
self.required_acks = acks;
self
}
#[must_use]
pub fn with_client_id(mut self, client_id: String) -> Self {
self.client_id = Some(client_id);
self
}
#[cfg(feature = "producer_timestamp")]
#[must_use]
pub fn with_timestamp(mut self, timestamp: ProducerTimestamp) -> Self {
self.producer_timestamp = Some(timestamp);
self
}
#[must_use]
pub fn with_idempotence(mut self, enabled: bool) -> Self {
self.enable_idempotence = enabled;
if enabled {
self.required_acks = RequiredAcks::All;
}
self
}
#[must_use]
pub fn with_transactional_id(mut self, id: impl Into<String>) -> Self {
self.transactional_id = Some(id.into());
self.enable_idempotence = true;
self.required_acks = RequiredAcks::All;
self
}
}
impl<P> Builder<P> {
pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
Builder {
client: self.client,
hosts: self.hosts,
compression: self.compression,
ack_timeout: self.ack_timeout,
conn_idle_timeout: self.conn_idle_timeout,
required_acks: self.required_acks,
partitioner,
security_config: None,
client_id: None,
enable_idempotence: self.enable_idempotence,
transactional_id: self.transactional_id,
#[cfg(feature = "producer_timestamp")]
producer_timestamp: None,
}
}
#[cfg(not(feature = "security"))]
fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
KafkaClient::new(hosts)
}
#[cfg(feature = "security")]
fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
if let Some(security) = security {
KafkaClient::new_secure(hosts, security)
} else {
KafkaClient::new(hosts)
}
}
pub fn create(self) -> Result<Producer<P>> {
let (mut client, need_metadata) = match self.client {
Some(client) => (client, false),
None => (
Self::new_kafka_client(self.hosts, self.security_config),
true,
),
};
client.set_compression(self.compression);
client.set_connection_idle_timeout(self.conn_idle_timeout);
#[cfg(feature = "producer_timestamp")]
client.set_producer_timestamp(self.producer_timestamp);
if let Some(client_id) = self.client_id {
client.set_client_id(client_id);
}
let producer_config = Config {
ack_timeout: protocol::to_millis_i32(self.ack_timeout)?,
required_acks: self.required_acks as i16,
enable_idempotence: self.enable_idempotence,
transactional_id: self.transactional_id,
};
if need_metadata {
client.load_metadata_all()?;
}
let state = State::new(&mut client, self.partitioner);
Ok(Producer {
client,
state,
config: producer_config,
})
}
}