Skip to main content

rskafka/client/
controller.rs

1use async_trait::async_trait;
2use std::ops::ControlFlow;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use tracing::{debug, error, info};
6
7use crate::{
8    backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
9    client::{Error, Result},
10    connection::{
11        BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
12        MetadataLookupMode,
13    },
14    messenger::RequestError,
15    protocol::{
16        error::Error as ProtocolError,
17        messages::{CreateTopicRequest, CreateTopicsRequest, DeleteTopicsRequest},
18        primitives::{Array, Int16, Int32, String_},
19    },
20    throttle::maybe_throttle,
21    validation::ExactlyOne,
22};
23
24use super::error::RequestContext;
25
26#[derive(Debug)]
27pub struct ControllerClient {
28    brokers: Arc<BrokerConnector>,
29
30    backoff_config: BackoffConfig,
31
32    /// Current broker connection if any
33    current_broker: Mutex<(Option<BrokerConnection>, BrokerCacheGeneration)>,
34}
35
36impl ControllerClient {
37    pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
38        Self {
39            brokers,
40            backoff_config: Default::default(),
41            current_broker: Mutex::new((None, BrokerCacheGeneration::START)),
42        }
43    }
44
45    /// Create a topic
46    pub async fn create_topic(
47        &self,
48        name: impl Into<String> + Send,
49        num_partitions: i32,
50        replication_factor: i16,
51        timeout_ms: i32,
52    ) -> Result<()> {
53        let request = &CreateTopicsRequest {
54            topics: vec![CreateTopicRequest {
55                name: String_(name.into()),
56                num_partitions: Int32(num_partitions),
57                replication_factor: Int16(replication_factor),
58                assignments: vec![],
59                configs: vec![],
60                tagged_fields: None,
61            }],
62            timeout_ms: Int32(timeout_ms),
63            validate_only: None,
64            tagged_fields: None,
65        };
66
67        maybe_retry(&self.backoff_config, self, "create_topic", || async move {
68            let (broker, gen) = self
69                .get()
70                .await
71                .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
72            let response = broker
73                .request(request)
74                .await
75                .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
76
77            maybe_throttle(response.throttle_time_ms)?;
78
79            let topic = response
80                .topics
81                .exactly_one()
82                .map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
83
84            match topic.error {
85                None => Ok(()),
86                Some(protocol_error) => Err(ErrorOrThrottle::Error((
87                    Error::ServerError {
88                        protocol_error,
89                        error_message: topic.error_message.and_then(|s| s.0),
90                        request: RequestContext::Topic(topic.name.0),
91                        response: None,
92                        is_virtual: false,
93                    },
94                    Some(gen),
95                ))),
96            }
97        })
98        .await?;
99
100        // Refresh the cache now there is definitely a new topic to observe.
101        let _ = self.brokers.refresh_metadata().await;
102
103        Ok(())
104    }
105
106    /// Delete a topic
107    pub async fn delete_topic(
108        &self,
109        name: impl Into<String> + Send,
110        timeout_ms: i32,
111    ) -> Result<()> {
112        let request = &DeleteTopicsRequest {
113            topic_names: Array(Some(vec![String_(name.into())])),
114            timeout_ms: Int32(timeout_ms),
115            tagged_fields: None,
116        };
117
118        maybe_retry(&self.backoff_config, self, "delete_topic", || async move {
119            let (broker, gen) = self
120                .get()
121                .await
122                .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
123            let response = broker
124                .request(request)
125                .await
126                .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
127
128            maybe_throttle(response.throttle_time_ms)?;
129
130            let topic = response
131                .responses
132                .exactly_one()
133                .map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
134
135            match topic.error {
136                None => Ok(()),
137                Some(protocol_error) => Err(ErrorOrThrottle::Error((
138                    Error::ServerError {
139                        protocol_error,
140                        error_message: topic.error_message.and_then(|s| s.0),
141                        request: RequestContext::Topic(topic.name.0),
142                        response: None,
143                        is_virtual: false,
144                    },
145                    Some(gen),
146                ))),
147            }
148        })
149        .await?;
150
151        // Refresh the cache now there is definitely a new topic to observe.
152        let _ = self.brokers.refresh_metadata().await;
153
154        Ok(())
155    }
156
157    /// Retrieve the broker ID of the controller
158    async fn get_controller_id(&self) -> Result<i32> {
159        // Request an uncached, fresh copy of the metadata.
160        let (metadata, _gen) = self
161            .brokers
162            .request_metadata(&MetadataLookupMode::ArbitraryBroker, Some(vec![]))
163            .await?;
164
165        let controller_id = metadata
166            .controller_id
167            .ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))?
168            .0;
169
170        Ok(controller_id)
171    }
172}
173
174/// Caches the cluster controller broker.
175#[async_trait]
176impl BrokerCache for &ControllerClient {
177    type R = MessengerTransport;
178    type E = Error;
179
180    async fn get(&self) -> Result<(Arc<Self::R>, BrokerCacheGeneration)> {
181        let mut current_broker = self.current_broker.lock().await;
182        if let Some(broker) = &current_broker.0 {
183            return Ok((Arc::clone(broker), current_broker.1));
184        }
185
186        info!("Creating new controller broker connection",);
187
188        let controller_id = self.get_controller_id().await?;
189        let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| {
190            Error::InvalidResponse(format!(
191                "Controller {} not found in metadata response",
192                controller_id
193            ))
194        })?;
195
196        current_broker.0 = Some(Arc::clone(&broker));
197        current_broker.1.bump();
198
199        Ok((broker, current_broker.1))
200    }
201
202    async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration) {
203        let mut guard = self.current_broker.lock().await;
204
205        if guard.1 != gen {
206            // stale request
207            debug!(
208                reason,
209                current_gen = guard.1.get(),
210                request_gen = gen.get(),
211                "stale invalidation request for arbitrary broker cache",
212            );
213            return;
214        }
215
216        info!(reason, "Invalidating cached controller broker",);
217        guard.0.take();
218    }
219}
220
221/// Takes a `request_name` and a function yielding a fallible future
222/// and handles certain classes of error
223async fn maybe_retry<B, R, F, T>(
224    backoff_config: &BackoffConfig,
225    broker_cache: B,
226    request_name: &str,
227    f: R,
228) -> Result<T>
229where
230    B: BrokerCache,
231    R: (Fn() -> F) + Send + Sync,
232    F: std::future::Future<
233            Output = Result<T, ErrorOrThrottle<(Error, Option<BrokerCacheGeneration>)>>,
234        > + Send,
235{
236    let mut backoff = Backoff::new(backoff_config);
237
238    backoff
239        .retry_with_backoff(request_name, || async {
240            let (error, cache_gen) = match f().await {
241                Ok(v) => {
242                    return ControlFlow::Break(Ok(v));
243                }
244                Err(ErrorOrThrottle::Throttle(t)) => {
245                    return ControlFlow::Continue(ErrorOrThrottle::Throttle(t));
246                }
247                Err(ErrorOrThrottle::Error(e)) => e,
248            };
249
250            match error {
251                // broken connection
252                Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
253                | Error::Connection(_) => {
254                    if let Some(cache_gen) = cache_gen {
255                        broker_cache
256                            .invalidate("controller client: connection broken", cache_gen)
257                            .await
258                    }
259                }
260
261                // our broker is actually not the controller
262                Error::ServerError {
263                    protocol_error: ProtocolError::NotController,
264                    ..
265                } => {
266                    if let Some(cache_gen) = cache_gen {
267                        broker_cache
268                            .invalidate(
269                                "controller client: server error: not controller",
270                                cache_gen,
271                            )
272                            .await;
273                    }
274                }
275
276                // fatal
277                _ => {
278                    error!(
279                        e=%error,
280                        request_name,
281                        "request encountered fatal error",
282                    );
283                    return ControlFlow::Break(Err(error));
284                }
285            }
286            ControlFlow::Continue(ErrorOrThrottle::Error(error))
287        })
288        .await
289        .map_err(Error::RetryFailed)?
290}