1use std::collections::BTreeMap;
9
10use crabka_protocol::owned::{
11 describe_configs_request::{DescribeConfigsRequest, DescribeConfigsResource},
12 describe_configs_response::DescribeConfigsResourceResult,
13 incremental_alter_configs_request::{
14 AlterConfigsResource, AlterableConfig, IncrementalAlterConfigsRequest,
15 },
16};
17
18use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
19
20const DYNAMIC_TOPIC_CONFIG_SOURCE: i8 = 1;
23
24const RESOURCE_TYPE_TOPIC: i8 = 2;
26
27#[derive(Debug, Clone, Default)]
29pub struct TopicConfigOverrides {
30 pub topic: String,
31 pub overrides: BTreeMap<String, String>,
32}
33
34#[derive(Debug, Clone)]
35pub enum IncrementalAlterOp {
36 Set {
37 topic: String,
38 key: String,
39 value: String,
40 },
41 Delete {
42 topic: String,
43 key: String,
44 },
45}
46
47#[derive(Debug, Clone)]
48pub struct AlterConfigsOutcome {
49 pub topic: String,
50 pub error: Option<KafkaError>,
51}
52
53pub(crate) fn filter_dynamic_overrides(
62 topic: String,
63 entries: impl IntoIterator<Item = DescribeConfigsResourceResult>,
64) -> TopicConfigOverrides {
65 let mut overrides = BTreeMap::new();
66 for entry in entries {
67 if entry.config_source == DYNAMIC_TOPIC_CONFIG_SOURCE
68 && let Some(value) = entry.value
69 {
70 overrides.insert(entry.name, value);
71 }
72 }
73 TopicConfigOverrides { topic, overrides }
74}
75
76pub(crate) fn parse_describe_configs_resource(
81 r: crabka_protocol::owned::describe_configs_response::DescribeConfigsResult,
82) -> Result<TopicConfigOverrides, AdminError> {
83 if r.error_code != 0 {
84 return Err(AdminError::Broker {
85 api: "DescribeConfigs",
86 code: r.error_code,
87 name: kafka_error_name(r.error_code),
88 message: r.error_message,
89 });
90 }
91 Ok(filter_dynamic_overrides(r.resource_name, r.configs))
92}
93
94pub(crate) fn parse_incremental_alter_outcomes(
97 resp: <IncrementalAlterConfigsRequest as crabka_protocol::ProtocolRequest>::Response,
98) -> Vec<AlterConfigsOutcome> {
99 resp.responses
100 .into_iter()
101 .map(|r| AlterConfigsOutcome {
102 topic: r.resource_name,
103 error: if r.error_code == 0 {
104 None
105 } else {
106 Some(KafkaError {
107 code: r.error_code,
108 name: kafka_error_name(r.error_code),
109 message: r.error_message,
110 })
111 },
112 })
113 .collect()
114}
115
116impl AdminClient {
117 pub async fn describe_configs(
118 &mut self,
119 topics: &[&str],
120 ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
121 let req = DescribeConfigsRequest {
122 resources: topics
123 .iter()
124 .map(|t| DescribeConfigsResource {
125 resource_type: RESOURCE_TYPE_TOPIC,
126 resource_name: (*t).to_string(),
127 configuration_keys: None,
128 ..Default::default()
129 })
130 .collect(),
131 include_synonyms: false,
132 include_documentation: false,
133 ..Default::default()
134 };
135 let resp = self.conn.send(req).await?;
136 let mut out = Vec::with_capacity(resp.results.len());
137 for r in resp.results {
138 out.push(parse_describe_configs_resource(r)?);
139 }
140 Ok(out)
141 }
142
143 pub async fn incremental_alter_configs(
144 &mut self,
145 ops: &[IncrementalAlterOp],
146 ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
147 let mut by_topic: BTreeMap<String, Vec<AlterableConfig>> = BTreeMap::new();
149 for op in ops {
150 match op {
151 IncrementalAlterOp::Set { topic, key, value } => {
152 by_topic
153 .entry(topic.clone())
154 .or_default()
155 .push(AlterableConfig {
156 name: key.clone(),
157 config_operation: 0, value: Some(value.clone()),
159 ..Default::default()
160 });
161 }
162 IncrementalAlterOp::Delete { topic, key } => {
163 by_topic
164 .entry(topic.clone())
165 .or_default()
166 .push(AlterableConfig {
167 name: key.clone(),
168 config_operation: 1, value: None,
170 ..Default::default()
171 });
172 }
173 }
174 }
175 let req = IncrementalAlterConfigsRequest {
176 resources: by_topic
177 .into_iter()
178 .map(|(topic, configs)| AlterConfigsResource {
179 resource_type: RESOURCE_TYPE_TOPIC,
180 resource_name: topic,
181 configs,
182 ..Default::default()
183 })
184 .collect(),
185 validate_only: false,
186 ..Default::default()
187 };
188 let resp = self.conn.send(req).await?;
189 Ok(parse_incremental_alter_outcomes(resp))
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::*;
196 use assert2::assert;
197
198 #[test]
199 fn dynamic_topic_config_source_is_one() {
200 assert!(DYNAMIC_TOPIC_CONFIG_SOURCE == 1);
203 }
204
205 #[test]
206 fn resource_type_topic_is_two() {
207 assert!(RESOURCE_TYPE_TOPIC == 2);
208 }
209
210 #[test]
217 fn describe_configs_filters_to_dynamic_topic() {
218 let entries = vec![
219 DescribeConfigsResourceResult {
220 name: "retention.ms".into(),
221 value: Some("60000".into()),
222 config_source: 1, ..Default::default()
224 },
225 DescribeConfigsResourceResult {
226 name: "log.dirs".into(),
227 value: Some("/data".into()),
228 config_source: 4, ..Default::default()
230 },
231 DescribeConfigsResourceResult {
232 name: "cleanup.policy".into(),
233 value: Some("compact".into()),
234 config_source: 1,
235 ..Default::default()
236 },
237 DescribeConfigsResourceResult {
239 name: "segment.bytes".into(),
240 value: None,
241 config_source: 1,
242 ..Default::default()
243 },
244 DescribeConfigsResourceResult {
246 name: "max.message.bytes".into(),
247 value: Some("1048576".into()),
248 config_source: 5,
249 ..Default::default()
250 },
251 ];
252 let r = filter_dynamic_overrides("foo".into(), entries);
253 assert!(r.topic == "foo");
254 assert!(r.overrides.len() == 2);
255 assert!(r.overrides.get("retention.ms").map(String::as_str) == Some("60000"));
256 assert!(r.overrides.get("cleanup.policy").map(String::as_str) == Some("compact"));
257 assert!(!r.overrides.contains_key("log.dirs"));
258 assert!(!r.overrides.contains_key("segment.bytes"));
259 assert!(!r.overrides.contains_key("max.message.bytes"));
260 }
261
262 #[test]
271 fn parse_describe_configs_resource_returns_overrides_on_success() {
272 use crabka_protocol::owned::describe_configs_response::DescribeConfigsResult;
273 let r = DescribeConfigsResult {
274 error_code: 0,
275 error_message: None,
276 resource_type: RESOURCE_TYPE_TOPIC,
277 resource_name: "foo".into(),
278 configs: vec![DescribeConfigsResourceResult {
279 name: "retention.ms".into(),
280 value: Some("60000".into()),
281 config_source: 1,
282 ..Default::default()
283 }],
284 ..Default::default()
285 };
286 let parsed = parse_describe_configs_resource(r).expect("Ok branch");
287 assert!(parsed.topic == "foo");
288 assert!(parsed.overrides.get("retention.ms").map(String::as_str) == Some("60000"));
289 }
290
291 #[test]
292 fn parse_describe_configs_resource_returns_broker_error_when_error_code_set() {
293 use crabka_protocol::owned::describe_configs_response::DescribeConfigsResult;
294 let r = DescribeConfigsResult {
295 error_code: 3, error_message: Some("nope".into()),
297 resource_type: RESOURCE_TYPE_TOPIC,
298 resource_name: "missing".into(),
299 configs: Vec::new(),
300 ..Default::default()
301 };
302 let err = parse_describe_configs_resource(r).expect_err("Err branch");
303 match err {
304 AdminError::Broker {
305 api,
306 code,
307 name,
308 message,
309 } => {
310 assert!(api == "DescribeConfigs");
311 assert!(code == 3);
312 assert!(name == "UNKNOWN_TOPIC_OR_PARTITION");
313 assert!(message.as_deref() == Some("nope"));
314 }
315 other => panic!("expected Broker, got {other:?}"),
316 }
317 }
318
319 #[test]
322 fn parse_incremental_alter_outcomes_success() {
323 use crabka_protocol::owned::incremental_alter_configs_response::{
324 AlterConfigsResourceResponse, IncrementalAlterConfigsResponse,
325 };
326 let resp = IncrementalAlterConfigsResponse {
327 responses: vec![AlterConfigsResourceResponse {
328 error_code: 0,
329 error_message: None,
330 resource_type: RESOURCE_TYPE_TOPIC,
331 resource_name: "foo".into(),
332 ..Default::default()
333 }],
334 ..Default::default()
335 };
336 let outs = parse_incremental_alter_outcomes(resp);
337 assert!(outs.len() == 1);
338 assert!(outs[0].topic == "foo");
339 assert!(outs[0].error.is_none());
340 }
341
342 #[test]
343 fn parse_incremental_alter_outcomes_carries_errors() {
344 use crabka_protocol::owned::incremental_alter_configs_response::{
345 AlterConfigsResourceResponse, IncrementalAlterConfigsResponse,
346 };
347 let resp = IncrementalAlterConfigsResponse {
348 responses: vec![
349 AlterConfigsResourceResponse {
350 error_code: 0,
351 error_message: None,
352 resource_type: RESOURCE_TYPE_TOPIC,
353 resource_name: "ok".into(),
354 ..Default::default()
355 },
356 AlterConfigsResourceResponse {
357 error_code: 40, error_message: Some("bad value".into()),
359 resource_type: RESOURCE_TYPE_TOPIC,
360 resource_name: "bad".into(),
361 ..Default::default()
362 },
363 ],
364 ..Default::default()
365 };
366 let outs = parse_incremental_alter_outcomes(resp);
367 assert!(outs.len() == 2);
368 assert!(outs[0].error.is_none());
369 let err = outs[1].error.as_ref().expect("error expected");
370 assert!(err.code == 40);
371 assert!(err.name == "INVALID_CONFIG");
372 assert!(err.message.as_deref() == Some("bad value"));
373 }
374}