use std::time::Duration;
use crate::auth::AuthConfig;
use crate::error::{KrafkaError, Result};
use crate::metadata::MetadataRecoveryStrategy;
use crate::protocol::Compression;
/// Acknowledgement level stored in [`ProducerConfig`] and selected through the builder.
///
/// Each variant has a fixed `i16` encoding, produced by [`Acks::to_i16`] and
/// decoded by [`Acks::from_i16`]. The enum is `#[non_exhaustive]`, so code in
/// other crates that matches on it needs a wildcard arm.
// NOTE(review): the variant names suggest the usual Kafka `acks` semantics
// (no ack / leader ack / all in-sync replicas) — confirm against the protocol docs.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Acks {
/// Encoded as `0`.
None,
/// Encoded as `1`.
Leader,
/// Encoded as `-1`. This is the variant returned by `Acks::default()`.
#[default]
All,
}
impl Acks {
    /// Returns the `i16` encoding of this acknowledgement level.
    ///
    /// `All` becomes `-1`, `None` becomes `0` and `Leader` becomes `1`.
    #[inline]
    pub fn to_i16(self) -> i16 {
        match self {
            Self::All => -1,
            Self::None => 0,
            Self::Leader => 1,
        }
    }

    /// Decodes an acknowledgement level from its `i16` encoding.
    ///
    /// `-1`, `0` and `1` map to `All`, `None` and `Leader` respectively.
    /// Any other value is not an error: a warning is logged through
    /// `tracing` and `All` is returned instead.
    #[inline]
    pub fn from_i16(value: i16) -> Self {
        match value {
            -1 => Self::All,
            0 => Self::None,
            1 => Self::Leader,
            unknown => {
                // Deliberately lenient: fall back to `All` rather than fail.
                tracing::warn!(acks = unknown, "Unknown acks value, defaulting to All");
                Self::All
            }
        }
    }
}
/// Producer configuration.
///
/// All fields are crate-private. Outside the crate, values are read through
/// the accessor methods of the same name and instances are created with
/// [`ProducerConfig::builder`] or `ProducerConfig::default()`.
///
/// The defaults quoted on each field are the ones set by the `Default` impl;
/// the constraints quoted are the ones enforced by `ProducerConfigBuilder::build`.
/// A value obtained directly from `Default` is not passed through that validation.
#[derive(Debug, Clone)]
pub struct ProducerConfig {
/// Bootstrap server string. Default: empty.
pub(crate) bootstrap_servers: String,
/// Client identifier. Default: `"krafka"`.
pub(crate) client_id: String,
/// Acknowledgement level. Default: `Acks::All`. Must be `Acks::All` when `idempotent` is set.
pub(crate) acks: Acks,
/// Compression setting. Default: `Compression::None`.
pub(crate) compression: Compression,
/// Batch size. Default: `16384`. Must be at least 1, must not exceed
/// `max_request_size`, and must not exceed `buffer_memory` when that is non-zero
/// (so presumably expressed in bytes — confirm).
pub(crate) batch_size: usize,
/// Linger duration. Default: zero.
pub(crate) linger: Duration,
/// Request timeout. Default: 30 seconds.
pub(crate) request_timeout: Duration,
/// Delivery timeout. Default: 120 seconds. Must be greater than zero.
pub(crate) delivery_timeout: Duration,
/// Retry count. Default: `u32::MAX`. Must be greater than 0 when `idempotent` is set.
pub(crate) retries: u32,
/// Retry backoff. Default: 100 milliseconds.
pub(crate) retry_backoff: Duration,
/// Maximum in-flight count. Default: `5`. Must be at least 1, and at most 5
/// when `idempotent` is set.
pub(crate) max_in_flight: usize,
/// Maximum request size. Default: `crate::protocol::MAX_MESSAGE_SIZE`. Must be at least 1.
pub(crate) max_request_size: usize,
/// Whether the producer is idempotent. Default: `true`. When set, the builder
/// additionally constrains `retries`, `acks` and `max_in_flight`.
pub(crate) idempotent: bool,
/// Maximum block duration. Default: 60 seconds.
pub(crate) max_block: Duration,
/// Buffer memory. Default: `32 * 1024 * 1024`. A value of 0 disables the
/// `batch_size <= buffer_memory` check in the builder.
pub(crate) buffer_memory: usize,
/// Maximum metadata age. Default: 300 seconds.
pub(crate) metadata_max_age: Duration,
/// Optional TTL for the metadata topic cache. Default: `Some(300 seconds)`;
/// the builder can set it to `None` via `disable_metadata_topic_cache_ttl`.
pub(crate) metadata_topic_cache_ttl: Option<Duration>,
/// Metadata recovery strategy. Default: `MetadataRecoveryStrategy::Rebootstrap`.
pub(crate) metadata_recovery_strategy: MetadataRecoveryStrategy,
/// Rebootstrap trigger duration used with the recovery strategy. Default: 300 seconds.
pub(crate) metadata_recovery_rebootstrap_trigger: Duration,
/// Optional authentication configuration. Default: `None`.
pub(crate) auth: Option<AuthConfig>,
/// Optional proxy configuration; only present with the `socks5` feature. Default: `None`.
#[cfg(feature = "socks5")]
pub(crate) proxy: Option<crate::network::ProxyConfig>,
}
impl Default for ProducerConfig {
    /// Builds the baseline configuration that the builder starts from.
    ///
    /// Notable choices: idempotence is on, `acks` is `All`, retries are
    /// `u32::MAX`, compression is off and no bootstrap servers are set.
    fn default() -> Self {
        Self {
            // Identity / connection.
            bootstrap_servers: String::new(),
            client_id: String::from("krafka"),
            auth: None,
            #[cfg(feature = "socks5")]
            proxy: None,

            // Delivery semantics.
            acks: Acks::All,
            idempotent: true,
            retries: u32::MAX,
            retry_backoff: Duration::from_millis(100),
            max_in_flight: 5,

            // Batching and sizing.
            compression: Compression::None,
            batch_size: 16 * 1024,
            linger: Duration::ZERO,
            max_request_size: crate::protocol::MAX_MESSAGE_SIZE,
            buffer_memory: 32 * 1024 * 1024,

            // Timeouts.
            request_timeout: Duration::from_secs(30),
            delivery_timeout: Duration::from_secs(120),
            max_block: Duration::from_secs(60),

            // Metadata handling.
            metadata_max_age: Duration::from_secs(300),
            metadata_topic_cache_ttl: Some(Duration::from_secs(300)),
            metadata_recovery_strategy: MetadataRecoveryStrategy::Rebootstrap,
            metadata_recovery_rebootstrap_trigger: Duration::from_secs(300),
        }
    }
}
impl ProducerConfig {
pub fn builder() -> ProducerConfigBuilder {
ProducerConfigBuilder::default()
}
#[inline]
pub fn bootstrap_servers(&self) -> &str {
&self.bootstrap_servers
}
#[inline]
pub fn client_id(&self) -> &str {
&self.client_id
}
#[inline]
pub fn acks(&self) -> Acks {
self.acks
}
#[inline]
pub fn compression(&self) -> Compression {
self.compression
}
#[inline]
pub fn batch_size(&self) -> usize {
self.batch_size
}
#[inline]
pub fn linger(&self) -> Duration {
self.linger
}
#[inline]
pub fn request_timeout(&self) -> Duration {
self.request_timeout
}
#[inline]
pub fn delivery_timeout(&self) -> Duration {
self.delivery_timeout
}
#[inline]
pub fn retries(&self) -> u32 {
self.retries
}
#[inline]
pub fn retry_backoff(&self) -> Duration {
self.retry_backoff
}
#[inline]
pub fn max_in_flight(&self) -> usize {
self.max_in_flight
}
#[inline]
pub fn max_request_size(&self) -> usize {
self.max_request_size
}
#[inline]
pub fn idempotent(&self) -> bool {
self.idempotent
}
#[inline]
pub fn max_block(&self) -> Duration {
self.max_block
}
#[inline]
pub fn buffer_memory(&self) -> usize {
self.buffer_memory
}
#[inline]
pub fn metadata_max_age(&self) -> Duration {
self.metadata_max_age
}
#[inline]
pub fn metadata_topic_cache_ttl(&self) -> Option<Duration> {
self.metadata_topic_cache_ttl
}
#[inline]
pub fn metadata_recovery_strategy(&self) -> MetadataRecoveryStrategy {
self.metadata_recovery_strategy
}
#[inline]
pub fn metadata_recovery_rebootstrap_trigger(&self) -> Duration {
self.metadata_recovery_rebootstrap_trigger
}
#[inline]
pub fn auth(&self) -> Option<&AuthConfig> {
self.auth.as_ref()
}
#[cfg(feature = "socks5")]
#[inline]
pub fn proxy(&self) -> Option<&crate::network::ProxyConfig> {
self.proxy.as_ref()
}
}
/// Builder for [`ProducerConfig`].
///
/// The derived `Default` starts the builder from `ProducerConfig::default()`.
/// Each setter consumes the builder and returns it, so calls can be chained;
/// nothing is validated until `build()` is called, which is why the type is
/// marked `#[must_use]`.
#[must_use = "builders do nothing until .build() is called"]
#[derive(Debug, Default)]
pub struct ProducerConfigBuilder {
// Configuration being assembled; returned by `build()` once validation passes.
config: ProducerConfig,
}
impl ProducerConfigBuilder {
    /// Applies `edit` to the configuration under construction and returns the builder.
    ///
    /// Every public setter funnels through this helper.
    fn update(mut self, edit: impl FnOnce(&mut ProducerConfig)) -> Self {
        edit(&mut self.config);
        self
    }

    /// Sets the bootstrap server string.
    pub fn bootstrap_servers(self, servers: impl Into<String>) -> Self {
        self.update(|cfg| cfg.bootstrap_servers = servers.into())
    }

    /// Sets the client identifier.
    pub fn client_id(self, id: impl Into<String>) -> Self {
        self.update(|cfg| cfg.client_id = id.into())
    }

    /// Sets the acknowledgement level.
    ///
    /// `build` rejects anything other than `Acks::All` while idempotence is enabled.
    pub fn acks(self, acks: Acks) -> Self {
        self.update(|cfg| cfg.acks = acks)
    }

    /// Sets the compression setting.
    pub fn compression(self, compression: Compression) -> Self {
        self.update(|cfg| cfg.compression = compression)
    }

    /// Sets the batch size.
    ///
    /// `build` rejects `0`, values above `max_request_size`, and values above
    /// a non-zero `buffer_memory`.
    pub fn batch_size(self, size: usize) -> Self {
        self.update(|cfg| cfg.batch_size = size)
    }

    /// Sets the linger duration.
    pub fn linger(self, duration: Duration) -> Self {
        self.update(|cfg| cfg.linger = duration)
    }

    /// Sets the authentication configuration.
    pub fn auth(self, auth: AuthConfig) -> Self {
        self.update(|cfg| cfg.auth = Some(auth))
    }

    /// Sets the proxy configuration (only with the `socks5` feature).
    #[cfg(feature = "socks5")]
    pub fn proxy(self, proxy: crate::network::ProxyConfig) -> Self {
        self.update(|cfg| cfg.proxy = Some(proxy))
    }

    /// Sets the request timeout.
    pub fn request_timeout(self, timeout: Duration) -> Self {
        self.update(|cfg| cfg.request_timeout = timeout)
    }

    /// Sets the delivery timeout. `build` rejects a zero duration.
    pub fn delivery_timeout(self, timeout: Duration) -> Self {
        self.update(|cfg| cfg.delivery_timeout = timeout)
    }

    /// Sets the retry count. `build` rejects `0` while idempotence is enabled.
    pub fn retries(self, retries: u32) -> Self {
        self.update(|cfg| cfg.retries = retries)
    }

    /// Sets the retry backoff.
    pub fn retry_backoff(self, backoff: Duration) -> Self {
        self.update(|cfg| cfg.retry_backoff = backoff)
    }

    /// Sets the maximum in-flight count.
    ///
    /// `build` rejects `0`, and rejects values above 5 while idempotence is enabled.
    pub fn max_in_flight(self, max: usize) -> Self {
        self.update(|cfg| cfg.max_in_flight = max)
    }

    /// Sets the maximum request size. `build` rejects `0`.
    pub fn max_request_size(self, bytes: usize) -> Self {
        self.update(|cfg| cfg.max_request_size = bytes)
    }

    /// Enables or disables idempotence.
    ///
    /// While enabled, `build` also constrains `retries`, `acks` and `max_in_flight`.
    pub fn idempotent(self, enable: bool) -> Self {
        self.update(|cfg| cfg.idempotent = enable)
    }

    /// Sets the maximum block duration.
    pub fn max_block(self, duration: Duration) -> Self {
        self.update(|cfg| cfg.max_block = duration)
    }

    /// Sets the buffer memory. A value of `0` skips the batch-size bound in `build`.
    pub fn buffer_memory(self, bytes: usize) -> Self {
        self.update(|cfg| cfg.buffer_memory = bytes)
    }

    /// Sets the maximum metadata age.
    pub fn metadata_max_age(self, duration: Duration) -> Self {
        self.update(|cfg| cfg.metadata_max_age = duration)
    }

    /// Sets the metadata topic cache TTL to `Some(ttl)`.
    pub fn metadata_topic_cache_ttl(self, ttl: Duration) -> Self {
        self.update(|cfg| cfg.metadata_topic_cache_ttl = Some(ttl))
    }

    /// Clears the metadata topic cache TTL (stores `None`).
    pub fn disable_metadata_topic_cache_ttl(self) -> Self {
        self.update(|cfg| cfg.metadata_topic_cache_ttl = None)
    }

    /// Sets the metadata recovery strategy.
    pub fn metadata_recovery_strategy(self, strategy: MetadataRecoveryStrategy) -> Self {
        self.update(|cfg| cfg.metadata_recovery_strategy = strategy)
    }

    /// Sets the rebootstrap trigger duration.
    pub fn metadata_recovery_rebootstrap_trigger(self, duration: Duration) -> Self {
        self.update(|cfg| cfg.metadata_recovery_rebootstrap_trigger = duration)
    }

    /// Validates the accumulated settings and returns the finished configuration.
    ///
    /// # Errors
    ///
    /// Returns a `KrafkaError::config` error for the first rule violated, checked
    /// in this order:
    ///
    /// 1. `batch_size`, `max_in_flight` and `max_request_size` must each be at least 1.
    /// 2. `delivery_timeout` must be non-zero.
    /// 3. With idempotence enabled: `retries > 0`, `acks == Acks::All`,
    ///    `max_in_flight <= 5`.
    /// 4. `batch_size` must not exceed a non-zero `buffer_memory`.
    /// 5. `batch_size` must not exceed `max_request_size`.
    pub fn build(self) -> Result<ProducerConfig> {
        let Self { config } = self;

        // Basic non-zero requirements.
        if config.batch_size < 1 {
            return Err(KrafkaError::config(format!(
                "batch_size must be >= 1 (got {})",
                config.batch_size
            )));
        }
        if config.max_in_flight < 1 {
            return Err(KrafkaError::config(format!(
                "max_in_flight must be >= 1 (got {})",
                config.max_in_flight
            )));
        }
        if config.max_request_size < 1 {
            return Err(KrafkaError::config("max_request_size must be >= 1"));
        }
        if config.delivery_timeout == Duration::ZERO {
            return Err(KrafkaError::config(
                "delivery_timeout must be greater than zero",
            ));
        }

        // Extra requirements that only apply to an idempotent producer.
        if config.idempotent {
            if config.retries == 0 {
                return Err(KrafkaError::config(
                    "idempotent producer requires retries > 0",
                ));
            }
            if !matches!(config.acks, Acks::All) {
                return Err(KrafkaError::config(format!(
                    "idempotent producer requires acks = All (got {:?})",
                    config.acks
                )));
            }
            if config.max_in_flight > 5 {
                return Err(KrafkaError::config(format!(
                    "idempotent producer requires max_in_flight <= 5 (got {})",
                    config.max_in_flight
                )));
            }
        }

        // Cross-field size limits; a zero buffer_memory opts out of the first one.
        if config.buffer_memory != 0 && config.buffer_memory < config.batch_size {
            return Err(KrafkaError::config(format!(
                "batch_size must not exceed buffer_memory (got batch_size={}, buffer_memory={})",
                config.batch_size, config.buffer_memory
            )));
        }
        if config.max_request_size < config.batch_size {
            return Err(KrafkaError::config(format!(
                "batch_size must not exceed max_request_size (got batch_size={}, max_request_size={})",
                config.batch_size, config.max_request_size
            )));
        }

        Ok(config)
    }
}
#[cfg(test)]
// Tests are allowed to unwrap/expect/panic: a failure here is a test failure.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    // ---- Acks encoding -------------------------------------------------

    #[test]
    fn test_acks_to_i16() {
        for (level, wire) in [(Acks::None, 0), (Acks::Leader, 1), (Acks::All, -1)] {
            assert_eq!(level.to_i16(), wire);
        }
    }

    #[test]
    fn test_acks_from_i16() {
        for (wire, level) in [(0, Acks::None), (1, Acks::Leader), (-1, Acks::All)] {
            assert_eq!(Acks::from_i16(wire), level);
        }
    }

    #[test]
    fn test_acks_from_i16_known_values() {
        for (wire, level) in [(0, Acks::None), (1, Acks::Leader), (-1, Acks::All)] {
            assert_eq!(Acks::from_i16(wire), level);
        }
    }

    #[test]
    fn test_acks_from_i16_unknown_defaults_to_all() {
        for wire in [2, 99, -2] {
            assert_eq!(Acks::from_i16(wire), Acks::All);
        }
    }

    #[test]
    fn test_acks_roundtrip() {
        for level in [Acks::None, Acks::Leader, Acks::All] {
            assert_eq!(Acks::from_i16(level.to_i16()), level);
        }
    }

    // ---- Defaults ------------------------------------------------------

    #[test]
    fn test_config_default() {
        let cfg = ProducerConfig::default();
        assert_eq!(cfg.acks(), Acks::All);
        assert!(cfg.idempotent());
        assert_eq!(cfg.compression(), Compression::None);
        assert_eq!(cfg.batch_size(), 16384);
        assert_eq!(cfg.max_request_size(), crate::protocol::MAX_MESSAGE_SIZE);
        assert_eq!(cfg.delivery_timeout(), Duration::from_secs(120));
        assert_eq!(cfg.retries(), u32::MAX);
        assert_eq!(
            cfg.metadata_topic_cache_ttl(),
            Some(Duration::from_secs(300))
        );
    }

    #[test]
    fn test_config_default_recovery_strategy() {
        let cfg = ProducerConfig::default();
        assert_eq!(
            cfg.metadata_recovery_strategy(),
            MetadataRecoveryStrategy::Rebootstrap,
        );
        assert_eq!(
            cfg.metadata_recovery_rebootstrap_trigger(),
            Duration::from_secs(300),
        );
    }

    // ---- Builder setters ----------------------------------------------

    #[test]
    fn test_config_builder() {
        let cfg = ProducerConfig::builder()
            .bootstrap_servers("localhost:9092")
            .client_id("test")
            .acks(Acks::All)
            .compression(Compression::Lz4)
            .batch_size(32768)
            .max_request_size(65536)
            .build()
            .unwrap();

        assert_eq!(cfg.bootstrap_servers(), "localhost:9092");
        assert_eq!(cfg.client_id(), "test");
        assert_eq!(cfg.acks(), Acks::All);
        assert_eq!(cfg.compression(), Compression::Lz4);
        assert_eq!(cfg.batch_size(), 32768);
        assert_eq!(cfg.max_request_size(), 65536);
    }

    #[test]
    fn test_config_builder_request_timeout() {
        let builder = ProducerConfig::builder().request_timeout(Duration::from_secs(60));
        let cfg = builder.build().unwrap();
        assert_eq!(
            cfg.request_timeout(),
            Duration::from_secs(60),
            "request_timeout should be set by builder"
        );
    }

    #[test]
    fn test_config_builder_delivery_timeout() {
        let builder = ProducerConfig::builder().delivery_timeout(Duration::from_secs(45));
        let cfg = builder.build().unwrap();
        assert_eq!(cfg.delivery_timeout(), Duration::from_secs(45));
    }

    #[test]
    fn test_config_builder_max_in_flight() {
        // Idempotence must be off for a value above 5 to be accepted.
        let builder = ProducerConfig::builder().idempotent(false).max_in_flight(10);
        let cfg = builder.build().unwrap();
        assert_eq!(
            cfg.max_in_flight(),
            10,
            "max_in_flight should be set by builder"
        );
    }

    #[test]
    fn test_config_builder_metadata_max_age() {
        let builder = ProducerConfig::builder().metadata_max_age(Duration::from_secs(120));
        let cfg = builder.build().unwrap();
        assert_eq!(
            cfg.metadata_max_age(),
            Duration::from_secs(120),
            "metadata_max_age should be set by builder"
        );
    }

    #[test]
    fn test_config_builder_metadata_topic_cache_ttl() {
        let builder = ProducerConfig::builder().metadata_topic_cache_ttl(Duration::from_secs(600));
        let cfg = builder.build().unwrap();
        assert_eq!(
            cfg.metadata_topic_cache_ttl(),
            Some(Duration::from_secs(600))
        );
    }

    #[test]
    fn test_config_builder_disable_metadata_topic_cache_ttl() {
        let builder = ProducerConfig::builder().disable_metadata_topic_cache_ttl();
        let cfg = builder.build().unwrap();
        assert_eq!(cfg.metadata_topic_cache_ttl(), None);
    }

    #[cfg(feature = "socks5")]
    #[test]
    fn test_config_builder_proxy_round_trip() {
        let cfg = ProducerConfig::builder()
            .proxy(crate::network::ProxyConfig::new("proxy:1080"))
            .build()
            .unwrap();
        let proxy = cfg.proxy().expect("proxy should be set");
        assert_eq!(proxy.address(), "proxy:1080");
    }

    #[test]
    fn test_config_builder_recovery_strategy() {
        let cfg = ProducerConfig::builder()
            .metadata_recovery_strategy(MetadataRecoveryStrategy::Rebootstrap)
            .metadata_recovery_rebootstrap_trigger(Duration::from_secs(120))
            .build()
            .unwrap();
        assert_eq!(
            cfg.metadata_recovery_strategy(),
            MetadataRecoveryStrategy::Rebootstrap,
        );
        assert_eq!(
            cfg.metadata_recovery_rebootstrap_trigger(),
            Duration::from_secs(120),
        );
    }

    // ---- Builder validation -------------------------------------------

    #[test]
    fn test_config_builder_rejects_zero_batch_size() {
        assert!(ProducerConfig::builder().batch_size(0).build().is_err());
    }

    #[test]
    fn test_config_builder_rejects_zero_max_in_flight() {
        assert!(ProducerConfig::builder().max_in_flight(0).build().is_err());
    }

    #[test]
    fn test_config_builder_rejects_zero_max_request_size() {
        assert!(ProducerConfig::builder()
            .max_request_size(0)
            .build()
            .is_err());
    }

    #[test]
    fn test_config_builder_rejects_zero_delivery_timeout() {
        let outcome = ProducerConfig::builder()
            .delivery_timeout(Duration::ZERO)
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_config_builder_rejects_idempotent_without_retries() {
        // Idempotence is on by default, so zero retries alone must be rejected.
        assert!(ProducerConfig::builder().retries(0).build().is_err());
    }

    #[test]
    fn test_config_builder_rejects_idempotent_with_acks_leader() {
        let outcome = ProducerConfig::builder()
            .idempotent(true)
            .acks(Acks::Leader)
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_config_builder_rejects_idempotent_with_high_in_flight() {
        let outcome = ProducerConfig::builder()
            .idempotent(true)
            .max_in_flight(6)
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_config_builder_rejects_batch_exceeding_buffer() {
        let outcome = ProducerConfig::builder()
            .batch_size(1024)
            .buffer_memory(512)
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_config_builder_rejects_batch_exceeding_max_request_size() {
        let outcome = ProducerConfig::builder()
            .batch_size(1024)
            .max_request_size(512)
            .build();
        assert!(outcome.is_err());
    }
}