Skip to main content

crabka_client_admin/
quotas.rs

1//! Client-quota admin RPCs.
2//!
3//! Two admin operations the `KafkaUser` reconciler drives:
4//! `DescribeClientQuotas` (`api_key` 48) reads the current set of
5//! quota keys → values for a single (user) entity;
6//! `AlterClientQuotas` (`api_key` 49) upserts and/or removes those keys.
7//!
8//! Only the per-user shape is exposed (entity `[("user", Some(name))]`).
9//! Per-`client-id`, per-`ip`, and tuple entities (e.g. `(user, client-id)`)
10//! are reserved for later operator surfaces.
11
12use std::collections::BTreeMap;
13
14use crabka_protocol::owned::{
15    alter_client_quotas_request::{
16        AlterClientQuotasRequest, EntityData as AlterEntity, EntryData as AlterEntry,
17        OpData as AlterOp,
18    },
19    describe_client_quotas_request::{ComponentData, DescribeClientQuotasRequest},
20};
21
22use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
23
// Values of the wire `match_type` field carried by each component of a
// `DescribeClientQuotasRequest` (KIP-546 / `DescribeClientQuotasRequest.json`).

/// Match the entity whose name is exactly the component's `match_` string.
const MATCH_TYPE_EXACT: i8 = 0;
/// Match the default entity. Not used in this file yet; the leading
/// underscore keeps the constant around without an unused warning.
const _MATCH_TYPE_DEFAULT: i8 = 1;
/// Match any entity of the given type. Not used in this file yet; the
/// leading underscore keeps the constant around without an unused warning.
const _MATCH_TYPE_ANY: i8 = 2;
28
/// A single change to apply to one (user) quota entity.
///
/// The reconciler derives a list of these by comparing the desired spec
/// with the state the broker currently reports.
#[derive(Debug, Clone, PartialEq)]
pub enum QuotaOp {
    /// Create or overwrite the quota `key` with `value`.
    ///
    /// `value` has to be finite and non-negative, and for
    /// `request_percentage` the broker additionally requires
    /// `value <= 100`.
    Set { key: String, value: f64 },
    /// Delete the quota `key` from this entity. Corresponds to Kafka's
    /// `remove=true` flag on `OpData`.
    Remove { key: String },
}
40
/// Quota key → value map describing what the broker holds for one user.
/// An empty map means no per-user quotas are configured.
pub type UserQuotaConfig = BTreeMap<String, f64>;
44
45impl AdminClient {
46    /// Read the broker's current client-quota config for the named user.
47    /// Filters strictly on the single-component entity
48    /// `[("user", Some(username))]`; broker entries whose entity also
49    /// carries a `client-id` axis do not match (matches Kafka admin-tool
50    /// strict-component semantics).
51    pub async fn describe_user_quotas(
52        &mut self,
53        username: &str,
54    ) -> Result<UserQuotaConfig, AdminError> {
55        let req = DescribeClientQuotasRequest {
56            components: vec![ComponentData {
57                entity_type: "user".into(),
58                match_type: MATCH_TYPE_EXACT,
59                match_: Some(username.into()),
60                ..Default::default()
61            }],
62            strict: true,
63            ..Default::default()
64        };
65        let resp = self.conn.send(req).await?;
66        if resp.error_code != 0 {
67            return Err(AdminError::Broker {
68                api: "DescribeClientQuotas",
69                code: resp.error_code,
70                name: kafka_error_name(resp.error_code),
71                message: resp.error_message,
72            });
73        }
74        let mut out = UserQuotaConfig::new();
75        // `strict: true` plus a one-component filter means the broker
76        // returns at most one entry — but be tolerant of broker bugs.
77        for entry in resp.entries.unwrap_or_default() {
78            for v in entry.values {
79                out.insert(v.key, v.value);
80            }
81        }
82        Ok(out)
83    }
84
85    /// Apply `ops` against the (user) entity. Returns the per-entry
86    /// `KafkaError` surfaced by the broker, or `None` on success.
87    ///
88    /// `validate_only` mirrors the wire flag — when `true` the broker
89    /// runs validation but writes no metadata record.
90    pub async fn alter_user_quotas(
91        &mut self,
92        username: &str,
93        ops: &[QuotaOp],
94        validate_only: bool,
95    ) -> Result<Option<KafkaError>, AdminError> {
96        if ops.is_empty() {
97            return Ok(None);
98        }
99        let req = AlterClientQuotasRequest {
100            entries: vec![AlterEntry {
101                entity: vec![AlterEntity {
102                    entity_type: "user".into(),
103                    entity_name: Some(username.into()),
104                    ..Default::default()
105                }],
106                ops: ops.iter().map(op_to_wire).collect(),
107                ..Default::default()
108            }],
109            validate_only,
110            ..Default::default()
111        };
112        let resp = self.conn.send(req).await?;
113        // We pass one entry → expect one result. Defensive on length.
114        let entry = resp.entries.into_iter().next();
115        let Some(entry) = entry else {
116            return Ok(None);
117        };
118        if entry.error_code == 0 {
119            return Ok(None);
120        }
121        Ok(Some(KafkaError {
122            code: entry.error_code,
123            name: kafka_error_name(entry.error_code),
124            message: entry.error_message,
125        }))
126    }
127}
128
129fn op_to_wire(op: &QuotaOp) -> AlterOp {
130    match op {
131        QuotaOp::Set { key, value } => AlterOp {
132            key: key.clone(),
133            value: *value,
134            remove: false,
135            ..Default::default()
136        },
137        QuotaOp::Remove { key } => AlterOp {
138            key: key.clone(),
139            // Wire requires a value field even on remove; broker ignores it.
140            value: 0.0,
141            remove: true,
142            ..Default::default()
143        },
144    }
145}
146
147/// Pure: diff the desired key-set against the current key-set, producing
148/// the minimal `(set, remove)` op stream. Floats compare bit-equal so a
149/// no-op `Set` with the same value is not re-issued.
150#[must_use]
151pub fn diff_user_quotas(current: &UserQuotaConfig, desired: &UserQuotaConfig) -> Vec<QuotaOp> {
152    let mut ops = Vec::new();
153    for (k, v) in desired {
154        match current.get(k) {
155            Some(cur) if cur.to_bits() == v.to_bits() => {}
156            _ => ops.push(QuotaOp::Set {
157                key: k.clone(),
158                value: *v,
159            }),
160        }
161    }
162    for k in current.keys() {
163        if !desired.contains_key(k) {
164            ops.push(QuotaOp::Remove { key: k.clone() });
165        }
166    }
167    ops
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Build a quota map from `(key, value)` pairs.
    fn quotas(pairs: &[(&str, f64)]) -> UserQuotaConfig {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value))
            .collect()
    }

    /// Shorthand for a `QuotaOp::Set`.
    fn set(key: &str, value: f64) -> QuotaOp {
        QuotaOp::Set {
            key: key.to_owned(),
            value,
        }
    }

    /// Shorthand for a `QuotaOp::Remove`.
    fn remove(key: &str) -> QuotaOp {
        QuotaOp::Remove {
            key: key.to_owned(),
        }
    }

    /// Identical current/desired maps need no ops.
    #[test]
    fn diff_no_change_returns_empty() {
        let current = quotas(&[("producer_byte_rate", 1_048_576.0)]);
        let desired = current.clone();
        assert!(diff_user_quotas(&current, &desired).is_empty());
    }

    /// Keys present only in the desired map become `Set` ops.
    #[test]
    fn diff_set_added_keys() {
        let current = UserQuotaConfig::new();
        let desired = quotas(&[
            ("producer_byte_rate", 1_048_576.0),
            ("request_percentage", 25.0),
        ]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops.len() == 2);
        assert!(ops.contains(&set("producer_byte_rate", 1_048_576.0)));
        assert!(ops.contains(&set("request_percentage", 25.0)));
    }

    /// Keys present only in the current map become `Remove` ops.
    #[test]
    fn diff_remove_dropped_keys() {
        let current = quotas(&[("producer_byte_rate", 1.0), ("consumer_byte_rate", 2.0)]);
        let desired = quotas(&[("producer_byte_rate", 1.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops == vec![remove("consumer_byte_rate")]);
    }

    /// A changed value for an existing key is re-issued as a `Set`.
    #[test]
    fn diff_value_change_is_a_set() {
        let current = quotas(&[("producer_byte_rate", 1.0)]);
        let desired = quotas(&[("producer_byte_rate", 2.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops == vec![set("producer_byte_rate", 2.0)]);
    }

    /// One changed key, one added key and one dropped key together.
    #[test]
    fn diff_mixed_add_change_remove() {
        let current = quotas(&[("producer_byte_rate", 1.0), ("consumer_byte_rate", 2.0)]);
        // `producer_byte_rate` changes, `request_percentage` is new and
        // `consumer_byte_rate` is dropped.
        let desired = quotas(&[("producer_byte_rate", 5.0), ("request_percentage", 25.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops.len() == 3);
        assert!(ops.contains(&set("producer_byte_rate", 5.0)));
        assert!(ops.contains(&set("request_percentage", 25.0)));
        assert!(ops.contains(&remove("consumer_byte_rate")));
    }

    /// `Set` keeps key and value and leaves the remove flag clear.
    #[test]
    fn op_to_wire_set() {
        let wire = op_to_wire(&set("producer_byte_rate", 1.0));
        assert!(wire.key == "producer_byte_rate");
        assert!((wire.value - 1.0).abs() < f64::EPSILON);
        assert!(!wire.remove);
    }

    /// `Remove` sends a positive-zero placeholder value with the flag set.
    #[test]
    fn op_to_wire_remove_sends_zero_value_and_flag() {
        let wire = op_to_wire(&remove("producer_byte_rate"));
        assert!(wire.key == "producer_byte_rate");
        assert!(wire.value.to_bits() == 0.0_f64.to_bits());
        assert!(wire.remove);
    }
}