use crate::config::Config;
/// Thin wrapper around an [`rdkafka::admin::AdminClient`] that uses the
/// [`rdkafka::client::DefaultClientContext`].
pub struct Client {
/// Underlying rdkafka admin client owned by this wrapper.
client: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext>,
}
impl Client {
    /// Creates a [`Client`] by asking `config.inner` to create the admin client.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `config.inner.create()`.
    pub fn build(config: &Config) -> Result<Client, rdkafka::error::KafkaError> {
        let admin: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> =
            config.inner.create()?;
        Ok(Client { client: admin })
    }

    /// Borrows the wrapped admin client for operations not covered by this type.
    pub fn client(&self) -> &rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> {
        let Self { client } = self;
        client
    }

    /// Creates `topic` with `num_partitions` partitions and a fixed replication
    /// of `fixed_replication`, using default [`rdkafka::admin::AdminOptions`].
    ///
    /// See [`Client::create_topic`] for the meaning of the nested result.
    pub async fn simple_create_topic(
        &self,
        topic: &str,
        num_partitions: i32,
        fixed_replication: i32,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let replication = rdkafka::admin::TopicReplication::Fixed(fixed_replication);
        let spec = rdkafka::admin::NewTopic::new(topic, num_partitions, replication);
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.create_topic(spec, &default_opts).await
    }

    /// Submits a single-topic creation request with the given options.
    ///
    /// # Errors
    ///
    /// The outer `Err` is the [`rdkafka::error::KafkaError`] returned by the
    /// admin call itself. The inner `Err` is the error code attached to the
    /// first per-topic result.
    ///
    /// # Panics
    ///
    /// Panics if the admin call succeeds but yields no per-topic results.
    pub async fn create_topic<'a>(
        &self,
        topic: rdkafka::admin::NewTopic<'a>,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let outcomes = self.client.create_topics(&[topic], opts).await?;
        Ok(Self::first_outcome(&outcomes))
    }

    /// Deletes `topic` using default [`rdkafka::admin::AdminOptions`].
    ///
    /// See [`Client::delete_topic`] for the meaning of the nested result.
    pub async fn simple_delete_topic(
        &self,
        topic: &str,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.delete_topic(topic, &default_opts).await
    }

    /// Submits a single-topic deletion request with the given options.
    ///
    /// # Errors
    ///
    /// The outer `Err` is the [`rdkafka::error::KafkaError`] returned by the
    /// admin call itself. The inner `Err` is the error code attached to the
    /// first per-topic result.
    ///
    /// # Panics
    ///
    /// Panics if the admin call succeeds but yields no per-topic results.
    pub async fn delete_topic(
        &self,
        topic: &str,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let outcomes = self.client.delete_topics(&[topic], opts).await?;
        Ok(Self::first_outcome(&outcomes))
    }

    /// Deletes the group `group_name` using default
    /// [`rdkafka::admin::AdminOptions`].
    ///
    /// See [`Client::delete_group`] for the meaning of the nested result.
    pub async fn simple_delete_group(
        &self,
        group_name: &str,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.delete_group(group_name, &default_opts).await
    }

    /// Submits a single-group deletion request with the given options.
    ///
    /// # Errors
    ///
    /// The outer `Err` is the [`rdkafka::error::KafkaError`] returned by the
    /// admin call itself. The inner `Err` is the error code attached to the
    /// first per-group result.
    ///
    /// # Panics
    ///
    /// Panics if the admin call succeeds but yields no per-group results.
    pub async fn delete_group(
        &self,
        group_name: &str,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let outcomes = self.client.delete_groups(&[group_name], opts).await?;
        Ok(Self::first_outcome(&outcomes))
    }

    /// Requests a partition change for `topic` using default
    /// [`rdkafka::admin::AdminOptions`].
    ///
    /// `new_partition_count` is handed unchanged to
    /// [`rdkafka::admin::NewPartitions::new`].
    /// NOTE(review): presumably this is the total partition count after the
    /// operation rather than the number of partitions to add — confirm against
    /// the rdkafka documentation.
    pub async fn simple_create_partition(
        &self,
        topic: &str,
        new_partition_count: usize,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let request = rdkafka::admin::NewPartitions::new(topic, new_partition_count);
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.create_partition(request, &default_opts).await
    }

    /// Submits a single partition-creation request with the given options.
    ///
    /// # Errors
    ///
    /// The outer `Err` is the [`rdkafka::error::KafkaError`] returned by the
    /// admin call itself. The inner `Err` is the error code attached to the
    /// first per-topic result.
    ///
    /// # Panics
    ///
    /// Panics if the admin call succeeds but yields no per-topic results.
    pub async fn create_partition<'a>(
        &self,
        partition: rdkafka::admin::NewPartitions<'a>,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let outcomes = self.client.create_partitions(&[partition], opts).await?;
        Ok(Self::first_outcome(&outcomes))
    }

    /// Issues a record-deletion request for one partition of `topic` at
    /// `offset`, using default [`rdkafka::admin::AdminOptions`].
    ///
    /// NOTE(review): presumably records located before `offset` are the ones
    /// removed — confirm against the rdkafka `delete_records` documentation.
    ///
    /// # Errors
    ///
    /// The outer `Err` is returned when the partition/offset entry cannot be
    /// added to the request list or when the admin call itself fails. The
    /// inner result is the error status reported on the first element of the
    /// returned partition list.
    ///
    /// # Panics
    ///
    /// Panics if the admin call succeeds but the returned list has no elements.
    pub async fn simple_delete_record(
        &self,
        topic: &str,
        partition_id: i32,
        offset: i64,
    ) -> Result<Result<(), rdkafka::error::KafkaError>, rdkafka::error::KafkaError> {
        let mut offsets = rdkafka::TopicPartitionList::new();
        offsets.add_partition_offset(topic, partition_id, rdkafka::Offset::Offset(offset))?;

        let default_opts = rdkafka::admin::AdminOptions::new();
        let deleted = self.client.delete_records(&offsets, &default_opts).await?;

        // Bind the status before returning so no temporary borrowing `deleted`
        // is left alive in the tail expression.
        let first_status = deleted.elements()[0].error();
        Ok(first_status)
    }

    /// Reduces a per-item admin result list to the outcome of its first entry.
    ///
    /// Every operation in this type submits exactly one item, so only index 0
    /// is inspected; the success payload is discarded and a failure is reduced
    /// to its error code.
    ///
    /// # Panics
    ///
    /// Panics if `results` is empty.
    fn first_outcome<T, N>(
        results: &[Result<T, (N, rdkafka::error::RDKafkaErrorCode)>],
    ) -> Result<(), rdkafka::error::RDKafkaErrorCode> {
        results[0].as_ref().map(|_| ()).map_err(|(_, code)| *code)
    }
}