use crabka_metadata::{AclOperation, FeatureLevelRecord, MetadataRecord};
use crabka_protocol::owned::update_features_request::UpdateFeaturesRequest;
use crabka_protocol::owned::update_features_response::{
UpdatableFeatureResult, UpdateFeaturesResponse,
};
use crabka_raft::RaftError;
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
fn dependencies_met(image: &crabka_metadata::MetadataImage, deps: &[(&str, i16)]) -> bool {
deps.iter().all(|(dep, min_level)| {
image
.finalized_features()
.get(*dep)
.is_some_and(|finalized| finalized >= min_level)
})
}
fn downgrade_allowed(version: i16, allow_downgrade: bool, upgrade_type: i8) -> bool {
if version == 0 {
allow_downgrade
} else {
matches!(upgrade_type, 2 | 3)
}
}
#[allow(clippy::too_many_lines)]
pub(crate) async fn handle(
broker: &Broker,
req: UpdateFeaturesRequest,
version: i16,
ctx: &crate::handlers::RequestContext<'_>,
) -> UpdateFeaturesResponse {
let image = broker.controller.current_image();
let authorized = broker.config.authorizer.authorize(
&*image,
&AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: crabka_metadata::ResourceType::Cluster,
resource_name: "kafka-cluster",
operation: AclOperation::Alter,
},
) == AuthorizationResult::Allow;
if !authorized {
return top_level_error(
codes::CLUSTER_AUTHORIZATION_FAILED,
"Cluster authorization failed.",
version,
);
}
if req.feature_updates.is_empty() {
return top_level_error(
codes::INVALID_REQUEST,
"Can not provide empty feature updates in the request.",
version,
);
}
let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut results: Vec<UpdatableFeatureResult> = Vec::new();
let mut records: Vec<MetadataRecord> = Vec::new();
for upd in &req.feature_updates {
let name = upd.feature.clone();
if !seen.insert(name.clone()) {
results.push(row(
name,
codes::INVALID_REQUEST,
"Provided feature can not be updated more than once in the request.",
));
continue;
}
let Some(feat) = crabka_metadata::feature(&name) else {
results.push(row(
name,
codes::INVALID_REQUEST,
"Could not apply finalized feature update because the provided feature is not supported.",
));
continue;
};
let level = upd.max_version_level;
let current = image.finalized_features().get(&name).copied();
let allow_dg = downgrade_allowed(version, upd.allow_downgrade, upd.upgrade_type);
let (_min, max) = feat.supported_range();
if level < 0 || level > max {
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Provided version level is not in the supported range.",
));
continue;
}
let floor = feat.min_required_floor(&image);
if level > 0 && level < floor {
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Can not downgrade the feature below the level required by existing cluster state.",
));
continue;
}
if !dependencies_met(&image, feat.dependencies(level)) {
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Can not finalize feature: a required dependency feature is not finalized at a high enough level.",
));
continue;
}
if level == 0 {
if current.is_none() {
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Can not delete a finalized feature that does not exist.",
));
continue;
}
if !allow_dg {
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Can not delete a finalized feature without setting the downgrade flag.",
));
continue;
}
} else if let Some(cur) = current
&& level < cur
&& !allow_dg
{
results.push(row(
name,
codes::INVALID_UPDATE_VERSION,
"Can not downgrade a finalized feature without setting the downgrade flag.",
));
continue;
}
records.push(MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
name: name.clone(),
level,
}));
results.push(row(name, codes::NONE, ""));
}
if req.validate_only {
return finalize(results, version);
}
if !records.is_empty() {
match broker.controller.submit_change(records).await {
Ok(()) => {}
Err(RaftError::NotLeader { .. } | RaftError::LeaderUnknown) => {
return apply_request_wide(
results,
codes::NOT_CONTROLLER,
"This broker is not the active controller.",
version,
);
}
Err(e) => {
tracing::warn!(error = %e, "UpdateFeatures: submit_change failed");
return apply_request_wide(
results,
codes::FEATURE_UPDATE_FAILED,
"Failed to persist the feature update.",
version,
);
}
}
}
finalize(results, version)
}
fn row(feature: String, error_code: i16, msg: &str) -> UpdatableFeatureResult {
UpdatableFeatureResult {
feature,
error_code,
error_message: (error_code != codes::NONE).then(|| msg.to_string()),
..Default::default()
}
}
fn top_level_error(code: i16, msg: &str, version: i16) -> UpdateFeaturesResponse {
let _ = version;
UpdateFeaturesResponse {
throttle_time_ms: 0,
error_code: code,
error_message: Some(msg.to_string()),
results: Vec::new(),
..Default::default()
}
}
fn apply_request_wide(
mut results: Vec<UpdatableFeatureResult>,
code: i16,
msg: &str,
version: i16,
) -> UpdateFeaturesResponse {
for r in results.iter_mut().filter(|r| r.error_code == codes::NONE) {
r.error_code = code;
r.error_message = Some(msg.to_string());
}
let mut resp = finalize(results, version);
resp.error_code = code;
resp.error_message = Some(msg.to_string());
resp
}
fn finalize(results: Vec<UpdatableFeatureResult>, version: i16) -> UpdateFeaturesResponse {
let (top_code, top_msg) = if version >= 2 {
results
.iter()
.find(|r| r.error_code != codes::NONE)
.map_or((codes::NONE, None), |r| {
(r.error_code, r.error_message.clone())
})
} else {
(codes::NONE, None)
};
UpdateFeaturesResponse {
throttle_time_ms: 0,
error_code: top_code,
error_message: top_msg,
results,
..Default::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn downgrade_flag_v0_uses_allow_downgrade() {
assert!(downgrade_allowed(0, true, 1));
assert!(!downgrade_allowed(0, false, 2));
}
#[test]
fn downgrade_flag_v1_uses_upgrade_type() {
assert!(!downgrade_allowed(1, true, 1)); assert!(downgrade_allowed(1, false, 2)); assert!(downgrade_allowed(1, false, 3)); }
#[test]
fn row_sets_message_only_on_error() {
assert!(
row("metadata.version".into(), codes::NONE, "x")
.error_message
.is_none()
);
assert!(
row(
"metadata.version".into(),
codes::INVALID_UPDATE_VERSION,
"bad"
)
.error_message
.as_deref()
== Some("bad")
);
}
#[test]
fn finalize_v2_promotes_first_error_to_top_level() {
let results = vec![
row("a".into(), codes::NONE, ""),
row("b".into(), codes::INVALID_UPDATE_VERSION, "bad"),
];
let resp = finalize(results, 2);
assert!(resp.error_code == codes::INVALID_UPDATE_VERSION);
}
#[test]
fn finalize_v1_keeps_top_level_none() {
let results = vec![row("b".into(), codes::INVALID_UPDATE_VERSION, "bad")];
let resp = finalize(results, 1);
assert!(resp.error_code == codes::NONE);
}
#[test]
fn metadata_version_floor_via_registry() {
let image = crabka_metadata::MetadataImage::new(uuid::Uuid::nil());
let feat = crabka_metadata::feature("metadata.version").unwrap();
assert!(feat.min_required_floor(&image) == crate::features::METADATA_VERSION_MIN);
}
#[test]
fn dependencies_met_checks_finalized_levels() {
use crabka_metadata::{FeatureLevelRecord, MetadataImage, MetadataRecord};
let mut image = MetadataImage::new(uuid::Uuid::nil());
assert!(dependencies_met(&image, &[]));
assert!(!dependencies_met(&image, &[("metadata.version", 22)]));
image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
name: "metadata.version".into(),
level: 25,
}));
assert!(dependencies_met(&image, &[("metadata.version", 22)]));
assert!(!dependencies_met(&image, &[("metadata.version", 26)]));
}
}