1use async_trait::async_trait;
2use std::ops::ControlFlow;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use tracing::{debug, error, info};
6
7use crate::{
8 backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
9 client::{Error, Result},
10 connection::{
11 BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
12 MetadataLookupMode,
13 },
14 messenger::RequestError,
15 protocol::{
16 error::Error as ProtocolError,
17 messages::{CreateTopicRequest, CreateTopicsRequest, DeleteTopicsRequest},
18 primitives::{Array, Int16, Int32, String_},
19 },
20 throttle::maybe_throttle,
21 validation::ExactlyOne,
22};
23
24use super::error::RequestContext;
25
/// Client for requests that are sent to the cluster's controller broker.
///
/// The connection to the controller is cached together with a generation
/// counter, which lets stale invalidation requests be told apart from current ones.
#[derive(Debug)]
pub struct ControllerClient {
    /// Connector used to fetch/refresh metadata and to open broker connections.
    brokers: Arc<BrokerConnector>,

    /// Backoff settings used when a controller request is retried.
    backoff_config: BackoffConfig,

    /// Cached controller connection (if any) paired with the generation of the cache entry.
    current_broker: Mutex<(Option<BrokerConnection>, BrokerCacheGeneration)>,
}
35
36impl ControllerClient {
37 pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
38 Self {
39 brokers,
40 backoff_config: Default::default(),
41 current_broker: Mutex::new((None, BrokerCacheGeneration::START)),
42 }
43 }
44
45 pub async fn create_topic(
47 &self,
48 name: impl Into<String> + Send,
49 num_partitions: i32,
50 replication_factor: i16,
51 timeout_ms: i32,
52 ) -> Result<()> {
53 let request = &CreateTopicsRequest {
54 topics: vec![CreateTopicRequest {
55 name: String_(name.into()),
56 num_partitions: Int32(num_partitions),
57 replication_factor: Int16(replication_factor),
58 assignments: vec![],
59 configs: vec![],
60 tagged_fields: None,
61 }],
62 timeout_ms: Int32(timeout_ms),
63 validate_only: None,
64 tagged_fields: None,
65 };
66
67 maybe_retry(&self.backoff_config, self, "create_topic", || async move {
68 let (broker, gen) = self
69 .get()
70 .await
71 .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
72 let response = broker
73 .request(request)
74 .await
75 .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
76
77 maybe_throttle(response.throttle_time_ms)?;
78
79 let topic = response
80 .topics
81 .exactly_one()
82 .map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
83
84 match topic.error {
85 None => Ok(()),
86 Some(protocol_error) => Err(ErrorOrThrottle::Error((
87 Error::ServerError {
88 protocol_error,
89 error_message: topic.error_message.and_then(|s| s.0),
90 request: RequestContext::Topic(topic.name.0),
91 response: None,
92 is_virtual: false,
93 },
94 Some(gen),
95 ))),
96 }
97 })
98 .await?;
99
100 let _ = self.brokers.refresh_metadata().await;
102
103 Ok(())
104 }
105
106 pub async fn delete_topic(
108 &self,
109 name: impl Into<String> + Send,
110 timeout_ms: i32,
111 ) -> Result<()> {
112 let request = &DeleteTopicsRequest {
113 topic_names: Array(Some(vec![String_(name.into())])),
114 timeout_ms: Int32(timeout_ms),
115 tagged_fields: None,
116 };
117
118 maybe_retry(&self.backoff_config, self, "delete_topic", || async move {
119 let (broker, gen) = self
120 .get()
121 .await
122 .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
123 let response = broker
124 .request(request)
125 .await
126 .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
127
128 maybe_throttle(response.throttle_time_ms)?;
129
130 let topic = response
131 .responses
132 .exactly_one()
133 .map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
134
135 match topic.error {
136 None => Ok(()),
137 Some(protocol_error) => Err(ErrorOrThrottle::Error((
138 Error::ServerError {
139 protocol_error,
140 error_message: topic.error_message.and_then(|s| s.0),
141 request: RequestContext::Topic(topic.name.0),
142 response: None,
143 is_virtual: false,
144 },
145 Some(gen),
146 ))),
147 }
148 })
149 .await?;
150
151 let _ = self.brokers.refresh_metadata().await;
153
154 Ok(())
155 }
156
157 async fn get_controller_id(&self) -> Result<i32> {
159 let (metadata, _gen) = self
161 .brokers
162 .request_metadata(&MetadataLookupMode::ArbitraryBroker, Some(vec![]))
163 .await?;
164
165 let controller_id = metadata
166 .controller_id
167 .ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))?
168 .0;
169
170 Ok(controller_id)
171 }
172}
173
174#[async_trait]
176impl BrokerCache for &ControllerClient {
177 type R = MessengerTransport;
178 type E = Error;
179
180 async fn get(&self) -> Result<(Arc<Self::R>, BrokerCacheGeneration)> {
181 let mut current_broker = self.current_broker.lock().await;
182 if let Some(broker) = ¤t_broker.0 {
183 return Ok((Arc::clone(broker), current_broker.1));
184 }
185
186 info!("Creating new controller broker connection",);
187
188 let controller_id = self.get_controller_id().await?;
189 let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| {
190 Error::InvalidResponse(format!(
191 "Controller {} not found in metadata response",
192 controller_id
193 ))
194 })?;
195
196 current_broker.0 = Some(Arc::clone(&broker));
197 current_broker.1.bump();
198
199 Ok((broker, current_broker.1))
200 }
201
202 async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration) {
203 let mut guard = self.current_broker.lock().await;
204
205 if guard.1 != gen {
206 debug!(
208 reason,
209 current_gen = guard.1.get(),
210 request_gen = gen.get(),
211 "stale invalidation request for arbitrary broker cache",
212 );
213 return;
214 }
215
216 info!(reason, "Invalidating cached controller broker",);
217 guard.0.take();
218 }
219}
220
221async fn maybe_retry<B, R, F, T>(
224 backoff_config: &BackoffConfig,
225 broker_cache: B,
226 request_name: &str,
227 f: R,
228) -> Result<T>
229where
230 B: BrokerCache,
231 R: (Fn() -> F) + Send + Sync,
232 F: std::future::Future<
233 Output = Result<T, ErrorOrThrottle<(Error, Option<BrokerCacheGeneration>)>>,
234 > + Send,
235{
236 let mut backoff = Backoff::new(backoff_config);
237
238 backoff
239 .retry_with_backoff(request_name, || async {
240 let (error, cache_gen) = match f().await {
241 Ok(v) => {
242 return ControlFlow::Break(Ok(v));
243 }
244 Err(ErrorOrThrottle::Throttle(t)) => {
245 return ControlFlow::Continue(ErrorOrThrottle::Throttle(t));
246 }
247 Err(ErrorOrThrottle::Error(e)) => e,
248 };
249
250 match error {
251 Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
253 | Error::Connection(_) => {
254 if let Some(cache_gen) = cache_gen {
255 broker_cache
256 .invalidate("controller client: connection broken", cache_gen)
257 .await
258 }
259 }
260
261 Error::ServerError {
263 protocol_error: ProtocolError::NotController,
264 ..
265 } => {
266 if let Some(cache_gen) = cache_gen {
267 broker_cache
268 .invalidate(
269 "controller client: server error: not controller",
270 cache_gen,
271 )
272 .await;
273 }
274 }
275
276 _ => {
278 error!(
279 e=%error,
280 request_name,
281 "request encountered fatal error",
282 );
283 return ControlFlow::Break(Err(error));
284 }
285 }
286 ControlFlow::Continue(ErrorOrThrottle::Error(error))
287 })
288 .await
289 .map_err(Error::RetryFailed)?
290}