use std::collections::BTreeMap;
use chrono::Utc;
use rskafka::{
client::{
partition::{Compression, UnknownTopicHandling},
ClientBuilder,
},
record::Record,
};
use tokio::runtime::Runtime;
use crate::sink::retry::RetryPolicy;
use crate::{sink::Sink, SondaError};
/// Number of buffered bytes (64 KiB) that triggers an automatic publish;
/// also the initial capacity of the internal buffer.
pub const KAFKA_BUFFER_SIZE: usize = 64 * 1024;
/// Sink that batches written bytes in memory and publishes each batch as a
/// single Kafka record to partition 0 of one topic.
pub struct KafkaSink {
    /// Destination topic name (kept for error messages).
    topic: String,
    /// Original comma-separated broker list (kept for error messages).
    brokers: String,
    /// Partition client bound to partition 0 of `topic`.
    client: rskafka::client::partition::PartitionClient,
    /// Pending bytes not yet produced; flushed at `KAFKA_BUFFER_SIZE`.
    buffer: Vec<u8>,
    /// Dedicated current-thread tokio runtime used to drive the async
    /// rskafka client from the synchronous `Sink` API.
    runtime: Runtime,
    /// Optional retry policy applied to failed produce calls.
    retry_policy: Option<RetryPolicy>,
}
impl KafkaSink {
    /// Creates a Kafka sink publishing to partition 0 of `topic`.
    ///
    /// `brokers` is a comma-separated list of bootstrap addresses (blank
    /// entries are ignored). A dedicated single-threaded tokio runtime is
    /// built so the async rskafka client can be driven from the synchronous
    /// `Sink` API.
    ///
    /// # Errors
    /// Returns `SondaError::Sink` when the broker list contains no valid
    /// address, the runtime cannot be built, the connection fails, or the
    /// partition client cannot be obtained.
    pub fn new(
        brokers: &str,
        topic: &str,
        retry_policy: Option<RetryPolicy>,
    ) -> Result<Self, SondaError> {
        // Validate the broker list first so obviously bad input is rejected
        // before paying for tokio runtime construction.
        let bootstrap_brokers: Vec<String> = brokers
            .split(',')
            .map(|s| s.trim().to_owned())
            .filter(|s| !s.is_empty())
            .collect();
        if bootstrap_brokers.is_empty() {
            return Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("kafka sink: no valid broker addresses in '{}'", brokers),
            )));
        }
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|e| {
                std::io::Error::other(format!(
                    "kafka sink: failed to build tokio runtime for broker '{}': {}",
                    brokers, e
                ))
            })
            .map_err(SondaError::Sink)?;
        // Owned copies that can be moved into the async block below.
        let topic_str = topic.to_owned();
        let brokers_str = brokers.to_owned();
        let client = runtime
            .block_on(async {
                let kafka_client = ClientBuilder::new(bootstrap_brokers)
                    .build()
                    .await
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::ConnectionRefused,
                            format!(
                                "kafka sink: failed to connect to broker(s) '{}': {}",
                                brokers_str, e
                            ),
                        )
                    })?;
                kafka_client
                    .partition_client(
                        // Clone: `topic_str` is still needed by the error
                        // message below.
                        topic_str.clone(),
                        0,
                        UnknownTopicHandling::Retry,
                    )
                    .await
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::NotFound,
                            format!(
                                "kafka sink: failed to get partition client for topic '{}' at broker(s) '{}': {}",
                                topic_str, brokers_str, e
                            ),
                        )
                    })
            })
            .map_err(SondaError::Sink)?;
        Ok(Self {
            topic: topic.to_owned(),
            brokers: brokers.to_owned(),
            client,
            buffer: Vec::with_capacity(KAFKA_BUFFER_SIZE),
            runtime,
            retry_policy,
        })
    }

    /// Sends the buffered bytes as one Kafka record and resets the buffer.
    ///
    /// A no-op when the buffer is empty. On failure the unsent payload is
    /// restored into the buffer so a later write/flush can retry it instead
    /// of silently dropping the data.
    fn publish_buffer(&mut self) -> Result<(), SondaError> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        // Swap the buffer out so `do_produce` can borrow the payload while
        // a fresh buffer is already in place for subsequent writes.
        let payload = std::mem::replace(&mut self.buffer, Vec::with_capacity(KAFKA_BUFFER_SIZE));
        let result = match &self.retry_policy {
            Some(policy) => {
                // Clone so the policy is not borrowed from `self` while the
                // closure below needs `&mut self`.
                let policy = policy.clone();
                // `|_| true`: every produce error is treated as retryable.
                policy.execute(|| self.do_produce(&payload), |_| true)
            }
            None => self.do_produce(&payload),
        };
        if result.is_err() {
            // Nothing touches `self.buffer` between the swap above and here
            // (single-threaded, `do_produce` never writes to it), so putting
            // the payload back preserves byte order for the next attempt.
            self.buffer = payload;
        }
        result
    }

    /// Produces `payload` as a single keyless record (no compression),
    /// blocking on the sink's private runtime.
    ///
    /// # Errors
    /// Returns `SondaError::Sink` with `BrokenPipe` when the produce call
    /// fails.
    fn do_produce(&mut self, payload: &[u8]) -> Result<(), SondaError> {
        let record = Record {
            key: None,
            value: Some(payload.to_vec()),
            headers: BTreeMap::new(),
            timestamp: Utc::now(),
        };
        self.runtime
            .block_on(async {
                self.client
                    .produce(vec![record], Compression::NoCompression)
                    .await
            })
            .map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    format!(
                        "kafka sink: failed to produce record to topic '{}' at broker(s) '{}': {}",
                        self.topic, self.brokers, e
                    ),
                )
            })
            .map_err(SondaError::Sink)?;
        Ok(())
    }
}
impl Sink for KafkaSink {
    /// Appends `data` to the in-memory buffer; once the buffered size
    /// reaches `KAFKA_BUFFER_SIZE` the batch is published to Kafka.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
        self.buffer.extend_from_slice(data);
        if self.buffer.len() < KAFKA_BUFFER_SIZE {
            return Ok(());
        }
        self.publish_buffer()
    }

    /// Publishes any buffered bytes immediately, regardless of size.
    fn flush(&mut self) -> Result<(), SondaError> {
        self.publish_buffer()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::SinkConfig;

    #[test]
    fn kafka_buffer_size_is_64_kib() {
        assert_eq!(KAFKA_BUFFER_SIZE, 64 * 1024);
    }

    #[test]
    fn kafka_sink_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<KafkaSink>();
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_kafka_deserializes_with_brokers_and_topic() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"\ntopic: sonda-test";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        match config {
            SinkConfig::Kafka { brokers, topic, .. } => {
                assert_eq!(brokers, "127.0.0.1:9092");
                assert_eq!(topic, "sonda-test");
            }
            other => panic!("expected SinkConfig::Kafka, got {other:?}"),
        }
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_kafka_deserializes_with_multiple_brokers() {
        let yaml = "type: kafka\nbrokers: \"broker1:9092,broker2:9092\"\ntopic: my-topic";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert!(
            matches!(config, SinkConfig::Kafka { ref brokers, ref topic, .. }
                if brokers == "broker1:9092,broker2:9092" && topic == "my-topic")
        );
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_kafka_requires_brokers_field() {
        let yaml = "type: kafka\ntopic: sonda-test";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without brokers should fail deserialization"
        );
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_kafka_requires_topic_field() {
        let yaml = "type: kafka\nbrokers: \"127.0.0.1:9092\"";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "kafka variant without topic should fail deserialization"
        );
    }

    #[test]
    fn sink_config_kafka_is_cloneable() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:9092".to_string(),
            topic: "sonda-test".to_string(),
            retry: None,
        };
        let cloned = config.clone();
        assert!(
            matches!(cloned, SinkConfig::Kafka { ref brokers, ref topic, .. }
                if brokers == "127.0.0.1:9092" && topic == "sonda-test")
        );
    }

    #[test]
    fn sink_config_kafka_is_debuggable() {
        let config = SinkConfig::Kafka {
            brokers: "127.0.0.1:9092".to_string(),
            topic: "sonda-test".to_string(),
            retry: None,
        };
        let s = format!("{config:?}");
        assert!(s.contains("Kafka"));
        assert!(s.contains("9092"));
        assert!(s.contains("sonda-test"));
    }

    #[test]
    #[ignore = "requires network timeout which is slow; run with --ignored when desired"]
    fn new_with_unreachable_broker_returns_sink_error() {
        // `KafkaSink::new` takes three arguments; no retry policy here.
        let result = KafkaSink::new("127.0.0.1:1", "sonda-test", None);
        match result {
            Err(err) => {
                let msg = err.to_string();
                assert!(
                    msg.contains("127.0.0.1:1") || msg.contains("kafka"),
                    "error message should reference the broker address or 'kafka', got: {msg}"
                );
            }
            Ok(_) => panic!("construction must fail when broker is unreachable"),
        }
    }

    #[test]
    fn new_with_empty_broker_string_returns_error() {
        let result = KafkaSink::new("", "sonda-test", None);
        match result {
            Err(err) => {
                let msg = err.to_string();
                assert!(
                    msg.contains("kafka") || msg.contains("broker"),
                    "error should mention kafka or broker, got: {msg}"
                );
            }
            Ok(_) => panic!("empty broker string must be rejected"),
        }
    }

    #[test]
    fn new_with_whitespace_only_broker_string_returns_error() {
        let result = KafkaSink::new(" , , ", "sonda-test", None);
        assert!(
            result.is_err(),
            "broker string with only separators must be rejected"
        );
    }

    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_kafka_sink_deserializes_correctly() {
        use crate::config::ScenarioConfig;
        let yaml = r#"
name: kafka_test
rate: 100.0
generator:
  type: constant
  value: 1.0
encoder:
  type: prometheus_text
sink:
  type: kafka
  brokers: "127.0.0.1:9092"
  topic: sonda-metrics
"#;
        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(config.name, "kafka_test");
        assert!(
            matches!(config.sink, SinkConfig::Kafka { ref brokers, ref topic, .. }
                if brokers == "127.0.0.1:9092" && topic == "sonda-metrics")
        );
    }
}