use std::collections::BTreeMap;
use crabka_protocol::owned::{
describe_configs_request::{DescribeConfigsRequest, DescribeConfigsResource},
describe_configs_response::DescribeConfigsResourceResult,
incremental_alter_configs_request::{
AlterConfigsResource, AlterableConfig, IncrementalAlterConfigsRequest,
},
};
use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
/// `config_source` code that marks a DescribeConfigs entry as a dynamic
/// topic-level override; only entries carrying this code are kept by
/// `filter_dynamic_overrides`.
const DYNAMIC_TOPIC_CONFIG_SOURCE: i8 = 1;
/// `resource_type` code placed on every resource this module sends, selecting
/// topic resources in both DescribeConfigs and IncrementalAlterConfigs requests.
const RESOURCE_TYPE_TOPIC: i8 = 2;
/// The dynamic topic-level config overrides reported for a single topic.
///
/// Values compare field-by-field, so two results can be checked for equality
/// directly (for example in tests or when diffing desired vs. actual state).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TopicConfigOverrides {
    /// Name of the topic the overrides belong to.
    pub topic: String,
    /// Override values keyed by config name, ordered by key.
    pub overrides: BTreeMap<String, String>,
}
/// A single per-key change to apply to a topic's configuration.
///
/// Operations compare by value, which lets callers deduplicate or assert on
/// planned changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IncrementalAlterOp {
    /// Set `key` to `value` on `topic`.
    Set {
        /// Topic whose config is changed.
        topic: String,
        /// Config name to set.
        key: String,
        /// New value for the config.
        value: String,
    },
    /// Remove the override for `key` on `topic`.
    Delete {
        /// Topic whose config is changed.
        topic: String,
        /// Config name whose override is removed.
        key: String,
    },
}
/// Result reported for one resource of an incremental alter-configs response.
#[derive(Debug, Clone)]
pub struct AlterConfigsOutcome {
/// Resource name copied from the response entry.
pub topic: String,
/// `None` when the entry's error code was 0; otherwise the code, its
/// symbolic name and the optional message from the response.
pub error: Option<KafkaError>,
}
/// Reduces a topic's described config entries to its dynamic topic-level
/// overrides.
///
/// An entry is kept only when its `config_source` equals
/// `DYNAMIC_TOPIC_CONFIG_SOURCE` and it carries a value. If the same name
/// appears more than once, the last qualifying entry wins.
pub(crate) fn filter_dynamic_overrides(
    topic: String,
    entries: impl IntoIterator<Item = DescribeConfigsResourceResult>,
) -> TopicConfigOverrides {
    let mut overrides = BTreeMap::new();
    entries
        .into_iter()
        .filter(|candidate| candidate.config_source == DYNAMIC_TOPIC_CONFIG_SOURCE)
        .filter_map(|candidate| {
            // Entries without a value have nothing to record.
            let value = candidate.value?;
            Some((candidate.name, value))
        })
        .for_each(|(name, value)| {
            overrides.insert(name, value);
        });
    TopicConfigOverrides { topic, overrides }
}
/// Converts one DescribeConfigs result into the topic's dynamic overrides.
///
/// # Errors
///
/// Returns [`AdminError::Broker`] (with `api` set to `"DescribeConfigs"`)
/// when the result carries a non-zero error code.
pub(crate) fn parse_describe_configs_resource(
    r: crabka_protocol::owned::describe_configs_response::DescribeConfigsResult,
) -> Result<TopicConfigOverrides, AdminError> {
    match r.error_code {
        0 => Ok(filter_dynamic_overrides(r.resource_name, r.configs)),
        code => Err(AdminError::Broker {
            api: "DescribeConfigs",
            code,
            name: kafka_error_name(code),
            message: r.error_message,
        }),
    }
}
/// Maps every resource entry of an IncrementalAlterConfigs response to an
/// [`AlterConfigsOutcome`], preserving the response order.
///
/// An error code of 0 yields `error: None`; any other code is wrapped in a
/// [`KafkaError`] together with its symbolic name and the response message.
pub(crate) fn parse_incremental_alter_outcomes(
    resp: <IncrementalAlterConfigsRequest as crabka_protocol::ProtocolRequest>::Response,
) -> Vec<AlterConfigsOutcome> {
    let mut outcomes = Vec::with_capacity(resp.responses.len());
    for resource in resp.responses {
        let error = match resource.error_code {
            0 => None,
            code => Some(KafkaError {
                code,
                name: kafka_error_name(code),
                message: resource.error_message,
            }),
        };
        outcomes.push(AlterConfigsOutcome {
            topic: resource.resource_name,
            error,
        });
    }
    outcomes
}
impl AdminClient {
pub async fn describe_configs(
&mut self,
topics: &[&str],
) -> Result<Vec<TopicConfigOverrides>, AdminError> {
let req = DescribeConfigsRequest {
resources: topics
.iter()
.map(|t| DescribeConfigsResource {
resource_type: RESOURCE_TYPE_TOPIC,
resource_name: (*t).to_string(),
configuration_keys: None,
..Default::default()
})
.collect(),
include_synonyms: false,
include_documentation: false,
..Default::default()
};
let resp = self.conn.send(req).await?;
let mut out = Vec::with_capacity(resp.results.len());
for r in resp.results {
out.push(parse_describe_configs_resource(r)?);
}
Ok(out)
}
pub async fn incremental_alter_configs(
&mut self,
ops: &[IncrementalAlterOp],
) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
let mut by_topic: BTreeMap<String, Vec<AlterableConfig>> = BTreeMap::new();
for op in ops {
match op {
IncrementalAlterOp::Set { topic, key, value } => {
by_topic
.entry(topic.clone())
.or_default()
.push(AlterableConfig {
name: key.clone(),
config_operation: 0, value: Some(value.clone()),
..Default::default()
});
}
IncrementalAlterOp::Delete { topic, key } => {
by_topic
.entry(topic.clone())
.or_default()
.push(AlterableConfig {
name: key.clone(),
config_operation: 1, value: None,
..Default::default()
});
}
}
}
let req = IncrementalAlterConfigsRequest {
resources: by_topic
.into_iter()
.map(|(topic, configs)| AlterConfigsResource {
resource_type: RESOURCE_TYPE_TOPIC,
resource_name: topic,
configs,
..Default::default()
})
.collect(),
validate_only: false,
..Default::default()
};
let resp = self.conn.send(req).await?;
Ok(parse_incremental_alter_outcomes(resp))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_protocol::owned::describe_configs_response::DescribeConfigsResult;
    use crabka_protocol::owned::incremental_alter_configs_response::{
        AlterConfigsResourceResponse, IncrementalAlterConfigsResponse,
    };

    /// Builds a described config entry with the given name, value and source
    /// code; every other field takes its default.
    fn config_entry(
        name: &str,
        value: Option<&str>,
        config_source: i8,
    ) -> DescribeConfigsResourceResult {
        DescribeConfigsResourceResult {
            name: name.into(),
            value: value.map(Into::into),
            config_source,
            ..Default::default()
        }
    }

    /// Looks up an override as a borrowed string slice.
    fn override_of<'a>(parsed: &'a TopicConfigOverrides, key: &str) -> Option<&'a str> {
        parsed.overrides.get(key).map(String::as_str)
    }

    // The two constant tests pin the wire codes so an accidental edit fails
    // loudly.
    #[test]
    fn dynamic_topic_config_source_is_one() {
        assert!(DYNAMIC_TOPIC_CONFIG_SOURCE == 1);
    }

    #[test]
    fn resource_type_topic_is_two() {
        assert!(RESOURCE_TYPE_TOPIC == 2);
    }

    #[test]
    fn describe_configs_filters_to_dynamic_topic() {
        let entries = vec![
            config_entry("retention.ms", Some("60000"), 1),
            // Source 4 is not the dynamic-topic source, so it must be dropped.
            config_entry("log.dirs", Some("/data"), 4),
            config_entry("cleanup.policy", Some("compact"), 1),
            // Dynamic source but no value: nothing to record.
            config_entry("segment.bytes", None, 1),
            // Source 5 is not the dynamic-topic source either.
            config_entry("max.message.bytes", Some("1048576"), 5),
        ];

        let r = filter_dynamic_overrides("foo".into(), entries);

        assert!(r.topic == "foo");
        assert!(r.overrides.len() == 2);
        assert!(override_of(&r, "retention.ms") == Some("60000"));
        assert!(override_of(&r, "cleanup.policy") == Some("compact"));
        for absent in ["log.dirs", "segment.bytes", "max.message.bytes"] {
            assert!(!r.overrides.contains_key(absent));
        }
    }

    #[test]
    fn parse_describe_configs_resource_returns_overrides_on_success() {
        let r = DescribeConfigsResult {
            error_code: 0,
            error_message: None,
            resource_type: RESOURCE_TYPE_TOPIC,
            resource_name: "foo".into(),
            configs: vec![config_entry("retention.ms", Some("60000"), 1)],
            ..Default::default()
        };

        let parsed = parse_describe_configs_resource(r).expect("Ok branch");

        assert!(parsed.topic == "foo");
        assert!(override_of(&parsed, "retention.ms") == Some("60000"));
    }

    #[test]
    fn parse_describe_configs_resource_returns_broker_error_when_error_code_set() {
        let r = DescribeConfigsResult {
            // Code 3 is expected to map to UNKNOWN_TOPIC_OR_PARTITION below.
            error_code: 3,
            error_message: Some("nope".into()),
            resource_type: RESOURCE_TYPE_TOPIC,
            resource_name: "missing".into(),
            configs: Vec::new(),
            ..Default::default()
        };

        match parse_describe_configs_resource(r).expect_err("Err branch") {
            AdminError::Broker {
                api,
                code,
                name,
                message,
            } => {
                assert!(api == "DescribeConfigs");
                assert!(code == 3);
                assert!(name == "UNKNOWN_TOPIC_OR_PARTITION");
                assert!(message.as_deref() == Some("nope"));
            }
            other => panic!("expected Broker, got {other:?}"),
        }
    }

    #[test]
    fn parse_incremental_alter_outcomes_success() {
        let succeeded = AlterConfigsResourceResponse {
            error_code: 0,
            error_message: None,
            resource_type: RESOURCE_TYPE_TOPIC,
            resource_name: "foo".into(),
            ..Default::default()
        };
        let resp = IncrementalAlterConfigsResponse {
            responses: vec![succeeded],
            ..Default::default()
        };

        let outs = parse_incremental_alter_outcomes(resp);

        assert!(outs.len() == 1);
        let only = &outs[0];
        assert!(only.topic == "foo");
        assert!(only.error.is_none());
    }

    #[test]
    fn parse_incremental_alter_outcomes_carries_errors() {
        let succeeded = AlterConfigsResourceResponse {
            error_code: 0,
            error_message: None,
            resource_type: RESOURCE_TYPE_TOPIC,
            resource_name: "ok".into(),
            ..Default::default()
        };
        let failed = AlterConfigsResourceResponse {
            // Code 40 is expected to map to INVALID_CONFIG below.
            error_code: 40,
            error_message: Some("bad value".into()),
            resource_type: RESOURCE_TYPE_TOPIC,
            resource_name: "bad".into(),
            ..Default::default()
        };
        let resp = IncrementalAlterConfigsResponse {
            responses: vec![succeeded, failed],
            ..Default::default()
        };

        let outs = parse_incremental_alter_outcomes(resp);

        assert!(outs.len() == 2);
        assert!(outs[0].error.is_none());
        let err = outs[1].error.as_ref().expect("error expected");
        assert!(err.code == 40);
        assert!(err.name == "INVALID_CONFIG");
        assert!(err.message.as_deref() == Some("bad value"));
    }
}