1use std::collections::BTreeMap;
13
14use crabka_protocol::owned::{
15 alter_client_quotas_request::{
16 AlterClientQuotasRequest, EntityData as AlterEntity, EntryData as AlterEntry,
17 OpData as AlterOp,
18 },
19 describe_client_quotas_request::{ComponentData, DescribeClientQuotasRequest},
20};
21
22use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
23
24const MATCH_TYPE_EXACT: i8 = 0;
26const _MATCH_TYPE_DEFAULT: i8 = 1;
27const _MATCH_TYPE_ANY: i8 = 2;
28
29#[derive(Debug, Clone, PartialEq)]
32pub enum QuotaOp {
33 Set { key: String, value: f64 },
36 Remove { key: String },
39}
40
41pub type UserQuotaConfig = BTreeMap<String, f64>;
44
45impl AdminClient {
46 pub async fn describe_user_quotas(
52 &mut self,
53 username: &str,
54 ) -> Result<UserQuotaConfig, AdminError> {
55 let req = DescribeClientQuotasRequest {
56 components: vec![ComponentData {
57 entity_type: "user".into(),
58 match_type: MATCH_TYPE_EXACT,
59 match_: Some(username.into()),
60 ..Default::default()
61 }],
62 strict: true,
63 ..Default::default()
64 };
65 let resp = self.conn.send(req).await?;
66 if resp.error_code != 0 {
67 return Err(AdminError::Broker {
68 api: "DescribeClientQuotas",
69 code: resp.error_code,
70 name: kafka_error_name(resp.error_code),
71 message: resp.error_message,
72 });
73 }
74 let mut out = UserQuotaConfig::new();
75 for entry in resp.entries.unwrap_or_default() {
78 for v in entry.values {
79 out.insert(v.key, v.value);
80 }
81 }
82 Ok(out)
83 }
84
85 pub async fn alter_user_quotas(
91 &mut self,
92 username: &str,
93 ops: &[QuotaOp],
94 validate_only: bool,
95 ) -> Result<Option<KafkaError>, AdminError> {
96 if ops.is_empty() {
97 return Ok(None);
98 }
99 let req = AlterClientQuotasRequest {
100 entries: vec![AlterEntry {
101 entity: vec![AlterEntity {
102 entity_type: "user".into(),
103 entity_name: Some(username.into()),
104 ..Default::default()
105 }],
106 ops: ops.iter().map(op_to_wire).collect(),
107 ..Default::default()
108 }],
109 validate_only,
110 ..Default::default()
111 };
112 let resp = self.conn.send(req).await?;
113 let entry = resp.entries.into_iter().next();
115 let Some(entry) = entry else {
116 return Ok(None);
117 };
118 if entry.error_code == 0 {
119 return Ok(None);
120 }
121 Ok(Some(KafkaError {
122 code: entry.error_code,
123 name: kafka_error_name(entry.error_code),
124 message: entry.error_message,
125 }))
126 }
127}
128
129fn op_to_wire(op: &QuotaOp) -> AlterOp {
130 match op {
131 QuotaOp::Set { key, value } => AlterOp {
132 key: key.clone(),
133 value: *value,
134 remove: false,
135 ..Default::default()
136 },
137 QuotaOp::Remove { key } => AlterOp {
138 key: key.clone(),
139 value: 0.0,
141 remove: true,
142 ..Default::default()
143 },
144 }
145}
146
147#[must_use]
151pub fn diff_user_quotas(current: &UserQuotaConfig, desired: &UserQuotaConfig) -> Vec<QuotaOp> {
152 let mut ops = Vec::new();
153 for (k, v) in desired {
154 match current.get(k) {
155 Some(cur) if cur.to_bits() == v.to_bits() => {}
156 _ => ops.push(QuotaOp::Set {
157 key: k.clone(),
158 value: *v,
159 }),
160 }
161 }
162 for k in current.keys() {
163 if !desired.contains_key(k) {
164 ops.push(QuotaOp::Remove { key: k.clone() });
165 }
166 }
167 ops
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173 use assert2::assert;
174
175 #[test]
176 fn diff_no_change_returns_empty() {
177 let mut c = UserQuotaConfig::new();
178 c.insert("producer_byte_rate".into(), 1_048_576.0);
179 let d = c.clone();
180 assert!(diff_user_quotas(&c, &d).is_empty());
181 }
182
183 #[test]
184 fn diff_set_added_keys() {
185 let c = UserQuotaConfig::new();
186 let mut d = UserQuotaConfig::new();
187 d.insert("producer_byte_rate".into(), 1_048_576.0);
188 d.insert("request_percentage".into(), 25.0);
189 let ops = diff_user_quotas(&c, &d);
190 assert!(ops.len() == 2);
191 assert!(ops.iter().any(|op| matches!(op, QuotaOp::Set { key, value }
192 if key == "producer_byte_rate" && (*value - 1_048_576.0).abs() < f64::EPSILON)));
193 assert!(ops.iter().any(|op| matches!(op, QuotaOp::Set { key, value }
194 if key == "request_percentage" && (*value - 25.0).abs() < f64::EPSILON)));
195 }
196
197 #[test]
198 fn diff_remove_dropped_keys() {
199 let mut c = UserQuotaConfig::new();
200 c.insert("producer_byte_rate".into(), 1.0);
201 c.insert("consumer_byte_rate".into(), 2.0);
202 let mut d = UserQuotaConfig::new();
203 d.insert("producer_byte_rate".into(), 1.0);
204 let ops = diff_user_quotas(&c, &d);
205 assert!(
206 ops == vec![QuotaOp::Remove {
207 key: "consumer_byte_rate".into()
208 }]
209 );
210 }
211
212 #[test]
213 fn diff_value_change_is_a_set() {
214 let mut c = UserQuotaConfig::new();
215 c.insert("producer_byte_rate".into(), 1.0);
216 let mut d = UserQuotaConfig::new();
217 d.insert("producer_byte_rate".into(), 2.0);
218 let ops = diff_user_quotas(&c, &d);
219 assert!(
220 ops == vec![QuotaOp::Set {
221 key: "producer_byte_rate".into(),
222 value: 2.0,
223 }]
224 );
225 }
226
227 #[test]
228 fn diff_mixed_add_change_remove() {
229 let mut c = UserQuotaConfig::new();
230 c.insert("producer_byte_rate".into(), 1.0);
231 c.insert("consumer_byte_rate".into(), 2.0);
232 let mut d = UserQuotaConfig::new();
233 d.insert("producer_byte_rate".into(), 5.0); d.insert("request_percentage".into(), 25.0); let ops = diff_user_quotas(&c, &d);
237 assert!(ops.len() == 3);
238 assert!(ops.contains(&QuotaOp::Set {
239 key: "producer_byte_rate".into(),
240 value: 5.0,
241 }));
242 assert!(ops.contains(&QuotaOp::Set {
243 key: "request_percentage".into(),
244 value: 25.0,
245 }));
246 assert!(ops.contains(&QuotaOp::Remove {
247 key: "consumer_byte_rate".into(),
248 }));
249 }
250
251 #[test]
252 fn op_to_wire_set() {
253 let op = QuotaOp::Set {
254 key: "producer_byte_rate".into(),
255 value: 1.0,
256 };
257 let w = op_to_wire(&op);
258 assert!(w.key == "producer_byte_rate");
259 assert!((w.value - 1.0).abs() < f64::EPSILON);
260 assert!(!w.remove);
261 }
262
263 #[test]
264 fn op_to_wire_remove_sends_zero_value_and_flag() {
265 let op = QuotaOp::Remove {
266 key: "producer_byte_rate".into(),
267 };
268 let w = op_to_wire(&op);
269 assert!(w.key == "producer_byte_rate");
270 assert!(w.value.to_bits() == 0.0_f64.to_bits());
271 assert!(w.remove);
272 }
273}