use bytes::{Bytes, BytesMut};
use crabka_metadata::AclOperation;
use crabka_protocol::owned::list_config_resources_request::ListConfigResourcesRequest;
use crabka_protocol::owned::list_config_resources_response::{
ConfigResource, ListConfigResourcesResponse,
};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
const RESOURCE_TYPE_TOPIC: i8 = 2;
const RESOURCE_TYPE_BROKER: i8 = 4;
const RESOURCE_TYPE_CLIENT_METRICS: i8 = 16;
const DEFAULT_RESOURCE_TYPES: [i8; 3] = [
RESOURCE_TYPE_TOPIC,
RESOURCE_TYPE_BROKER,
RESOURCE_TYPE_CLIENT_METRICS,
];
#[allow(clippy::unused_async)]
pub(crate) async fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
let image = broker.controller.current_image();
let mut cur: &[u8] = req_bytes;
let req = ListConfigResourcesRequest::decode(&mut cur, version)?;
let allow = broker.config.authorizer.authorize(
&*image,
&AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: crabka_metadata::ResourceType::Cluster,
resource_name: "kafka-cluster",
operation: AclOperation::Describe,
},
);
if allow == AuthorizationResult::Deny {
let resp = ListConfigResourcesResponse {
error_code: codes::CLUSTER_AUTHORIZATION_FAILED,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
return Ok(buf.freeze());
}
let resources = collect_resources(&image, version, &req.resource_types);
let resp = ListConfigResourcesResponse {
throttle_time_ms: 0,
error_code: codes::NONE,
config_resources: resources,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
}
fn collect_resources(
image: &crabka_metadata::MetadataImage,
version: i16,
requested: &[i8],
) -> Vec<ConfigResource> {
let types: &[i8] = if version < 1 {
&[RESOURCE_TYPE_CLIENT_METRICS]
} else if requested.is_empty() {
&DEFAULT_RESOURCE_TYPES
} else {
requested
};
let mut out: Vec<ConfigResource> = Vec::new();
for &rt in types {
match rt {
RESOURCE_TYPE_TOPIC => {
for t in image.topics() {
out.push(ConfigResource {
resource_name: t.name.clone(),
resource_type: RESOURCE_TYPE_TOPIC,
..Default::default()
});
}
}
RESOURCE_TYPE_BROKER => {
for b in image.brokers() {
out.push(ConfigResource {
resource_name: b.node_id.to_string(),
resource_type: RESOURCE_TYPE_BROKER,
..Default::default()
});
}
}
RESOURCE_TYPE_CLIENT_METRICS => {
for (name, _cfgs) in image.client_metrics_subscriptions() {
out.push(ConfigResource {
resource_name: name.clone(),
resource_type: RESOURCE_TYPE_CLIENT_METRICS,
..Default::default()
});
}
}
_ => {}
}
}
out.sort_by(|a, b| {
a.resource_type
.cmp(&b.resource_type)
.then_with(|| a.resource_name.cmp(&b.resource_name))
});
out
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crabka_metadata::{BrokerRegistrationRecord, MetadataImage, MetadataRecord, TopicRecord};
use uuid::Uuid;
fn image_with_topics_and_brokers(topics: &[&str], broker_ids: &[u64]) -> MetadataImage {
let mut img = MetadataImage::new(Uuid::nil());
for t in topics {
img.apply(&MetadataRecord::V1Topic(TopicRecord {
name: (*t).to_string(),
topic_id: Uuid::nil(),
partitions: 1,
replication_factor: 1,
}));
}
for &id in broker_ids {
img.apply(&MetadataRecord::V1BrokerRegistration(
BrokerRegistrationRecord {
node_id: id,
broker_epoch: 0,
incarnation_id: uuid::Uuid::nil(),
host: "127.0.0.1".into(),
port: 9092,
rack: None,
endpoints: vec![],
},
));
}
img
}
fn image_with_subs(names: &[&str]) -> MetadataImage {
use crabka_metadata::ClientMetricsConfigRecord;
let mut img = MetadataImage::new(Uuid::nil());
for n in names {
let mut cfgs = std::collections::BTreeMap::new();
cfgs.insert("interval.ms".to_string(), "60000".to_string());
img.apply(&MetadataRecord::V1ClientMetricsConfig(
ClientMetricsConfigRecord {
name: (*n).into(),
configs: cfgs,
},
));
}
img
}
#[test]
fn v0_returns_client_metrics_subscriptions() {
let img = image_with_subs(&["sub-b", "sub-a"]);
let out = collect_resources(&img, 0, &[]);
assert_eq!(out.len(), 2);
assert!(
out.iter()
.all(|r| r.resource_type == RESOURCE_TYPE_CLIENT_METRICS)
);
assert_eq!(out[0].resource_name, "sub-a"); assert_eq!(out[1].resource_name, "sub-b");
}
#[test]
fn v1_client_metrics_filter_returns_subscriptions() {
let img = image_with_subs(&["sub-a"]);
let out = collect_resources(&img, 1, &[RESOURCE_TYPE_CLIENT_METRICS]);
assert_eq!(out.len(), 1);
assert_eq!(out[0].resource_type, RESOURCE_TYPE_CLIENT_METRICS);
assert_eq!(out[0].resource_name, "sub-a");
}
#[test]
fn v1_empty_filter_returns_default_set() {
let img = image_with_topics_and_brokers(&["t-a", "t-b"], &[1, 2]);
let out = collect_resources(&img, 1, &[]);
assert!(out.len() == 4);
assert!(out[0].resource_type == RESOURCE_TYPE_TOPIC);
assert!(out[0].resource_name == "t-a");
assert!(out[1].resource_type == RESOURCE_TYPE_TOPIC);
assert!(out[1].resource_name == "t-b");
assert!(out[2].resource_type == RESOURCE_TYPE_BROKER);
assert!(out[2].resource_name == "1");
assert!(out[3].resource_type == RESOURCE_TYPE_BROKER);
assert!(out[3].resource_name == "2");
}
#[test]
fn v1_explicit_topic_only_filter_skips_brokers() {
let img = image_with_topics_and_brokers(&["t-a"], &[1, 2]);
let out = collect_resources(&img, 1, &[RESOURCE_TYPE_TOPIC]);
assert!(out.len() == 1);
assert!(out[0].resource_type == RESOURCE_TYPE_TOPIC);
assert!(out[0].resource_name == "t-a");
}
#[test]
fn v1_explicit_broker_only_filter_skips_topics() {
let img = image_with_topics_and_brokers(&["t-a", "t-b"], &[5, 7]);
let out = collect_resources(&img, 1, &[RESOURCE_TYPE_BROKER]);
assert!(out.len() == 2);
assert!(out[0].resource_type == RESOURCE_TYPE_BROKER);
assert!(out[0].resource_name == "5");
assert!(out[1].resource_type == RESOURCE_TYPE_BROKER);
assert!(out[1].resource_name == "7");
}
#[test]
fn v1_unknown_resource_type_returns_empty() {
let img = image_with_topics_and_brokers(&["t-a"], &[1]);
let out = collect_resources(&img, 1, &[8, 32]);
assert!(
out.is_empty(),
"unsupported resource types silently drop; got {out:?}"
);
}
#[test]
fn v1_mixed_supported_and_unsupported_returns_only_supported() {
let img = image_with_topics_and_brokers(&["t-a"], &[1]);
let out = collect_resources(
&img,
1,
&[
RESOURCE_TYPE_TOPIC,
8, RESOURCE_TYPE_BROKER,
],
);
assert!(out.len() == 2);
assert!(out[0].resource_type == RESOURCE_TYPE_TOPIC);
assert!(out[1].resource_type == RESOURCE_TYPE_BROKER);
}
#[test]
fn output_is_deterministic_regardless_of_input_order() {
let img = image_with_topics_and_brokers(&["z-late", "a-early"], &[10, 1]);
let out = collect_resources(&img, 1, &[]);
let names: Vec<&str> = out.iter().map(|r| r.resource_name.as_str()).collect();
assert!(names == vec!["a-early", "z-late", "1", "10"]);
}
}