use std::time::Duration;
use crate::{SdkError, wit};
use super::{KafkaAuthentication, KafkaTlsConfig};
/// Producer handle used to publish messages.
///
/// Thin wrapper around the `wit::KafkaProducer` binding; all work is
/// delegated to that inner value.
pub struct KafkaProducer {
    /// The wrapped binding. Accessible from the parent module only.
    pub(super) inner: wit::KafkaProducer,
}
impl KafkaProducer {
pub fn produce(&self, key: Option<&str>, value: &[u8]) -> Result<(), SdkError> {
self.inner.produce(key, value)?;
Ok(())
}
}
/// Settings used when creating a Kafka producer.
///
/// Every field is private: start from [`Default::default`] and adjust
/// individual settings through the setter methods. The value is eventually
/// converted into `wit::KafkaProducerConfig`.
pub struct KafkaProducerConfig {
    /// Compression setting, already stored in its `wit` representation.
    compression: wit::KafkaProducerCompression,
    /// Optional list of partition ids, forwarded as part of the client config.
    partitions: Option<Vec<i32>>,
    /// Optional batching settings.
    batching: Option<KafkaBatchConfig>,
    /// Optional TLS settings, already stored in their `wit` representation.
    tls: Option<wit::KafkaTlsConfig>,
    /// Optional authentication settings, already stored in their `wit`
    /// representation.
    authentication: Option<wit::KafkaAuthentication>,
}
impl KafkaProducerConfig {
pub fn compression(&mut self, compression: KafkaProducerCompression) {
self.compression = compression.into();
}
pub fn partitions(&mut self, partitions: Vec<i32>) {
self.partitions = Some(partitions);
}
pub fn batching(&mut self, batching: KafkaBatchConfig) {
self.batching = Some(batching);
}
pub fn tls(&mut self, tls: KafkaTlsConfig) {
self.tls = Some(tls.into());
}
pub fn authentication(&mut self, authentication: KafkaAuthentication) {
self.authentication = Some(authentication.into());
}
}
/// Batching settings for a Kafka producer.
///
/// When converted for the producer, `max_size_bytes` is forwarded as the
/// batch size in bytes and `linger` is forwarded as a whole number of
/// milliseconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaBatchConfig {
    /// Batch size limit, in bytes.
    pub max_size_bytes: u64,
    /// Linger duration; only millisecond precision is forwarded.
    pub linger: Duration,
}
/// Compression choices available for a Kafka producer.
///
/// Each variant maps one-to-one onto the variant of the same name in
/// `wit::KafkaProducerCompression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaProducerCompression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
/// Maps each SDK compression variant onto the `wit` variant of the same name.
impl From<KafkaProducerCompression> for wit::KafkaProducerCompression {
    fn from(value: KafkaProducerCompression) -> Self {
        // The match is intentionally exhaustive (no wildcard arm) so that a
        // newly added SDK variant fails to compile until it is mapped here.
        match value {
            KafkaProducerCompression::None => Self::None,
            KafkaProducerCompression::Gzip => Self::Gzip,
            KafkaProducerCompression::Snappy => Self::Snappy,
            KafkaProducerCompression::Lz4 => Self::Lz4,
            KafkaProducerCompression::Zstd => Self::Zstd,
        }
    }
}
impl From<KafkaProducerConfig> for wit::KafkaProducerConfig {
fn from(value: KafkaProducerConfig) -> Self {
Self {
compression: value.compression,
client_config: wit::KafkaClientConfig {
partitions: value.partitions,
tls: value.tls,
authentication: value.authentication,
},
batching: value.batching.map(Into::into),
}
}
}
/// Converts the SDK-side batching settings into their `wit` representation.
///
/// `linger` is forwarded as whole milliseconds and `max_size_bytes` as the
/// batch size in bytes.
impl From<KafkaBatchConfig> for wit::KafkaBatchConfig {
    fn from(value: KafkaBatchConfig) -> Self {
        // `Duration::as_millis` returns a `u128`. A bare `as u64` would wrap
        // very large durations around to a small value, so clamp to
        // `u64::MAX` first; after the clamp the narrowing cast is lossless.
        let linger_ms = value.linger.as_millis().min(u128::from(u64::MAX)) as u64;

        Self {
            linger_ms,
            batch_size_bytes: value.max_size_bytes,
        }
    }
}
/// Default producer settings: no compression and every optional setting
/// left unset.
impl Default for KafkaProducerConfig {
    fn default() -> Self {
        let compression = wit::KafkaProducerCompression::None;

        Self {
            compression,
            partitions: None,
            batching: None,
            tls: None,
            authentication: None,
        }
    }
}