rdkafka_wrap/
wrap_client.rs1use crate::wrap_ext::SafeAdminClient;
2use crate::{AdminClientExt, GroupID, KWConsumer, KWProducer, KWResult, LogWrapExt, TopicName};
3use rdkafka::admin::{AdminClient, NewTopic};
4use rdkafka::client::DefaultClientContext;
5use rdkafka::config::RDKafkaLogLevel;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9pub struct KWClient {
10 pub admin_client: SafeAdminClient,
11 pub kw_producer: KWProducer,
12 pub kw_consumer: BTreeMap<GroupID, KWConsumer>,
13 log_level: RDKafkaLogLevel,
14 pub brokers: String,
15 pub create_topic_conf: HashMap<TopicName, NewTopic<'static>>,
16 pub default_topic_conf: Option<NewTopic<'static>>,
17}
18
19impl KWClient {
20 pub fn new<B>(
21 brokers: B,
22 kw_producer: KWProducer,
23 kw_consumer: KWConsumer,
24 log_level: Option<RDKafkaLogLevel>,
25 ) -> KWResult<Self>
26 where
27 B: AsRef<str>,
28 {
29 let log_level = log_level.get_or_init();
30 let brokers = brokers.as_ref();
31 let mut consumer = BTreeMap::default();
32 consumer.insert(kw_consumer.get_group_id().to_string(), kw_consumer);
33
34 let admin_client =
35 (brokers, log_level).create_admin_client::<AdminClient<DefaultClientContext>>()?;
36 let client = Self {
37 admin_client: Arc::new(admin_client),
38 kw_producer,
39 kw_consumer: consumer,
40 create_topic_conf: Default::default(),
41 log_level,
42 brokers: brokers.to_string(),
43 default_topic_conf: None,
44 };
45 Ok(client)
46 }
47
48 pub fn set_consumer(mut self, kw_consumer: KWConsumer) -> Self {
49 self.kw_consumer
50 .insert(kw_consumer.get_group_id().to_string(), kw_consumer);
51 self
52 }
53
54 pub fn set_log_level(mut self, log_level: RDKafkaLogLevel) -> Self {
55 self.kw_producer.conf.log_level = Some(log_level);
56 for consumer in self.kw_consumer.values_mut() {
57 consumer.conf.log_level = Some(log_level);
58 }
59 self.log_level = log_level;
60 self
61 }
62
63 pub fn get_log_level(&self) -> RDKafkaLogLevel {
64 self.log_level
65 }
66}