use crate::config::{ClientConfig, CommonConfig};
use crate::name::Name;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
/// Settings used when creating the topics that back an inbox.
///
/// The type is plain data, so it derives the usual value-type traits and can be
/// logged, copied and compared by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CreateOption {
    /// Partition count requested for the inbox's regular (non-dead-letter) topics.
    pub partitions: i32,
    /// Fixed replication factor requested for every created topic.
    pub replication: i32,
}

impl CreateOption {
    /// Creates options with the given partition count and replication factor.
    pub fn new(partitions: i32, replication: i32) -> Self {
        Self {
            partitions,
            replication,
        }
    }
}

impl Default for CreateOption {
    /// Defaults to a single partition with a replication factor of one.
    fn default() -> Self {
        Self::new(1, 1)
    }
}
/// Administrative handle for the Kafka topics that back an inbox.
///
/// Each operation expands the inbox name through [`Name`] into the topic names
/// it covers and issues one admin request for those topics.
pub struct Inbox {
    client: AdminClient<DefaultClientContext>,
}

impl Inbox {
    /// Builds an `Inbox` whose admin client is created from `config`.
    ///
    /// # Errors
    /// Returns the [`KafkaError`] raised while creating the admin client.
    pub fn new(config: &ClientConfig) -> Result<Self, KafkaError> {
        let client = config.inner.create()?;
        Ok(Inbox { client })
    }

    /// Builds an `Inbox` from a default [`ClientConfig`] pointed at `brokers`.
    ///
    /// # Errors
    /// Returns the [`KafkaError`] raised while creating the admin client.
    pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
        let mut c = ClientConfig::default();
        c.set_brokers(brokers);
        Inbox::new(&c)
    }

    /// Creates every topic belonging to the inbox called `name`.
    ///
    /// Dead-letter topics always get a single partition; all other topics get
    /// `opt.partitions`. Every topic uses the fixed replication factor
    /// `opt.replication`. A topic that already exists is not treated as a
    /// failure.
    ///
    /// # Errors
    /// The outer `Err` carries the [`KafkaError`] of a failed admin request.
    /// The inner `Err` carries the first per-topic error code other than
    /// `TopicAlreadyExists`.
    ///
    /// # Panics
    /// Panics if the broker answers with a different number of results than
    /// topics requested.
    pub async fn create_inbox<S: AsRef<str>>(
        &self,
        name: S,
        opt: CreateOption,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let name = Name::new(name.as_ref());
        let mut topics = vec![];
        for n in name.names() {
            let partitions = if name.is_dead_letter(n) {
                1
            } else {
                opt.partitions
            };
            topics.push(NewTopic::new(
                n,
                partitions,
                TopicReplication::Fixed(opt.replication),
            ));
        }
        let results = self
            .client
            .create_topics(&topics[..], &AdminOptions::new())
            .await?;
        Ok(check_topic_result(
            results,
            RDKafkaErrorCode::TopicAlreadyExists,
            topics.len(),
        ))
    }

    /// Deletes every topic belonging to the inbox called `name`.
    ///
    /// A topic that does not exist is not treated as a failure.
    ///
    /// # Errors
    /// The outer `Err` carries the [`KafkaError`] of a failed admin request.
    /// The inner `Err` carries the first per-topic error code other than
    /// `UnknownTopicOrPartition`.
    ///
    /// # Panics
    /// Panics if the broker answers with a different number of results than
    /// topics requested.
    pub async fn delete_inbox<S: AsRef<str>>(
        &self,
        name: S,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let name = Name::new(name.as_ref());
        let names: Vec<&str> = name.names().iter().map(|n| n as &str).collect();
        let results = self
            .client
            .delete_topics(&names, &AdminOptions::new())
            .await?;
        Ok(check_topic_result(
            results,
            RDKafkaErrorCode::UnknownTopicOrPartition,
            names.len(),
        ))
    }

    /// Requests a new total of `partitions` partitions for every
    /// non-dead-letter topic of the inbox called `name`.
    ///
    /// Dead-letter topics are left untouched.
    ///
    /// # Errors
    /// The outer `Err` carries the [`KafkaError`] of a failed admin request.
    /// The inner `Err` carries `InvalidPartitions` when `partitions` is
    /// negative, otherwise the first per-topic error code reported.
    pub async fn adjust_inbox<S: AsRef<str>>(
        &self,
        name: S,
        partitions: i32,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        // A negative count would silently wrap to a huge value under `as usize`,
        // so reject it before building any request.
        if partitions < 0 {
            return Ok(Err(RDKafkaErrorCode::InvalidPartitions));
        }
        let total = partitions as usize;

        let name = Name::new(name.as_ref());
        let mut parts = vec![];
        for n in name.names() {
            if name.is_dead_letter(n) {
                continue;
            }
            parts.push(rdkafka::admin::NewPartitions::new(n, total));
        }

        let results = self
            .client
            .create_partitions(&parts, &AdminOptions::new())
            .await?;

        // Inspect every per-topic result: indexing only the first entry would
        // panic on an empty reply and hide failures on the remaining topics.
        for result in &results {
            if let Err((_, code)) = result {
                return Ok(Err(*code));
            }
        }
        Ok(Ok(()))
    }
}
/// Folds the per-topic results of an admin request into a single outcome.
///
/// * `v` - one result per topic, as returned by the admin client.
/// * `not` - an error code that is tolerated and therefore skipped.
/// * `count` - the number of results the caller expects.
///
/// Returns `Err` with the first error code in `v` that differs from `not`,
/// or `Ok(())` when every entry is a success or carries the tolerated code.
///
/// # Panics
/// Panics when `v` does not contain exactly `count` entries.
fn check_topic_result(
    v: Vec<rdkafka::admin::TopicResult>,
    not: RDKafkaErrorCode,
    count: usize,
) -> Result<(), RDKafkaErrorCode> {
    assert_eq!(v.len(), count);
    let first_failure = v
        .iter()
        .filter_map(|outcome| outcome.as_ref().err())
        .map(|failure| failure.1)
        .find(|code| *code != not);
    match first_failure {
        Some(code) => Err(code),
        None => Ok(()),
    }
}