use crate::config::{ClientConfig, CommonConfig};
use crate::name::Name;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct Meta {
pub name: String,
pub partitions: Vec<(i32, usize)>,
}
impl Meta {
pub fn new() -> Self {
Self {
name: String::new(),
partitions: vec![],
}
}
}
pub struct List {
inner: BaseConsumer,
}
impl List {
pub fn new(cfg: &ClientConfig) -> Result<Self, KafkaError> {
let consumer: BaseConsumer = cfg.inner.create()?;
Ok(List { inner: consumer })
}
pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
let mut c = ClientConfig::default();
c.set_brokers(brokers);
List::new(&c)
}
pub fn list_inboxes(&self) -> Result<Vec<Meta>, KafkaError> {
let result = self.inner.fetch_metadata(None, Duration::from_secs(5))?;
let mut metas = vec![];
for topic in result.topics() {
let mut partitions = vec![];
for p in topic.partitions() {
partitions.push((p.id(), p.replicas().len()));
}
metas.push(Meta {
name: topic.name().to_string(),
partitions,
})
}
Ok(metas)
}
pub fn aggregate_inboxes<S: AsRef<str>>(
&self,
names: &[S],
) -> Result<HashMap<String, Vec<Meta>>, KafkaError> {
let mut result = HashMap::new();
if names.is_empty() {
return Ok(result);
}
let mut name_map = HashMap::new();
for name in names {
let name = Name::new(name.as_ref());
for n in name.names() {
name_map.insert(n.to_string(), name.name.clone());
}
result.insert(name.name.clone(), vec![]);
}
let metas = self.list_inboxes()?;
for meta in metas {
if let Some(original_name) = name_map.get(&meta.name) {
if let Some(entry) = result.get_mut(original_name) {
entry.push(meta);
}
}
}
Ok(result)
}
}