rdkafka_wrap/
wrap_client.rs

1use crate::wrap_ext::SafeAdminClient;
2use crate::{AdminClientExt, GroupID, KWConsumer, KWProducer, KWResult, LogWrapExt, TopicName};
3use rdkafka::admin::{AdminClient, NewTopic};
4use rdkafka::client::DefaultClientContext;
5use rdkafka::config::RDKafkaLogLevel;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
/// Bundles an admin client, one producer, and a set of consumers that all
/// target the same broker string.
///
/// Built with [`KWClient::new`]; additional consumers and the log level are
/// adjusted through the consuming `set_*` methods.
pub struct KWClient {
    /// Admin client handle; `new` wraps an `AdminClient<DefaultClientContext>`
    /// in an `Arc` to produce it.
    pub admin_client: SafeAdminClient,
    /// Producer wrapper handed to `new` by the caller.
    pub kw_producer: KWProducer,
    /// Consumers keyed by the string form of their group id, so at most one
    /// consumer is kept per group id.
    pub kw_consumer: BTreeMap<GroupID, KWConsumer>,
    /// Log level currently recorded for this client. Private: read it with
    /// `get_log_level`, change it with `set_log_level`.
    log_level: RDKafkaLogLevel,
    /// Broker string exactly as passed to `new`.
    /// NOTE(review): presumably a comma-separated `host:port` list — confirm.
    pub brokers: String,
    /// `NewTopic` settings keyed by topic name; empty right after `new`.
    /// NOTE(review): how these are consumed is not visible in this file.
    pub create_topic_conf: HashMap<TopicName, NewTopic<'static>>,
    /// Optional `NewTopic` settings; `None` right after `new`.
    /// NOTE(review): presumably a fallback for topics missing from
    /// `create_topic_conf` — confirm against the code that reads it.
    pub default_topic_conf: Option<NewTopic<'static>>,
}
18
19impl KWClient {
20    pub fn new<B>(
21        brokers: B,
22        kw_producer: KWProducer,
23        kw_consumer: KWConsumer,
24        log_level: Option<RDKafkaLogLevel>,
25    ) -> KWResult<Self>
26    where
27        B: AsRef<str>,
28    {
29        let log_level = log_level.get_or_init();
30        let brokers = brokers.as_ref();
31        let mut consumer = BTreeMap::default();
32        consumer.insert(kw_consumer.get_group_id().to_string(), kw_consumer);
33
34        let admin_client =
35            (brokers, log_level).create_admin_client::<AdminClient<DefaultClientContext>>()?;
36        let client = Self {
37            admin_client: Arc::new(admin_client),
38            kw_producer,
39            kw_consumer: consumer,
40            create_topic_conf: Default::default(),
41            log_level,
42            brokers: brokers.to_string(),
43            default_topic_conf: None,
44        };
45        Ok(client)
46    }
47
48    pub fn set_consumer(mut self, kw_consumer: KWConsumer) -> Self {
49        self.kw_consumer
50            .insert(kw_consumer.get_group_id().to_string(), kw_consumer);
51        self
52    }
53
54    pub fn set_log_level(mut self, log_level: RDKafkaLogLevel) -> Self {
55        self.kw_producer.conf.log_level = Some(log_level);
56        for consumer in self.kw_consumer.values_mut() {
57            consumer.conf.log_level = Some(log_level);
58        }
59        self.log_level = log_level;
60        self
61    }
62
63    pub fn get_log_level(&self) -> RDKafkaLogLevel {
64        self.log_level
65    }
66}