mod consumer;
mod producer;
pub use consumer::{KafkaConsumer, KafkaConsumerConfig, KafkaMessage};
pub use producer::{KafkaBatchConfig, KafkaProducer, KafkaProducerCompression, KafkaProducerConfig};
use crate::{SdkError, wit};
use std::path::{Path, PathBuf};
/// Connects a Kafka producer for `topic` and wraps it in a [`KafkaProducer`].
///
/// Every item of `servers` is turned into a `String` with `to_string()`, and
/// `config` is converted with `Into` into the configuration type expected by
/// `wit::KafkaProducer::connect`. `name` and `topic` are forwarded unchanged.
///
/// # Errors
///
/// Returns an [`SdkError`] when `wit::KafkaProducer::connect` fails.
pub fn producer(
    name: &str,
    servers: impl IntoIterator<Item = impl ToString>,
    topic: &str,
    config: KafkaProducerConfig,
) -> Result<KafkaProducer, SdkError> {
    let server_list = servers
        .into_iter()
        .map(|server| server.to_string())
        .collect::<Vec<String>>();
    let wit_config = config.into();

    let inner = wit::KafkaProducer::connect(name, &server_list, topic, &wit_config)?;

    Ok(KafkaProducer { inner })
}
/// Connects a Kafka consumer for `topic` and wraps it in a [`KafkaConsumer`].
///
/// Every item of `servers` is turned into a `String` with `to_string()`, and
/// `config` is converted with `Into` into the configuration type expected by
/// `wit::KafkaConsumer::connect`. `topic` is forwarded unchanged.
///
/// # Errors
///
/// Returns an [`SdkError`] when `wit::KafkaConsumer::connect` fails.
pub fn consumer(
    servers: impl IntoIterator<Item = impl ToString>,
    topic: &str,
    config: KafkaConsumerConfig,
) -> Result<KafkaConsumer, SdkError> {
    let server_list = servers
        .into_iter()
        .map(|server| server.to_string())
        .collect::<Vec<String>>();
    let wit_config = config.into();

    let inner = wit::KafkaConsumer::connect(&server_list, topic, &wit_config)?;

    Ok(KafkaConsumer { inner })
}
/// TLS configuration for a Kafka connection.
///
/// The value is converted into `wit::KafkaTlsConfig` before it is used; this
/// type only records which certificate authority source was selected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KafkaTlsConfig {
    /// Select the system certificate authorities.
    // NOTE(review): how the system CA set is resolved is decided by whatever
    // consumes `wit::KafkaTlsConfig::SystemCa`; it is not visible here.
    SystemCa,
    /// Select a custom certificate authority stored at the given path.
    CustomCa(PathBuf),
}

impl KafkaTlsConfig {
    /// Returns the [`KafkaTlsConfig::SystemCa`] configuration.
    pub fn system_ca() -> Self {
        Self::SystemCa
    }

    /// Returns a [`KafkaTlsConfig::CustomCa`] configuration holding an owned
    /// copy of `ca_cert_path`.
    ///
    /// The path is stored as given; it is not checked for existence here.
    pub fn custom_ca(ca_cert_path: impl AsRef<Path>) -> Self {
        Self::CustomCa(ca_cert_path.as_ref().to_path_buf())
    }
}
/// Converts the SDK-facing TLS configuration into its `wit` counterpart.
impl From<KafkaTlsConfig> for wit::KafkaTlsConfig {
    fn from(value: KafkaTlsConfig) -> Self {
        match value {
            KafkaTlsConfig::SystemCa => Self::SystemCa,
            KafkaTlsConfig::CustomCa(ca_path) => {
                // The wit side takes a `String`, so the path is converted
                // lossily: any non-UTF-8 sequence becomes U+FFFD.
                let ca_path = ca_path.to_string_lossy().into_owned();
                Self::CustomCa(ca_path)
            }
        }
    }
}
/// Private representation of the supported authentication methods.
///
/// Each variant stores the already-built `wit` payload.
enum KafkaAuthenticationInner {
    SaslPlain(wit::KafkaSaslPlainAuth),
    SaslScram(wit::KafkaSaslScramAuth),
    Mtls(wit::KafkaMtlsAuth),
}

/// Authentication settings for a Kafka connection.
///
/// Values can only be created through the associated constructor functions;
/// the chosen method is kept in a private field.
pub struct KafkaAuthentication {
    inner: KafkaAuthenticationInner,
}

impl KafkaAuthentication {
    /// Builds SASL/PLAIN authentication from a username and a password.
    ///
    /// Both arguments are converted with `to_string()`.
    pub fn sasl_plain(username: impl ToString, password: impl ToString) -> Self {
        let credentials = wit::KafkaSaslPlainAuth {
            username: username.to_string(),
            password: password.to_string(),
        };

        Self {
            inner: KafkaAuthenticationInner::SaslPlain(credentials),
        }
    }

    /// Builds SASL/SCRAM authentication that uses the `Sha256` mechanism.
    ///
    /// Both arguments are converted with `to_string()`.
    pub fn sasl_scram_sha256(username: impl ToString, password: impl ToString) -> Self {
        Self::sasl_scram(
            username.to_string(),
            password.to_string(),
            wit::KafkaScramMechanism::Sha256,
        )
    }

    /// Builds SASL/SCRAM authentication that uses the `Sha512` mechanism.
    ///
    /// Both arguments are converted with `to_string()`.
    pub fn sasl_scram_sha512(username: impl ToString, password: impl ToString) -> Self {
        Self::sasl_scram(
            username.to_string(),
            password.to_string(),
            wit::KafkaScramMechanism::Sha512,
        )
    }

    /// Builds mutual TLS authentication from a client certificate path and a
    /// client key path.
    ///
    /// The paths are stored as strings via a lossy conversion: any non-UTF-8
    /// sequence becomes U+FFFD. The files are not opened here.
    pub fn mtls(cert_path: impl AsRef<Path>, key_path: impl AsRef<Path>) -> Self {
        let client_cert_path = cert_path.as_ref().to_string_lossy().into_owned();
        let client_key_path = key_path.as_ref().to_string_lossy().into_owned();

        Self {
            inner: KafkaAuthenticationInner::Mtls(wit::KafkaMtlsAuth {
                client_cert_path,
                client_key_path,
            }),
        }
    }

    /// Shared constructor behind the public SCRAM variants.
    fn sasl_scram(username: String, password: String, mechanism: wit::KafkaScramMechanism) -> Self {
        Self {
            inner: KafkaAuthenticationInner::SaslScram(wit::KafkaSaslScramAuth {
                username,
                password,
                mechanism,
            }),
        }
    }
}
/// Unwraps the SDK-facing authentication value into its `wit` counterpart.
impl From<KafkaAuthentication> for wit::KafkaAuthentication {
    fn from(value: KafkaAuthentication) -> Self {
        // Each private variant already holds the exact payload type of the
        // matching `wit` variant, so the payload is moved across unchanged.
        match value.inner {
            KafkaAuthenticationInner::SaslPlain(plain) => Self::SaslPlain(plain),
            KafkaAuthenticationInner::SaslScram(scram) => Self::SaslScram(scram),
            KafkaAuthenticationInner::Mtls(mtls) => Self::Mtls(mtls),
        }
    }
}