crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! `DeleteAcls` handler (`api_key` 31).

use bytes::Bytes;
use crabka_metadata::{AclEntry, AclEntryFilter, MetadataRecord};
use crabka_protocol::Encode;
use crabka_protocol::owned::delete_acls_request::{DeleteAclsFilter, DeleteAclsRequest};
use crabka_protocol::owned::delete_acls_response::{
    DeleteAclsFilterResult, DeleteAclsMatchingAcl, DeleteAclsResponse,
};

use super::acl_wire::{
    operation_filter, operation_to_wire, pattern_type_filter, pattern_type_to_wire,
    permission_filter, permission_to_wire, resource_type_filter, resource_type_to_wire,
};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;

pub(crate) async fn handle(
    broker: &Broker,
    req: DeleteAclsRequest,
    ctx: &crate::handlers::RequestContext<'_>,
    api_version: i16,
) -> Result<Bytes, crate::error::BrokerError> {
    let image = broker.controller.current_image();

    // Whole-request cluster-alter gate.
    let allow = broker.config.authorizer.authorize(
        &*image,
        &AuthorizationRequest {
            principal: ctx.principal,
            host: ctx.peer,
            resource_type: crabka_metadata::ResourceType::Cluster,
            resource_name: "kafka-cluster",
            operation: crabka_metadata::AclOperation::Alter,
        },
    );
    if allow == AuthorizationResult::Deny {
        let filter_results = req
            .filters
            .iter()
            .map(|_| DeleteAclsFilterResult {
                error_code: codes::CLUSTER_AUTHORIZATION_FAILED,
                error_message: Some("delete-acls denied".into()),
                matching_acls: vec![],
                ..Default::default()
            })
            .collect();
        return encode_response(
            &DeleteAclsResponse {
                throttle_time_ms: 0,
                filter_results,
                ..Default::default()
            },
            api_version,
        );
    }

    let mut filter_results: Vec<DeleteAclsFilterResult> = Vec::with_capacity(req.filters.len());
    let mut to_submit: Vec<MetadataRecord> = Vec::with_capacity(req.filters.len());

    for f in &req.filters {
        match build_filter(f) {
            Ok(filter) => {
                let matched: Vec<&AclEntry> =
                    image.all_acls().filter(|e| filter.matches(e)).collect();
                let matching_acls = matched
                    .iter()
                    .map(|e| DeleteAclsMatchingAcl {
                        error_code: 0,
                        error_message: None,
                        resource_type: resource_type_to_wire(e.resource_type),
                        resource_name: e.resource_name.clone(),
                        pattern_type: pattern_type_to_wire(e.pattern_type),
                        principal: e.principal.clone(),
                        host: e.host.clone(),
                        operation: operation_to_wire(e.operation),
                        permission_type: permission_to_wire(e.permission_type),
                        ..Default::default()
                    })
                    .collect::<Vec<_>>();
                filter_results.push(DeleteAclsFilterResult {
                    error_code: 0,
                    error_message: None,
                    matching_acls,
                    ..Default::default()
                });
                to_submit.push(MetadataRecord::V1DeleteAccessControlEntry(filter));
            }
            Err(_) => {
                filter_results.push(DeleteAclsFilterResult {
                    error_code: codes::INVALID_REQUEST,
                    error_message: Some("malformed filter axis".into()),
                    matching_acls: vec![],
                    ..Default::default()
                });
            }
        }
    }

    if !to_submit.is_empty()
        && let Err(e) = broker.controller.submit_change(to_submit).await
    {
        tracing::warn!(error = %e, "delete-acls submit failed");
        for r in &mut filter_results {
            if r.error_code == 0 {
                r.error_code = codes::COORDINATOR_NOT_AVAILABLE;
                r.error_message = Some(format!("submit failed: {e}"));
            }
        }
    }

    encode_response(
        &DeleteAclsResponse {
            throttle_time_ms: 0,
            filter_results,
            ..Default::default()
        },
        api_version,
    )
}

/// Converts a wire-level `DeleteAclsFilter` into an `AclEntryFilter`.
///
/// The resource type, pattern type, operation and permission type are each
/// translated by the corresponding `acl_wire` helper, in that order. The
/// resource name, principal and host are copied from the request, with an
/// empty string normalised to `None` (presumably "match any" — confirm
/// against `AclEntryFilter::matches`).
///
/// # Errors
///
/// Returns the `WireAclError` from the first of the four wire conversions that
/// fails.
fn build_filter(f: &DeleteAclsFilter) -> Result<AclEntryFilter, super::acl_wire::WireAclError> {
    // Fallible conversions first, in the same order as the struct fields, so
    // the reported error is the same and no strings are cloned for a filter
    // that is going to be rejected anyway.
    let resource_type = resource_type_filter(f.resource_type_filter)?;
    let pattern_type = pattern_type_filter(f.pattern_type_filter)?;
    let operation = operation_filter(f.operation)?;
    let permission_type = permission_filter(f.permission_type)?;

    Ok(AclEntryFilter {
        resource_type,
        resource_name: f
            .resource_name_filter
            .clone()
            .filter(|name| !name.is_empty()),
        pattern_type,
        principal: f
            .principal_filter
            .clone()
            .filter(|who| !who.is_empty()),
        host: f.host_filter.clone().filter(|addr| !addr.is_empty()),
        operation,
        permission_type,
    })
}

fn encode_response<R: Encode>(
    resp: &R,
    api_version: i16,
) -> Result<Bytes, crate::error::BrokerError> {
    let mut body = Vec::new();
    resp.encode(&mut body, api_version)
        .map_err(|e| crate::error::BrokerError::Replication(format!("encode DeleteAcls: {e}")))?;
    Ok(Bytes::from(body))
}