use std::collections::BTreeMap;
use crabka_protocol::owned::{
alter_client_quotas_request::{
AlterClientQuotasRequest, EntityData as AlterEntity, EntryData as AlterEntry,
OpData as AlterOp,
},
describe_client_quotas_request::{ComponentData, DescribeClientQuotasRequest},
};
use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
/// `match_type` value sent by `describe_user_quotas` so the filter component
/// matches the given user name exactly.
// NOTE(review): the numeric codes below presumably mirror the match-type
// values of the Kafka DescribeClientQuotas request — confirm against the
// protocol definition.
const MATCH_TYPE_EXACT: i8 = 0;
/// Match-type code for the "default" match; not referenced in this file
/// (the leading underscore keeps the unused-item lint quiet).
const _MATCH_TYPE_DEFAULT: i8 = 1;
/// Match-type code for the "any" match; not referenced in this file
/// (the leading underscore keeps the unused-item lint quiet).
const _MATCH_TYPE_ANY: i8 = 2;
/// A single change to apply to one quota key of a user.
///
/// `op_to_wire` translates each variant into the request's wire-level
/// operation: `Set` carries its value with the remove flag cleared, `Remove`
/// sets the remove flag and sends a value of `0.0`.
#[derive(Debug, Clone, PartialEq)]
pub enum QuotaOp {
/// Assign `value` to the quota identified by `key`.
Set { key: String, value: f64 },
/// Delete the quota identified by `key`.
Remove { key: String },
}
/// Quota settings for one user: quota key mapped to its numeric value,
/// ordered by key.
pub type UserQuotaConfig = BTreeMap<String, f64>;
impl AdminClient {
    /// Fetches the quota values stored for the user named `username`.
    ///
    /// Sends a `DescribeClientQuotasRequest` containing one `"user"` component
    /// with an exact match on `username` and `strict` set to `true`. The values
    /// of every entry in the response are merged into a single map; if a key
    /// occurs more than once, the value seen last wins. A response without
    /// entries yields an empty map.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request, and returns
    /// [`AdminError::Broker`] when the response carries a non-zero error code.
    pub async fn describe_user_quotas(
        &mut self,
        username: &str,
    ) -> Result<UserQuotaConfig, AdminError> {
        let user_filter = ComponentData {
            entity_type: "user".into(),
            match_type: MATCH_TYPE_EXACT,
            match_: Some(username.into()),
            ..Default::default()
        };
        let request = DescribeClientQuotasRequest {
            components: vec![user_filter],
            strict: true,
            ..Default::default()
        };

        let response = self.conn.send(request).await?;
        let code = response.error_code;
        if code != 0 {
            return Err(AdminError::Broker {
                api: "DescribeClientQuotas",
                code,
                name: kafka_error_name(code),
                message: response.error_message,
            });
        }

        // `extend` inserts pair by pair in iteration order, so a repeated key
        // keeps the value from the later entry.
        let mut quotas = UserQuotaConfig::new();
        quotas.extend(
            response
                .entries
                .unwrap_or_default()
                .into_iter()
                .flat_map(|entry| entry.values)
                .map(|quota| (quota.key, quota.value)),
        );
        Ok(quotas)
    }

    /// Applies `ops` to the quotas of the user named `username`.
    ///
    /// Builds one `AlterClientQuotasRequest` entry for the `"user"` entity,
    /// converting every op with `op_to_wire`, and forwards `validate_only`
    /// unchanged. When `ops` is empty nothing is sent.
    ///
    /// Returns `Ok(None)` when `ops` is empty, when the response has no
    /// entries, or when the first response entry has error code `0`.
    /// Otherwise returns `Ok(Some(_))` describing the first entry's error.
    /// Only the first response entry is inspected.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request.
    pub async fn alter_user_quotas(
        &mut self,
        username: &str,
        ops: &[QuotaOp],
        validate_only: bool,
    ) -> Result<Option<KafkaError>, AdminError> {
        // Nothing to change: skip the round trip entirely.
        if ops.is_empty() {
            return Ok(None);
        }

        let user = AlterEntity {
            entity_type: "user".into(),
            entity_name: Some(username.into()),
            ..Default::default()
        };
        let alteration = AlterEntry {
            entity: vec![user],
            ops: ops.iter().map(op_to_wire).collect(),
            ..Default::default()
        };
        let request = AlterClientQuotasRequest {
            entries: vec![alteration],
            validate_only,
            ..Default::default()
        };

        let response = self.conn.send(request).await?;
        let failure = match response.entries.into_iter().next() {
            Some(entry) if entry.error_code != 0 => Some(KafkaError {
                code: entry.error_code,
                name: kafka_error_name(entry.error_code),
                message: entry.error_message,
            }),
            // Either no entry came back or the first entry reports success.
            _ => None,
        };
        Ok(failure)
    }
}
/// Converts a [`QuotaOp`] into the wire-level operation of an alter request.
///
/// `Set` keeps its value and clears the remove flag; `Remove` sets the remove
/// flag and sends `0.0` as the value. All remaining fields take their
/// defaults.
fn op_to_wire(op: &QuotaOp) -> AlterOp {
    let (key, value, remove) = match op {
        QuotaOp::Set { key, value } => (key, *value, false),
        QuotaOp::Remove { key } => (key, 0.0, true),
    };
    AlterOp {
        key: key.clone(),
        value,
        remove,
        ..Default::default()
    }
}
/// Computes the operations that turn `current` into `desired`.
///
/// The result lists a `Set` for every key in `desired` that is missing from
/// `current` or whose value differs, followed by a `Remove` for every key in
/// `current` that is absent from `desired`. Within each group the ops follow
/// the maps' key order. Values are compared by bit pattern, so `0.0` and
/// `-0.0` count as different while identical NaN payloads count as equal.
#[must_use]
pub fn diff_user_quotas(current: &UserQuotaConfig, desired: &UserQuotaConfig) -> Vec<QuotaOp> {
    let sets = desired.iter().filter_map(|(key, wanted)| {
        let already_applied = matches!(
            current.get(key),
            Some(existing) if existing.to_bits() == wanted.to_bits()
        );
        (!already_applied).then(|| QuotaOp::Set {
            key: key.clone(),
            value: *wanted,
        })
    });

    let removes = current
        .keys()
        .filter(|key| !desired.contains_key(*key))
        .map(|key| QuotaOp::Remove { key: key.clone() });

    sets.chain(removes).collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Builds a quota config from `(key, value)` pairs.
    fn config(pairs: &[(&str, f64)]) -> UserQuotaConfig {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value))
            .collect()
    }

    /// Shorthand for a `QuotaOp::Set`.
    fn set(key: &str, value: f64) -> QuotaOp {
        QuotaOp::Set {
            key: key.to_owned(),
            value,
        }
    }

    /// Shorthand for a `QuotaOp::Remove`.
    fn remove(key: &str) -> QuotaOp {
        QuotaOp::Remove {
            key: key.to_owned(),
        }
    }

    /// Identical current and desired configs need no operations.
    #[test]
    fn diff_no_change_returns_empty() {
        let current = config(&[("producer_byte_rate", 1_048_576.0)]);
        let desired = current.clone();
        assert!(diff_user_quotas(&current, &desired).is_empty());
    }

    /// Every key that exists only in the desired config becomes a `Set`.
    #[test]
    fn diff_set_added_keys() {
        let current = UserQuotaConfig::new();
        let desired = config(&[
            ("producer_byte_rate", 1_048_576.0),
            ("request_percentage", 25.0),
        ]);
        let ops = diff_user_quotas(&current, &desired);
        let has_set = |name: &str, expected: f64| {
            ops.iter().any(|op| {
                matches!(op, QuotaOp::Set { key, value }
                    if key == name && (*value - expected).abs() < f64::EPSILON)
            })
        };
        assert!(ops.len() == 2);
        assert!(has_set("producer_byte_rate", 1_048_576.0));
        assert!(has_set("request_percentage", 25.0));
    }

    /// A key missing from the desired config becomes a `Remove`.
    #[test]
    fn diff_remove_dropped_keys() {
        let current = config(&[("producer_byte_rate", 1.0), ("consumer_byte_rate", 2.0)]);
        let desired = config(&[("producer_byte_rate", 1.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops == vec![remove("consumer_byte_rate")]);
    }

    /// A key present on both sides with a different value becomes a `Set`.
    #[test]
    fn diff_value_change_is_a_set() {
        let current = config(&[("producer_byte_rate", 1.0)]);
        let desired = config(&[("producer_byte_rate", 2.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops == vec![set("producer_byte_rate", 2.0)]);
    }

    /// Added, changed and dropped keys are all reported in one diff.
    #[test]
    fn diff_mixed_add_change_remove() {
        let current = config(&[("producer_byte_rate", 1.0), ("consumer_byte_rate", 2.0)]);
        let desired = config(&[("producer_byte_rate", 5.0), ("request_percentage", 25.0)]);
        let ops = diff_user_quotas(&current, &desired);
        assert!(ops.len() == 3);
        assert!(ops.contains(&set("producer_byte_rate", 5.0)));
        assert!(ops.contains(&set("request_percentage", 25.0)));
        assert!(ops.contains(&remove("consumer_byte_rate")));
    }

    /// `Set` keeps its key and value and leaves the remove flag cleared.
    #[test]
    fn op_to_wire_set() {
        let wire = op_to_wire(&set("producer_byte_rate", 1.0));
        assert!(wire.key == "producer_byte_rate");
        assert!((wire.value - 1.0).abs() < f64::EPSILON);
        assert!(!wire.remove);
    }

    /// `Remove` sends positive zero as the value and raises the remove flag.
    #[test]
    fn op_to_wire_remove_sends_zero_value_and_flag() {
        let wire = op_to_wire(&remove("producer_byte_rate"));
        assert!(wire.key == "producer_byte_rate");
        assert!(wire.value.to_bits() == 0.0_f64.to_bits());
        assert!(wire.remove);
    }
}