1use crate::configuration::all::BOOTSTRAP_SERVERS;
2use crate::wrap_err::KWResult;
3use crate::wrap_metadata::{MetadataTopicWrap, MetadataWrap};
4use crate::{KWClient, KWConsumer, KWProducer, LogWrapExt};
5use anyhow::anyhow;
6use rdkafka::admin::{AdminClient, AdminOptions, NewTopic};
7use rdkafka::client::DefaultClientContext;
8use rdkafka::config::{FromClientConfig, RDKafkaLogLevel};
9use rdkafka::error::RDKafkaErrorCode;
10use rdkafka::util::Timeout;
11use rdkafka::{ClientConfig, ClientContext};
12use std::sync::Arc;
13use std::time::Duration;
14
15pub(crate) type SafeAdminClient = Arc<AdminClient<DefaultClientContext>>;
16pub type TopicName = String;
17pub type Brokers<'a> = &'a str;
18
19pub mod log_wrap {
20 use rdkafka::config::RDKafkaLogLevel;
21
22 pub trait LogWrapExt: Sized {
23 type Item;
24 fn default() -> Self::Item;
25 fn get_or_init(&self) -> Self::Item;
26 }
27
28 impl LogWrapExt for RDKafkaLogLevel {
29 type Item = Self;
30
31 fn default() -> Self::Item {
32 RDKafkaLogLevel::Warning
33 }
34
35 fn get_or_init(&self) -> Self::Item {
36 *self
37 }
38 }
39
40 impl LogWrapExt for Option<RDKafkaLogLevel> {
41 type Item = RDKafkaLogLevel;
42
43 fn default() -> Self::Item {
44 RDKafkaLogLevel::default()
45 }
46
47 fn get_or_init(&self) -> Self::Item {
48 self.unwrap_or(RDKafkaLogLevel::default())
49 }
50 }
51}
52
53pub trait AdminClientExt {
54 fn create_admin_client<T: FromClientConfig>(&self) -> KWResult<T> {
55 let admin_client = ClientConfig::new()
56 .set(BOOTSTRAP_SERVERS, self.get_brokers())
57 .set_log_level(self.get_log_level())
58 .create()?;
59 Ok(admin_client)
60 }
61
62 fn get_brokers(&self) -> &str;
63 fn get_log_level(&self) -> RDKafkaLogLevel;
64}
65
66impl<'a> AdminClientExt for (Brokers<'a>, RDKafkaLogLevel) {
67 fn get_brokers(&self) -> &str {
68 self.0
69 }
70
71 fn get_log_level(&self) -> RDKafkaLogLevel {
72 self.1
73 }
74}
75
76#[async_trait::async_trait]
77pub trait OptionExt {
78 type AdminClient: FromClientConfig;
79
80 fn get_new_topics(&self) -> KWResult<Vec<NewTopic>> {
81 Ok(vec![])
82 }
83
84 async fn create_topic(&self) -> KWResult<()> {
85 let admin_client = self.get_admin_client().await?;
86
87 let new_topics = self.get_new_topics()?;
88 if new_topics.is_empty() {
89 debug!("topics is empty.skip");
90 return Ok(());
91 }
92 let topics_result = admin_client
93 .create_topics(new_topics.as_slice(), &AdminOptions::new())
94 .await?;
95
96 let mut err_msg = vec![];
97 for ret in topics_result {
98 match ret {
99 Ok(_) => {}
100 Err((topic, err_code)) => {
101 if let RDKafkaErrorCode::TopicAlreadyExists = err_code {
103 warn!("create kafka topic {}: topic already exists", topic);
104 } else {
105 err_msg.push(format!("topic {} -> error code: {}", topic, err_code))
106 }
107 }
108 }
109 }
110
111 if err_msg.is_empty() {
112 return Ok(());
113 }
114 Err(anyhow!("failed to create kafka:{}", err_msg.join(",")).into())
115 }
116
117 fn create_admin_client(&self) -> KWResult<Self::AdminClient> {
118 let ext = (self.get_brokers(), self.get_log_level());
119 ext.create_admin_client()
120 }
121
122 fn get_brokers(&self) -> &str;
123 fn get_log_level(&self) -> RDKafkaLogLevel;
124
125 async fn get_admin_client(&self) -> KWResult<SafeAdminClient>;
126
127 async fn get_topics<C: ClientContext>(&self) -> KWResult<Vec<MetadataTopicWrap>> {
128 let metadata: MetadataWrap = self
129 .get_admin_client()
130 .await?
131 .inner()
132 .fetch_metadata(None, Timeout::from(Duration::from_secs(5)))?
133 .into();
134 Ok(metadata.topics)
135 }
136}
137
138#[async_trait::async_trait]
139impl OptionExt for KWProducer {
140 type AdminClient = AdminClient<DefaultClientContext>;
141
142 fn get_new_topics(&self) -> KWResult<Vec<NewTopic>> {
143 Ok(vec![self.new_topic()?])
144 }
145
146 fn get_brokers(&self) -> &str {
147 self.conf.brokers.as_str()
148 }
149
150 fn get_log_level(&self) -> RDKafkaLogLevel {
151 self.conf.log_level.get_or_init()
152 }
153
154 async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
155 let mut guard = self.admin_client.lock().await;
156 if guard.is_none() {
157 let client = self.create_admin_client()?;
158 *guard = Some(Arc::new(client));
159 }
160
161 Ok(guard.clone().unwrap())
162 }
163}
164
165#[async_trait::async_trait]
166impl OptionExt for KWConsumer {
167 type AdminClient = AdminClient<DefaultClientContext>;
168
169 fn get_brokers(&self) -> &str {
170 self.conf.brokers.as_str()
171 }
172
173 fn get_log_level(&self) -> RDKafkaLogLevel {
174 self.conf.log_level.get_or_init()
175 }
176
177 async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
178 let mut guard = self.admin_client.lock().await;
179 if guard.is_none() {
180 let client = self.create_admin_client()?;
181 *guard = Some(Arc::new(client));
182 }
183
184 Ok(guard.clone().unwrap())
185 }
186}
187
188#[async_trait::async_trait]
189impl OptionExt for KWClient {
190 type AdminClient = AdminClient<DefaultClientContext>;
191
192 fn get_brokers(&self) -> &str {
193 self.brokers.as_str()
194 }
195
196 fn get_log_level(&self) -> RDKafkaLogLevel {
197 self.get_log_level()
198 }
199
200 async fn get_admin_client(&self) -> KWResult<SafeAdminClient> {
201 Ok(self.admin_client.clone())
202 }
203}