use std::{collections::HashMap, time::Duration};
use rdkafka::{
client::ClientContext,
config::ClientConfig,
error::{KafkaError, KafkaResult},
message::{BorrowedMessage, Message},
producer::{BaseRecord, NoCustomPartitioner, Producer, ProducerContext, ThreadedProducer},
};
use serde::{Deserialize, Serialize};
/// Settings for a Kafka producer.
///
/// [`KafkaProducerConfig::make_client_config`] translates these fields into
/// rdkafka client properties; the property each field feeds is named on the
/// field itself.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaProducerConfig {
    /// Connection-level settings; flattened into this struct's own keys when
    /// (de)serialized.
    #[serde(flatten)]
    pub common: KafkaCommonConfig,

    /// Transactional settings. When present, `make_sync_producer` also calls
    /// `init_transactions` on the freshly created producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transaction: Option<KafkaTransactionalConfig>,

    /// Producer queue limits.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub queue: Option<KafkaQueueConfig>,

    /// Forwarded as `message.send.max.retries` when set.
    // NOTE(review): this is the only field of the struct that is not `pub` —
    // confirm that is intentional.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    max_retries: Option<i32>,

    /// Topic name. `KafkaLogAppender` sends every record to this topic.
    pub topic: String,

    /// Forwarded as `enable.idempotence`. A missing value is filled in by
    /// `crate::util::default_true`.
    #[serde(default = "crate::util::default_true")]
    pub idempotence: bool,

    /// Forwarded as `enable.gapless.guarantee`.
    #[serde(default)]
    pub gapless: bool,

    /// Forwarded as `request.required.acks`. Defaults to `-1`.
    #[serde(default = "KafkaProducerConfig::default_acks")]
    pub acks: i16,

    /// Forwarded (in whole milliseconds) as `message.timeout.ms` when set.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    pub message_timeout: Option<Duration>,

    /// Forwarded (in whole milliseconds) as `request.timeout.ms` when set.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    pub request_timeout: Option<Duration>,

    /// Timeout handed to the producer's `flush` by `KafkaLogAppender`.
    /// Defaults to five seconds.
    #[serde(
        default = "KafkaProducerConfig::default_flush_timeout",
        with = "humantime_serde"
    )]
    pub flush_timeout: Duration,

    /// Forwarded (in whole milliseconds) as `queue.buffering.max.ms` when set.
    /// `KafkaQueueConfig::max_duration` writes the same property and is
    /// applied later by `make_client_config`.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    pub linger: Option<Duration>,
}
impl KafkaProducerConfig {
#[must_use]
#[inline]
fn default_acks() -> i16 {
-1
}
#[must_use]
#[inline]
fn default_flush_timeout() -> Duration {
Duration::from_secs(5)
}
pub fn make_client_config(&self) -> ClientConfig {
let mut client = ClientConfig::new();
self.common.configure(&mut client);
client.set("enable.idempotence", kafka_config_bool(self.idempotence));
client.set("enable.gapless.guarantee", kafka_config_bool(self.gapless));
client.set("request.required.acks", self.acks.to_string());
if let Some(msg_timeout) = self.message_timeout {
client.set("message.timeout.ms", kafka_ms(msg_timeout));
}
if let Some(req_timeout) = self.request_timeout {
client.set("request.timeout.ms", kafka_ms(req_timeout));
}
if let Some(linger) = self.linger {
client.set("queue.buffering.max.ms", kafka_ms(linger));
}
if let Some(cfg) = &self.transaction {
cfg.configure(&mut client);
}
if let Some(cfg) = &self.queue {
cfg.configure(&mut client);
}
if let Some(retries) = self.max_retries {
client.set("message.send.max.retries", retries.to_string());
}
client
}
pub fn make_sync_producer<C: ProducerContext>(
&self,
context: C,
) -> KafkaResult<ThreadedProducer<C>> {
let client = self.make_client_config();
let producer: ThreadedProducer<C> = client.create_with_context(context)?;
if let Some(trxn) = &self.transaction {
producer.init_transactions(trxn.init_timeout)?;
}
Ok(producer)
}
}
/// Connection settings that `KafkaCommonConfig::configure` writes into an
/// rdkafka `ClientConfig`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaCommonConfig {
    /// Broker addresses; joined with `,` and forwarded as `bootstrap.servers`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub brokers: Vec<String>,

    /// Authentication / transport-security settings.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<KafkaAuthConfig>,

    /// Retry backoff bounds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backoff: Option<KafkaBackoffConfig>,

    /// Raw client properties, forwarded verbatim after the brokers, auth and
    /// backoff settings have been written. Also accepted under the key
    /// `rdkafka_params` when deserializing.
    #[serde(default, alias = "rdkafka_params", skip_serializing_if = "HashMap::is_empty")]
    pub extra_params: HashMap<String, String>,
}
impl KafkaCommonConfig {
    /// Writes the connection settings into `client`.
    ///
    /// Order: `bootstrap.servers`, then the auth section, then the backoff
    /// section, and last every entry of `extra_params`.
    pub fn configure(&self, client: &mut ClientConfig) {
        let bootstrap_servers = self.brokers.join(",");
        client.set("bootstrap.servers", bootstrap_servers);

        if let Some(auth) = self.auth.as_ref() {
            auth.configure(client);
        }
        if let Some(backoff) = self.backoff.as_ref() {
            backoff.configure(client);
        }

        self.extra_params.iter().for_each(|(property, setting)| {
            client.set(property, setting);
        });
    }
}
/// SASL credentials and transport-security selection for a Kafka client.
///
/// `Debug` is implemented by hand rather than derived so that the password is
/// never written out when this value (or a configuration that embeds it) is
/// debug-formatted, e.g. in a log line.
#[derive(Clone, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaAuthConfig {
    /// Forwarded as `sasl.username` when set.
    pub username: Option<String>,

    /// Forwarded as `sasl.password` when set.
    pub password: Option<String>,

    /// Forwarded as `security.protocol`.
    #[serde(default)]
    pub protocol: KafkaProtocol,

    /// Forwarded as `sasl.mechanisms`.
    #[serde(default)]
    pub mechanism: KafkaAuthMechanism,
}

impl std::fmt::Debug for KafkaAuthConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible, but never the secret itself.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("KafkaAuthConfig")
            .field("username", &self.username)
            .field("password", &password)
            .field("protocol", &self.protocol)
            .field("mechanism", &self.mechanism)
            .finish()
    }
}
impl KafkaAuthConfig {
    /// Writes the authentication settings into `client`.
    ///
    /// `sasl.username` and `sasl.password` are written only when present;
    /// `sasl.mechanisms` and `security.protocol` are always written.
    pub fn configure(&self, client: &mut ClientConfig) {
        let credentials = [
            ("sasl.username", self.username.as_deref()),
            ("sasl.password", self.password.as_deref()),
        ];
        for (property, credential) in credentials {
            if let Some(credential) = credential {
                client.set(property, credential);
            }
        }

        client
            .set("sasl.mechanisms", self.mechanism.as_ref())
            .set("security.protocol", self.protocol.as_ref());
    }
}
/// Value written to the `security.protocol` client property.
///
/// Variants are (de)serialized in `snake_case`; the default is
/// [`KafkaProtocol::SaslSsl`].
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum KafkaProtocol {
    /// `plaintext`
    Plaintext,
    /// `ssl`
    Ssl,
    /// `sasl_plaintext`
    SaslPlaintext,
    /// `sasl_ssl`
    #[default]
    SaslSsl,
}
impl AsRef<str> for KafkaProtocol {
fn as_ref(&self) -> &str {
match self {
Self::Plaintext => "plaintext",
Self::Ssl => "ssl",
Self::SaslPlaintext => "sasl_plaintext",
Self::SaslSsl => "sasl_ssl",
}
}
}
/// Value written to the `sasl.mechanisms` client property.
///
/// Variants are (de)serialized in upper case unless renamed below; the
/// default is [`KafkaAuthMechanism::ScramSha512`].
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
#[non_exhaustive]
#[serde(rename_all = "UPPERCASE")]
pub enum KafkaAuthMechanism {
    /// `GSSAPI`
    Gssapi,
    /// `PLAIN`
    Plain,
    /// `SCRAM-SHA-256`; `SCRAM-SHA256` is also accepted when deserializing.
    #[serde(rename = "SCRAM-SHA-256", alias = "SCRAM-SHA256")]
    ScramSha256,
    /// `SCRAM-SHA-512`; `SCRAM-SHA512` is also accepted when deserializing.
    #[serde(rename = "SCRAM-SHA-512", alias = "SCRAM-SHA512")]
    #[default]
    ScramSha512,
    /// `OAUTHBEARER`; `OAUTH-BEARER` is also accepted when deserializing.
    #[serde(alias = "OAUTH-BEARER")]
    OAuthBearer,
}
impl AsRef<str> for KafkaAuthMechanism {
fn as_ref(&self) -> &str {
match self {
Self::Gssapi => "GSSAPI",
Self::Plain => "PLAIN",
Self::ScramSha256 => "SCRAM-SHA-256",
Self::ScramSha512 => "SCRAM-SHA-512",
Self::OAuthBearer => "OAUTHBEARER",
}
}
}
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaTransactionalConfig {
pub id: String,
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde"
)]
pub timeout: Option<Duration>,
#[serde(
default = "KafkaTransactionalConfig::default_init_timeout",
with = "humantime_serde"
)]
pub init_timeout: Duration,
}
impl KafkaTransactionalConfig {
#[must_use]
#[inline]
fn default_init_timeout() -> Duration {
Duration::from_secs(5)
}
pub fn configure(&self, client: &mut ClientConfig) {
client.set("transactional.id", &self.id);
if let Some(timeout) = self.timeout {
client.set("transaction.timeout.ms", kafka_ms(timeout));
}
}
}
/// Limits for the producer's internal queue. Every field is optional and is
/// only forwarded to the client when set.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaQueueConfig {
    /// Forwarded as `queue.buffering.max.messages`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_messages: Option<i32>,

    /// Forwarded as `queue.buffering.max.kbytes`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_kbytes: Option<i32>,

    /// Forwarded (in whole milliseconds) as `queue.buffering.max.ms`.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    pub max_duration: Option<Duration>,
}
impl KafkaQueueConfig {
pub fn configure(&self, client: &mut ClientConfig) {
if let Some(max_msg) = self.max_messages {
client.set("queue.buffering.max.messages", max_msg.to_string());
}
if let Some(max_kb) = self.max_kbytes {
client.set("queue.buffering.max.kbytes", max_kb.to_string());
}
if let Some(max_dur) = self.max_duration {
client.set("queue.buffering.max.ms", kafka_ms(max_dur));
}
}
}
/// Retry backoff bounds. Both fields are optional and are only forwarded to
/// the client when set.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct KafkaBackoffConfig {
    /// Forwarded (in whole milliseconds) as `retry.backoff.ms`.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    min: Option<Duration>,

    /// Forwarded (in whole milliseconds) as `retry.backoff.max.ms`.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
    max: Option<Duration>,
}
impl KafkaBackoffConfig {
pub fn configure(&self, client: &mut ClientConfig) {
if let Some(min) = self.min {
client.set("retry.backoff.ms", kafka_ms(min));
}
if let Some(max) = self.max {
client.set("retry.backoff.max.ms", kafka_ms(max));
}
}
}
/// Renders a flag as the literal `"true"` or `"false"` for use as a client
/// property value.
fn kafka_config_bool(b: bool) -> &'static str {
    match b {
        true => "true",
        false => "false",
    }
}
/// Renders a duration as a decimal count of whole milliseconds; any
/// sub-millisecond remainder is dropped.
fn kafka_ms(d: Duration) -> String {
    let whole_millis = d.as_millis();
    format!("{whole_millis}")
}
/// Producer context used by `KafkaLogAppender`: it reports failed deliveries
/// on standard error and ignores successful ones.
pub struct LogProducerContext;

impl ClientContext for LogProducerContext {}

impl ProducerContext<NoCustomPartitioner> for LogProducerContext {
    type DeliveryOpaque = ();

    /// Delivery callback. Prints the topic, partition, offset and error of a
    /// failed delivery to standard error; does nothing on success.
    fn delivery(
        &self,
        res: &Result<BorrowedMessage<'_>, (KafkaError, BorrowedMessage<'_>)>,
        _: <Self as ProducerContext>::DeliveryOpaque,
    ) {
        let Err((err, msg)) = res else {
            return;
        };

        let topic = msg.topic();
        let partition = msg.partition();
        let offset = msg.offset();
        eprintln!(
            "Kafka log appender error (topic:{} partition:{} offset:{}): {err}",
            topic, partition, offset,
        );
    }
}
/// An `io::Write` sink that publishes each written buffer as a Kafka record.
pub struct KafkaLogAppender {
    /// Private copy of the configuration; supplies the topic and the flush
    /// timeout.
    config: KafkaProducerConfig,
    /// Producer that carries the records.
    producer: ThreadedProducer<LogProducerContext>,
}

impl KafkaLogAppender {
    /// Creates a producer from `config` and keeps a clone of the
    /// configuration alongside it.
    ///
    /// # Errors
    ///
    /// Returns the [`KafkaError`] reported by
    /// `KafkaProducerConfig::make_sync_producer`.
    pub fn new(config: &KafkaProducerConfig) -> KafkaResult<Self> {
        config
            .make_sync_producer(LogProducerContext)
            .map(|producer| Self {
                config: config.clone(),
                producer,
            })
    }
}
impl std::io::Write for KafkaLogAppender {
    /// Hands `buf` to the producer as the payload of one record addressed to
    /// the configured topic, and reports the whole buffer as written.
    ///
    /// # Errors
    ///
    /// A send failure is wrapped with `std::io::Error::other`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let topic = &self.config.topic;
        let record: BaseRecord<'_, (), _> = BaseRecord::to(topic).payload(buf);

        match self.producer.send(record) {
            Ok(()) => Ok(buf.len()),
            Err((err, _rejected)) => Err(std::io::Error::other(err)),
        }
    }

    /// Flushes the producer, using the configured `flush_timeout`.
    ///
    /// # Errors
    ///
    /// A flush failure is wrapped with `std::io::Error::other`.
    fn flush(&mut self) -> std::io::Result<()> {
        let timeout = self.config.flush_timeout;
        self.producer.flush(timeout).map_err(std::io::Error::other)
    }
}