use bytes::{Bytes, BytesMut};
use crabka_metadata::AclOperation;
use crabka_protocol::owned::delete_records_request::DeleteRecordsRequest;
use crabka_protocol::owned::delete_records_response::{
DeleteRecordsPartitionResult, DeleteRecordsResponse, DeleteRecordsTopicResult,
};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
#[allow(clippy::too_many_lines)]
pub(crate) async fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
let mut cur: &[u8] = req_bytes;
let req = DeleteRecordsRequest::decode(&mut cur, version)?;
let partitions = broker.partitions.clone();
let node_id = broker.config.node_id;
let image = broker.controller.current_image();
let topic_names: Vec<&str> = req.topics.iter().map(|t| t.name.as_str()).collect();
let acl_results = authorize_topics(
broker.config.authorizer.as_ref(),
&*image,
ctx.principal,
ctx.peer,
AclOperation::Delete,
topic_names.iter().copied(),
);
let denied_topics: std::collections::HashSet<String> = acl_results
.iter()
.filter_map(|(name, r)| {
if *r == AuthorizationResult::Deny {
Some((*name).to_string())
} else {
None
}
})
.collect();
let mut topic_results: Vec<DeleteRecordsTopicResult> = Vec::with_capacity(req.topics.len());
for topic in req.topics {
if denied_topics.contains(&topic.name) {
let part_results: Vec<DeleteRecordsPartitionResult> = topic
.partitions
.iter()
.map(|fp| DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: -1,
error_code: codes::TOPIC_AUTHORIZATION_FAILED,
..Default::default()
})
.collect();
topic_results.push(DeleteRecordsTopicResult {
name: topic.name,
partitions: part_results,
..Default::default()
});
continue;
}
let mut part_results: Vec<DeleteRecordsPartitionResult> =
Vec::with_capacity(topic.partitions.len());
for fp in topic.partitions {
let part_opt = partitions.get(&topic.name, fp.partition_index);
let Some(part) = part_opt else {
part_results.push(DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: -1,
error_code: codes::UNKNOWN_TOPIC_OR_PARTITION,
..Default::default()
});
continue;
};
let cur_leader = part
.current_leader
.load(std::sync::atomic::Ordering::Acquire);
if cur_leader != node_id {
part_results.push(DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: -1,
error_code: codes::NOT_LEADER_OR_FOLLOWER,
..Default::default()
});
continue;
}
let leo = part.log_end_offset();
let hw = part.high_watermark().await;
let target = if fp.offset == -1 { hw } else { fp.offset };
if target < 0 || target > leo {
part_results.push(DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: -1,
error_code: codes::OFFSET_OUT_OF_RANGE,
..Default::default()
});
continue;
}
match part.trim_to_offset(target).await {
Ok(new_start) => {
part_results.push(DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: new_start,
error_code: codes::NONE,
..Default::default()
});
}
Err(e) => {
tracing::warn!(
topic = %topic.name, partition = fp.partition_index, error = %e,
"DeleteRecords: trim_to_offset failed"
);
part_results.push(DeleteRecordsPartitionResult {
partition_index: fp.partition_index,
low_watermark: -1,
error_code: codes::UNKNOWN_SERVER_ERROR,
..Default::default()
});
}
}
}
topic_results.push(DeleteRecordsTopicResult {
name: topic.name,
partitions: part_results,
..Default::default()
});
}
let resp = DeleteRecordsResponse {
topics: topic_results,
throttle_time_ms: 0,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
}