Skip to main content

crabka_client_admin/
configs.rs

1//! Topic-config wrappers.
2//!
3//! `DescribeConfigs` filters to the subset of entries the user/operator
4//! has explicitly set (i.e. dynamic topic config, `ConfigSource =
5//! DYNAMIC_TOPIC_CONFIG = 1`), so the diff against `spec.config` is
6//! against overrides only — never broker defaults.
7
8use std::collections::BTreeMap;
9
10use crabka_protocol::owned::{
11    describe_configs_request::{DescribeConfigsRequest, DescribeConfigsResource},
12    describe_configs_response::DescribeConfigsResourceResult,
13    incremental_alter_configs_request::{
14        AlterConfigsResource, AlterableConfig, IncrementalAlterConfigsRequest,
15    },
16};
17
18use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
19
20/// `ConfigSource = DYNAMIC_TOPIC_CONFIG` per
21/// <https://kafka.apache.org/protocol#The_Messages_DescribeConfigs>.
22const DYNAMIC_TOPIC_CONFIG_SOURCE: i8 = 1;
23
24/// Kafka's `ConfigResource.type` for topic resources.
25const RESOURCE_TYPE_TOPIC: i8 = 2;
26
27/// Per-topic dynamic config overrides (broker defaults are filtered out).
28#[derive(Debug, Clone, Default)]
29pub struct TopicConfigOverrides {
30    pub topic: String,
31    pub overrides: BTreeMap<String, String>,
32}
33
34#[derive(Debug, Clone)]
35pub enum IncrementalAlterOp {
36    Set {
37        topic: String,
38        key: String,
39        value: String,
40    },
41    Delete {
42        topic: String,
43        key: String,
44    },
45}
46
47#[derive(Debug, Clone)]
48pub struct AlterConfigsOutcome {
49    pub topic: String,
50    pub error: Option<KafkaError>,
51}
52
53/// Filter a `DescribeConfigsResult`'s entries down to dynamic-topic
54/// overrides only. Pure function, unit-tested in isolation.
55///
56/// Per KIP-226 / Kafka's `DescribeConfigs` semantics, only entries
57/// whose `config_source == DYNAMIC_TOPIC_CONFIG (1)` represent values
58/// the user has explicitly set on the topic; everything else (broker
59/// defaults, static config, etc.) is filtered out so the operator
60/// diffs `spec.config` against overrides only.
61pub(crate) fn filter_dynamic_overrides(
62    topic: String,
63    entries: impl IntoIterator<Item = DescribeConfigsResourceResult>,
64) -> TopicConfigOverrides {
65    let mut overrides = BTreeMap::new();
66    for entry in entries {
67        if entry.config_source == DYNAMIC_TOPIC_CONFIG_SOURCE
68            && let Some(value) = entry.value
69        {
70            overrides.insert(entry.name, value);
71        }
72    }
73    TopicConfigOverrides { topic, overrides }
74}
75
76/// Pure helper: take one `DescribeConfigsResult` (one resource's slice
77/// of the response) and either return its dynamic-topic-config
78/// overrides or surface its broker error. Extracted so both the success
79/// and error branches can be unit-tested without standing up a broker.
80pub(crate) fn parse_describe_configs_resource(
81    r: crabka_protocol::owned::describe_configs_response::DescribeConfigsResult,
82) -> Result<TopicConfigOverrides, AdminError> {
83    if r.error_code != 0 {
84        return Err(AdminError::Broker {
85            api: "DescribeConfigs",
86            code: r.error_code,
87            name: kafka_error_name(r.error_code),
88            message: r.error_message,
89        });
90    }
91    Ok(filter_dynamic_overrides(r.resource_name, r.configs))
92}
93
94/// Pure helper: project an `IncrementalAlterConfigsResponse` into the
95/// per-topic outcome list the operator consumes.
96pub(crate) fn parse_incremental_alter_outcomes(
97    resp: <IncrementalAlterConfigsRequest as crabka_protocol::ProtocolRequest>::Response,
98) -> Vec<AlterConfigsOutcome> {
99    resp.responses
100        .into_iter()
101        .map(|r| AlterConfigsOutcome {
102            topic: r.resource_name,
103            error: if r.error_code == 0 {
104                None
105            } else {
106                Some(KafkaError {
107                    code: r.error_code,
108                    name: kafka_error_name(r.error_code),
109                    message: r.error_message,
110                })
111            },
112        })
113        .collect()
114}
115
116impl AdminClient {
117    pub async fn describe_configs(
118        &mut self,
119        topics: &[&str],
120    ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
121        let req = DescribeConfigsRequest {
122            resources: topics
123                .iter()
124                .map(|t| DescribeConfigsResource {
125                    resource_type: RESOURCE_TYPE_TOPIC,
126                    resource_name: (*t).to_string(),
127                    configuration_keys: None,
128                    ..Default::default()
129                })
130                .collect(),
131            include_synonyms: false,
132            include_documentation: false,
133            ..Default::default()
134        };
135        let resp = self.conn.send(req).await?;
136        let mut out = Vec::with_capacity(resp.results.len());
137        for r in resp.results {
138            out.push(parse_describe_configs_resource(r)?);
139        }
140        Ok(out)
141    }
142
143    pub async fn incremental_alter_configs(
144        &mut self,
145        ops: &[IncrementalAlterOp],
146    ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
147        // Group ops by topic.
148        let mut by_topic: BTreeMap<String, Vec<AlterableConfig>> = BTreeMap::new();
149        for op in ops {
150            match op {
151                IncrementalAlterOp::Set { topic, key, value } => {
152                    by_topic
153                        .entry(topic.clone())
154                        .or_default()
155                        .push(AlterableConfig {
156                            name: key.clone(),
157                            config_operation: 0, // SET
158                            value: Some(value.clone()),
159                            ..Default::default()
160                        });
161                }
162                IncrementalAlterOp::Delete { topic, key } => {
163                    by_topic
164                        .entry(topic.clone())
165                        .or_default()
166                        .push(AlterableConfig {
167                            name: key.clone(),
168                            config_operation: 1, // DELETE
169                            value: None,
170                            ..Default::default()
171                        });
172                }
173            }
174        }
175        let req = IncrementalAlterConfigsRequest {
176            resources: by_topic
177                .into_iter()
178                .map(|(topic, configs)| AlterConfigsResource {
179                    resource_type: RESOURCE_TYPE_TOPIC,
180                    resource_name: topic,
181                    configs,
182                    ..Default::default()
183                })
184                .collect(),
185            validate_only: false,
186            ..Default::default()
187        };
188        let resp = self.conn.send(req).await?;
189        Ok(parse_incremental_alter_outcomes(resp))
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use assert2::assert;
197
198    #[test]
199    fn dynamic_topic_config_source_is_one() {
200        // Guard so a future protocol change can't silently flip the
201        // filter we use to distinguish overrides from broker defaults.
202        assert!(DYNAMIC_TOPIC_CONFIG_SOURCE == 1);
203    }
204
205    #[test]
206    fn resource_type_topic_is_two() {
207        assert!(RESOURCE_TYPE_TOPIC == 2);
208    }
209
210    /// Spec test name: `describe_configs_filters_to_dynamic_topic`.
211    ///
212    /// Mixed `config_source` values: only entries with
213    /// `DYNAMIC_TOPIC_CONFIG (1)` survive; `STATIC_BROKER_CONFIG (4)`
214    /// and other sources are dropped. Also verifies entries with
215    /// `value: None` are filtered out.
216    #[test]
217    fn describe_configs_filters_to_dynamic_topic() {
218        let entries = vec![
219            DescribeConfigsResourceResult {
220                name: "retention.ms".into(),
221                value: Some("60000".into()),
222                config_source: 1, // DYNAMIC_TOPIC_CONFIG
223                ..Default::default()
224            },
225            DescribeConfigsResourceResult {
226                name: "log.dirs".into(),
227                value: Some("/data".into()),
228                config_source: 4, // STATIC_BROKER_CONFIG
229                ..Default::default()
230            },
231            DescribeConfigsResourceResult {
232                name: "cleanup.policy".into(),
233                value: Some("compact".into()),
234                config_source: 1,
235                ..Default::default()
236            },
237            // Dynamic-topic source but no value: should be dropped.
238            DescribeConfigsResourceResult {
239                name: "segment.bytes".into(),
240                value: None,
241                config_source: 1,
242                ..Default::default()
243            },
244            // Another non-dynamic source (DEFAULT_CONFIG = 5).
245            DescribeConfigsResourceResult {
246                name: "max.message.bytes".into(),
247                value: Some("1048576".into()),
248                config_source: 5,
249                ..Default::default()
250            },
251        ];
252        let r = filter_dynamic_overrides("foo".into(), entries);
253        assert!(r.topic == "foo");
254        assert!(r.overrides.len() == 2);
255        assert!(r.overrides.get("retention.ms").map(String::as_str) == Some("60000"));
256        assert!(r.overrides.get("cleanup.policy").map(String::as_str) == Some("compact"));
257        assert!(!r.overrides.contains_key("log.dirs"));
258        assert!(!r.overrides.contains_key("segment.bytes"));
259        assert!(!r.overrides.contains_key("max.message.bytes"));
260    }
261
262    // ── parse_describe_configs_resource ────────────────────────────────
263    //
264    // The success / error variants of the per-resource parser used by
265    // `describe_configs`. The full `describe_configs` RPC short-circuits
266    // on the first error_code != 0 entry; these tests lock that decision
267    // point so a refactor can't silently drop the error mapping or the
268    // dynamic-config filtering.
269
270    #[test]
271    fn parse_describe_configs_resource_returns_overrides_on_success() {
272        use crabka_protocol::owned::describe_configs_response::DescribeConfigsResult;
273        let r = DescribeConfigsResult {
274            error_code: 0,
275            error_message: None,
276            resource_type: RESOURCE_TYPE_TOPIC,
277            resource_name: "foo".into(),
278            configs: vec![DescribeConfigsResourceResult {
279                name: "retention.ms".into(),
280                value: Some("60000".into()),
281                config_source: 1,
282                ..Default::default()
283            }],
284            ..Default::default()
285        };
286        let parsed = parse_describe_configs_resource(r).expect("Ok branch");
287        assert!(parsed.topic == "foo");
288        assert!(parsed.overrides.get("retention.ms").map(String::as_str) == Some("60000"));
289    }
290
291    #[test]
292    fn parse_describe_configs_resource_returns_broker_error_when_error_code_set() {
293        use crabka_protocol::owned::describe_configs_response::DescribeConfigsResult;
294        let r = DescribeConfigsResult {
295            error_code: 3, // UNKNOWN_TOPIC_OR_PARTITION
296            error_message: Some("nope".into()),
297            resource_type: RESOURCE_TYPE_TOPIC,
298            resource_name: "missing".into(),
299            configs: Vec::new(),
300            ..Default::default()
301        };
302        let err = parse_describe_configs_resource(r).expect_err("Err branch");
303        match err {
304            AdminError::Broker {
305                api,
306                code,
307                name,
308                message,
309            } => {
310                assert!(api == "DescribeConfigs");
311                assert!(code == 3);
312                assert!(name == "UNKNOWN_TOPIC_OR_PARTITION");
313                assert!(message.as_deref() == Some("nope"));
314            }
315            other => panic!("expected Broker, got {other:?}"),
316        }
317    }
318
319    // ── parse_incremental_alter_outcomes ───────────────────────────────
320
321    #[test]
322    fn parse_incremental_alter_outcomes_success() {
323        use crabka_protocol::owned::incremental_alter_configs_response::{
324            AlterConfigsResourceResponse, IncrementalAlterConfigsResponse,
325        };
326        let resp = IncrementalAlterConfigsResponse {
327            responses: vec![AlterConfigsResourceResponse {
328                error_code: 0,
329                error_message: None,
330                resource_type: RESOURCE_TYPE_TOPIC,
331                resource_name: "foo".into(),
332                ..Default::default()
333            }],
334            ..Default::default()
335        };
336        let outs = parse_incremental_alter_outcomes(resp);
337        assert!(outs.len() == 1);
338        assert!(outs[0].topic == "foo");
339        assert!(outs[0].error.is_none());
340    }
341
342    #[test]
343    fn parse_incremental_alter_outcomes_carries_errors() {
344        use crabka_protocol::owned::incremental_alter_configs_response::{
345            AlterConfigsResourceResponse, IncrementalAlterConfigsResponse,
346        };
347        let resp = IncrementalAlterConfigsResponse {
348            responses: vec![
349                AlterConfigsResourceResponse {
350                    error_code: 0,
351                    error_message: None,
352                    resource_type: RESOURCE_TYPE_TOPIC,
353                    resource_name: "ok".into(),
354                    ..Default::default()
355                },
356                AlterConfigsResourceResponse {
357                    error_code: 40, // INVALID_CONFIG
358                    error_message: Some("bad value".into()),
359                    resource_type: RESOURCE_TYPE_TOPIC,
360                    resource_name: "bad".into(),
361                    ..Default::default()
362                },
363            ],
364            ..Default::default()
365        };
366        let outs = parse_incremental_alter_outcomes(resp);
367        assert!(outs.len() == 2);
368        assert!(outs[0].error.is_none());
369        let err = outs[1].error.as_ref().expect("error expected");
370        assert!(err.code == 40);
371        assert!(err.name == "INVALID_CONFIG");
372        assert!(err.message.as_deref() == Some("bad value"));
373    }
374}