rdkafka_wrap/
wrap_ext.rs

1use crate::configuration::all::BOOTSTRAP_SERVERS;
2use crate::wrap_err::KWResult;
3use crate::wrap_metadata::{MetadataTopicWrap, MetadataWrap};
4use crate::{KWClient, KWConsumer, KWProducer, LogWrapExt};
5use anyhow::anyhow;
6use rdkafka::admin::{AdminClient, AdminOptions, NewTopic};
7use rdkafka::client::DefaultClientContext;
8use rdkafka::config::{FromClientConfig, RDKafkaLogLevel};
9use rdkafka::error::RDKafkaErrorCode;
10use rdkafka::util::Timeout;
11use rdkafka::{ClientConfig, ClientContext};
12use std::sync::Arc;
13use std::time::Duration;
14
15pub(crate) type SafeAdminClient = Arc<AdminClient<DefaultClientContext>>;
16pub type TopicName = String;
17pub type Brokers<'a> = &'a str;
18
19pub mod log_wrap {
20    use rdkafka::config::RDKafkaLogLevel;
21
22    pub trait LogWrapExt: Sized {
23        type Item;
24        fn default() -> Self::Item;
25        fn get_or_init(&self) -> Self::Item;
26    }
27
28    impl LogWrapExt for RDKafkaLogLevel {
29        type Item = Self;
30
31        fn default() -> Self::Item {
32            RDKafkaLogLevel::Warning
33        }
34
35        fn get_or_init(&self) -> Self::Item {
36            *self
37        }
38    }
39
40    impl LogWrapExt for Option<RDKafkaLogLevel> {
41        type Item = RDKafkaLogLevel;
42
43        fn default() -> Self::Item {
44            RDKafkaLogLevel::default()
45        }
46
47        fn get_or_init(&self) -> Self::Item {
48            self.unwrap_or(RDKafkaLogLevel::default())
49        }
50    }
51}
52
53pub trait AdminClientExt {
54    fn create_admin_client<T: FromClientConfig>(&self) -> KWResult<T> {
55        let admin_client = ClientConfig::new()
56            .set(BOOTSTRAP_SERVERS, self.get_brokers())
57            .set_log_level(self.get_log_level())
58            .create()?;
59        Ok(admin_client)
60    }
61
62    fn get_brokers(&self) -> &str;
63    fn get_log_level(&self) -> RDKafkaLogLevel;
64}
65
66impl<'a> AdminClientExt for (Brokers<'a>, RDKafkaLogLevel) {
67    fn get_brokers(&self) -> &str {
68        self.0
69    }
70
71    fn get_log_level(&self) -> RDKafkaLogLevel {
72        self.1
73    }
74}
75
76#[async_trait::async_trait]
77pub trait OptionExt {
78    type AdminClient: FromClientConfig;
79
80    fn get_new_topics(&self) -> KWResult<Vec<NewTopic>> {
81        Ok(vec![])
82    }
83
84    async fn create_topic(&self) -> KWResult<()> {
85        let admin_client = self.get_admin_client().await?;
86
87        let new_topics = self.get_new_topics()?;
88        if new_topics.is_empty() {
89            debug!("topics is empty.skip");
90            return Ok(());
91        }
92        let topics_result = admin_client
93            .create_topics(new_topics.as_slice(), &AdminOptions::new())
94            .await?;
95
96        let mut err_msg = vec![];
97        for ret in topics_result {
98            match ret {
99                Ok(_) => {}
100                Err((topic, err_code)) => {
101                    // topic already exists
102                    if let RDKafkaErrorCode::TopicAlreadyExists = err_code {
103                        warn!("create kafka topic {}: topic already exists", topic);
104                    } else {
105                        err_msg.push(format!("topic {} -> error code: {}", topic, err_code))
106                    }
107                }
108            }
109        }
110
111        if err_msg.is_empty() {
112            return Ok(());
113        }
114        Err(anyhow!("failed to create kafka:{}", err_msg.join(",")).into())
115    }
116
117    fn create_admin_client(&self) -> KWResult<Self::AdminClient> {
118        let ext = (self.get_brokers(), self.get_log_level());
119        ext.create_admin_client()
120    }
121
122    fn get_brokers(&self) -> &str;
123    fn get_log_level(&self) -> RDKafkaLogLevel;
124
125    async fn get_admin_client(&self) -> KWResult<SafeAdminClient>;
126
127    async fn get_topics<C: ClientContext>(&self) -> KWResult<Vec<MetadataTopicWrap>> {
128        let metadata: MetadataWrap = self
129            .get_admin_client()
130            .await?
131            .inner()
132            .fetch_metadata(None, Timeout::from(Duration::from_secs(5)))?
133            .into();
134        Ok(metadata.topics)
135    }
136}
137
138#[async_trait::async_trait]
139impl OptionExt for KWProducer {
140    type AdminClient = AdminClient<DefaultClientContext>;
141
142    fn get_new_topics(&self) -> KWResult<Vec<NewTopic>> {
143        Ok(vec![self.new_topic()?])
144    }
145
146    fn get_brokers(&self) -> &str {
147        self.conf.brokers.as_str()
148    }
149
150    fn get_log_level(&self) -> RDKafkaLogLevel {
151        self.conf.log_level.get_or_init()
152    }
153
154    async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
155        let mut guard = self.admin_client.lock().await;
156        if guard.is_none() {
157            let client = self.create_admin_client()?;
158            *guard = Some(Arc::new(client));
159        }
160
161        Ok(guard.clone().unwrap())
162    }
163}
164
165#[async_trait::async_trait]
166impl OptionExt for KWConsumer {
167    type AdminClient = AdminClient<DefaultClientContext>;
168
169    fn get_brokers(&self) -> &str {
170        self.conf.brokers.as_str()
171    }
172
173    fn get_log_level(&self) -> RDKafkaLogLevel {
174        self.conf.log_level.get_or_init()
175    }
176
177    async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
178        let mut guard = self.admin_client.lock().await;
179        if guard.is_none() {
180            let client = self.create_admin_client()?;
181            *guard = Some(Arc::new(client));
182        }
183
184        Ok(guard.clone().unwrap())
185    }
186}
187
188#[async_trait::async_trait]
189impl OptionExt for KWClient {
190    type AdminClient = AdminClient<DefaultClientContext>;
191
192    fn get_brokers(&self) -> &str {
193        self.brokers.as_str()
194    }
195
196    fn get_log_level(&self) -> RDKafkaLogLevel {
197        self.get_log_level()
198    }
199
200    async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
201        Ok(self.admin_client.clone())
202    }
203}