use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, MetadataRecord, ResourceType, TopicConfigRecord};
use crabka_protocol::owned::alter_configs_request::AlterConfigsRequest;
use crabka_protocol::owned::alter_configs_response::{
AlterConfigsResourceResponse, AlterConfigsResponse,
};
use crabka_protocol::{Decode, Encode};
use crabka_raft::RaftError;
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::config_keys;
use crate::error::BrokerError;
const RESOURCE_TYPE_TOPIC: i8 = 2;
const RESOURCE_TYPE_BROKER: i8 = 4;
#[allow(clippy::too_many_lines)]
pub(crate) async fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
let mut cur: &[u8] = req_bytes;
let req = AlterConfigsRequest::decode(&mut cur, version)?;
let image = broker.controller.current_image();
let mut responses: Vec<AlterConfigsResourceResponse> = Vec::with_capacity(req.resources.len());
for resource in req.resources {
let mut out = AlterConfigsResourceResponse {
resource_type: resource.resource_type,
resource_name: resource.resource_name.clone(),
error_code: codes::NONE,
error_message: None,
..Default::default()
};
let acl_result = match resource.resource_type {
RESOURCE_TYPE_TOPIC => broker.config.authorizer.authorize(
&*image,
&AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: ResourceType::Topic,
resource_name: &resource.resource_name,
operation: AclOperation::AlterConfigs,
},
),
RESOURCE_TYPE_BROKER => broker.config.authorizer.authorize(
&*image,
&AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: ResourceType::Cluster,
resource_name: "kafka-cluster",
operation: AclOperation::AlterConfigs,
},
),
_ => {
out.error_code = codes::INVALID_RESOURCE_TYPE;
out.error_message = Some(format!(
"resource_type={} not supported",
resource.resource_type
));
responses.push(out);
continue;
}
};
if acl_result == AuthorizationResult::Deny {
out.error_code = match resource.resource_type {
RESOURCE_TYPE_TOPIC => codes::TOPIC_AUTHORIZATION_FAILED,
_ => codes::CLUSTER_AUTHORIZATION_FAILED,
};
responses.push(out);
continue;
}
if resource.resource_type != RESOURCE_TYPE_TOPIC {
out.error_code = codes::INVALID_RESOURCE_TYPE;
out.error_message = Some(format!(
"resource_type={} not supported",
resource.resource_type
));
responses.push(out);
continue;
}
if image.topic(&resource.resource_name).is_none() {
out.error_code = codes::UNKNOWN_TOPIC_OR_PARTITION;
out.error_message = Some(format!("unknown topic `{}`", resource.resource_name));
responses.push(out);
continue;
}
let mut overrides = std::collections::BTreeMap::new();
let mut validation_err: Option<String> = None;
for cfg in &resource.configs {
let value = cfg.value.clone().unwrap_or_default();
if let Err(reason) = config_keys::validate_topic_config(&cfg.name, &value) {
validation_err = Some(reason);
break;
}
overrides.insert(cfg.name.clone(), value);
}
if let Some(reason) = validation_err {
out.error_code = codes::INVALID_CONFIG;
out.error_message = Some(reason);
responses.push(out);
continue;
}
let record = MetadataRecord::V1TopicConfig(TopicConfigRecord {
topic: resource.resource_name.clone(),
overrides,
});
if req.validate_only {
responses.push(out);
continue;
}
match broker.controller.submit_change(vec![record]).await {
Ok(()) => {}
Err(RaftError::NotLeader { .. } | RaftError::LeaderUnknown) => {
out.error_code = codes::NOT_CONTROLLER;
}
Err(e) => {
tracing::error!(error = %e, "AlterConfigs submit_change failed");
out.error_code = codes::UNKNOWN_SERVER_ERROR;
}
}
responses.push(out);
}
let resp = AlterConfigsResponse {
responses,
throttle_time_ms: 0,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
}