use std::collections::BTreeMap;
use std::time::Duration;
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::groups::GroupList;
use rdkafka::metadata::Metadata;
use rdkafka::TopicPartitionList;
use buswatch_types::{ModuleMetrics, ReadMetrics, SchemaVersion, Snapshot};
use crate::AdapterError;
pub struct KafkaAdapter {
#[allow(dead_code)]
admin: AdminClient<DefaultClientContext>,
consumer: BaseConsumer,
group_filter: Option<String>,
timeout: Duration,
}
impl KafkaAdapter {
pub fn builder() -> KafkaAdapterBuilder {
KafkaAdapterBuilder::default()
}
pub async fn collect(&self) -> Result<Snapshot, AdapterError> {
let groups = self.list_groups()?;
let metadata = self.fetch_metadata()?;
let mut modules = BTreeMap::new();
for group in groups.groups() {
let group_name = group.name();
if let Some(ref filter) = self.group_filter {
if !group_name.contains(filter) {
continue;
}
}
if let Ok(metrics) = self.collect_group_metrics(group_name, &metadata) {
modules.insert(group_name.to_string(), metrics);
}
}
Ok(Snapshot {
version: SchemaVersion::current(),
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
modules,
})
}
pub async fn collect_group(&self, group_name: &str) -> Result<ModuleMetrics, AdapterError> {
let metadata = self.fetch_metadata()?;
self.collect_group_metrics(group_name, &metadata)
}
fn list_groups(&self) -> Result<GroupList, AdapterError> {
self.consumer
.fetch_group_list(None, self.timeout)
.map_err(|e| AdapterError::Connection(e.to_string()))
}
fn fetch_metadata(&self) -> Result<Metadata, AdapterError> {
self.consumer
.fetch_metadata(None, self.timeout)
.map_err(|e| AdapterError::Connection(e.to_string()))
}
fn collect_group_metrics(
&self,
_group_name: &str,
metadata: &Metadata,
) -> Result<ModuleMetrics, AdapterError> {
let mut reads = BTreeMap::new();
for topic in metadata.topics() {
let topic_name = topic.name();
let mut tpl = TopicPartitionList::new();
for partition in topic.partitions() {
tpl.add_partition(topic_name, partition.id());
}
let committed = match self.consumer.committed_offsets(tpl.clone(), self.timeout) {
Ok(c) => c,
Err(_) => continue,
};
let mut total_lag: i64 = 0;
let mut total_offset: i64 = 0;
let mut has_offsets = false;
for elem in committed.elements() {
if let rdkafka::Offset::Offset(committed_offset) = elem.offset() {
has_offsets = true;
total_offset += committed_offset;
if let Ok((_, high)) =
self.consumer
.fetch_watermarks(topic_name, elem.partition(), self.timeout)
{
total_lag += high - committed_offset;
}
}
}
if has_offsets {
let mut read_metrics = ReadMetrics::new(total_offset as u64);
if total_lag >= 0 {
read_metrics.backlog = Some(total_lag as u64);
}
reads.insert(topic_name.to_string(), read_metrics);
}
}
Ok(ModuleMetrics {
reads,
writes: BTreeMap::new(),
})
}
}
impl std::fmt::Debug for KafkaAdapter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KafkaAdapter")
.field("group_filter", &self.group_filter)
.field("timeout", &self.timeout)
.finish()
}
}
#[derive(Debug, Default)]
pub struct KafkaAdapterBuilder {
brokers: Option<String>,
group_id: Option<String>,
group_filter: Option<String>,
timeout: Option<Duration>,
}
impl KafkaAdapterBuilder {
pub fn brokers(mut self, brokers: impl Into<String>) -> Self {
self.brokers = Some(brokers.into());
self
}
pub fn group_id(mut self, group_id: impl Into<String>) -> Self {
self.group_id = Some(group_id.into());
self
}
pub fn group_filter(mut self, filter: impl Into<String>) -> Self {
self.group_filter = Some(filter.into());
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn build(self) -> Result<KafkaAdapter, AdapterError> {
let brokers = self.brokers.unwrap_or_else(|| "localhost:9092".to_string());
let group_id = self
.group_id
.unwrap_or_else(|| "buswatch-adapter".to_string());
let timeout = self.timeout.unwrap_or(Duration::from_secs(10));
let mut config = ClientConfig::new();
config.set("bootstrap.servers", &brokers);
config.set("group.id", &group_id);
let admin: AdminClient<DefaultClientContext> = config
.create()
.map_err(|e| AdapterError::Connection(e.to_string()))?;
let consumer: BaseConsumer = config
.create()
.map_err(|e| AdapterError::Connection(e.to_string()))?;
Ok(KafkaAdapter {
admin,
consumer,
group_filter: self.group_filter,
timeout,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_builder_defaults() {
let builder = KafkaAdapter::builder()
.brokers("localhost:9092")
.group_id("test-group")
.timeout(Duration::from_secs(5));
assert!(builder.brokers.is_some());
assert!(builder.group_id.is_some());
}
}