use bytes::{Bytes, BytesMut};
use std::sync::atomic::Ordering;
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::offset_for_leader_epoch_request::OffsetForLeaderEpochRequest;
use crabka_protocol::owned::offset_for_leader_epoch_response::{
EpochEndOffset, OffsetForLeaderEpochResponse, OffsetForLeaderTopicResult,
};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
/// Serves an `OffsetForLeaderEpoch` request and returns the encoded response.
///
/// The request is decoded from `req_bytes` at `version`. One [`EpochEndOffset`]
/// row is produced for every requested partition, in request order:
///
/// * every partition of a topic for which `Describe` is denied gets
///   `TOPIC_AUTHORIZATION_FAILED`;
/// * a partition that `broker.partitions` does not know gets
///   `UNKNOWN_TOPIC_OR_PARTITION`;
/// * a requested epoch greater than the partition's current leader epoch gets
///   `UNKNOWN_LEADER_EPOCH`;
/// * otherwise the row carries `NONE`, the end offset that the log's epoch
///   checkpoint reports for the requested epoch, and the requested epoch itself.
///
/// Every row that carries an error keeps `end_offset == -1` and echoes the
/// requested `leader_epoch`.
///
/// # Errors
///
/// Returns a [`BrokerError`] when the request cannot be decoded or the response
/// cannot be encoded.
///
/// # Panics
///
/// Panics if a partition's log mutex is poisoned.
// The body never awaits; the function presumably stays `async` to match the
// signature of the other request handlers (confirm against the dispatcher).
#[allow(clippy::unused_async)]
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let partitions = broker.partitions.clone();

    // Request counter that only exists in test / test-helper builds.
    #[cfg(any(test, feature = "test-helpers"))]
    let ofle_counter = broker.offset_for_leader_epoch_requests.clone();
    #[cfg(any(test, feature = "test-helpers"))]
    ofle_counter.fetch_add(1, Ordering::AcqRel);

    let mut cur: &[u8] = req_bytes;
    let req = OffsetForLeaderEpochRequest::decode(&mut cur, version)?;

    // One image is taken up front and used for every topic's ACL check.
    let acl_image = broker.controller.current_image();

    let mut topics_out: Vec<OffsetForLeaderTopicResult> = Vec::with_capacity(req.topics.len());
    for topic in req.topics {
        let denied = topic_describe_denied(
            broker.config.authorizer.as_ref(),
            &acl_image,
            ctx.principal,
            ctx.peer,
            &topic.topic,
        );

        let parts_out: Vec<EpochEndOffset> = if denied {
            // Authorization is per topic: every requested partition gets the same error row.
            topic
                .partitions
                .iter()
                .map(|part| EpochEndOffset {
                    partition: part.partition,
                    leader_epoch: part.leader_epoch,
                    end_offset: -1,
                    error_code: codes::TOPIC_AUTHORIZATION_FAILED,
                    ..Default::default()
                })
                .collect()
        } else {
            let mut rows: Vec<EpochEndOffset> = Vec::with_capacity(topic.partitions.len());
            for part in &topic.partitions {
                // Start from the "undefined offset" row; only the success path
                // below fills in a real end offset.
                let mut out = EpochEndOffset {
                    partition: part.partition,
                    leader_epoch: part.leader_epoch,
                    end_offset: -1,
                    ..Default::default()
                };

                let Some(p) = partitions.get(&topic.topic, part.partition) else {
                    out.error_code = codes::UNKNOWN_TOPIC_OR_PARTITION;
                    rows.push(out);
                    continue;
                };

                let current_epoch = p.current_leader_epoch.load(Ordering::Acquire);
                if part.leader_epoch > current_epoch {
                    out.error_code = codes::UNKNOWN_LEADER_EPOCH;
                } else {
                    // Hold the log lock only for the lookup itself.
                    let end_offset = {
                        let log = p.log.lock().expect("log mutex poisoned");
                        let leo = log.log_end_offset();
                        log.epoch_checkpoint()
                            .end_offset_for_epoch(part.leader_epoch, leo)
                    };
                    out.error_code = codes::NONE;
                    out.end_offset = end_offset;
                    // `out.leader_epoch` deliberately stays the requested epoch:
                    // the end offset above was resolved for that epoch, and
                    // Kafka clients/followers interpret (leader_epoch,
                    // end_offset) as a pair when deciding how far to truncate.
                    // Reporting the partition's current leader epoch here would
                    // pair the offset with an epoch it does not belong to.
                    // NOTE(review): if the checkpoint can resolve the request
                    // to an earlier epoch than the one asked for, that epoch
                    // should be reported instead; the lookup used here only
                    // returns the offset.
                }
                rows.push(out);
            }
            rows
        };

        topics_out.push(OffsetForLeaderTopicResult {
            topic: topic.topic,
            partitions: parts_out,
            ..Default::default()
        });
    }

    let resp = OffsetForLeaderEpochResponse {
        throttle_time_ms: 0,
        topics: topics_out,
        ..Default::default()
    };
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}
/// Reports whether `authorizer` denies `Describe` on the topic named `topic`
/// for `principal` connecting from `host`, evaluated against `image`.
///
/// Returns `true` only when the authorizer's answer is exactly
/// [`AuthorizationResult::Deny`]; any other answer yields `false`.
fn topic_describe_denied(
    authorizer: &dyn crate::authorizer::Authorizer,
    image: &crabka_metadata::MetadataImage,
    principal: &crabka_security::Principal,
    host: &std::net::SocketAddr,
    topic: &str,
) -> bool {
    let describe_topic = AuthorizationRequest {
        principal,
        host,
        resource_type: ResourceType::Topic,
        resource_name: topic,
        operation: AclOperation::Describe,
    };
    let verdict = authorizer.authorize(image, &describe_topic);
    verdict == AuthorizationResult::Deny
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Checks two things: `topic_describe_denied` reports a denial for an
    /// anonymous principal against a freshly constructed authorizer and
    /// metadata image, and a response row carrying
    /// `TOPIC_AUTHORIZATION_FAILED` keeps that error code across an
    /// encode/decode round trip at the highest response version.
    #[test]
    fn topic_describe_denied_yields_topic_authorization_failed_rows() {
        use crabka_protocol::owned::offset_for_leader_epoch_response::{
            self, EpochEndOffset, OffsetForLeaderEpochResponse, OffsetForLeaderTopicResult,
        };

        // --- authorization check -------------------------------------------
        let acl_authorizer =
            crate::authorizer::SimpleAclAuthorizer::new(std::collections::HashSet::new());
        let fresh_image = crabka_metadata::MetadataImage::new(uuid::Uuid::nil());
        let anonymous = crabka_security::Principal {
            name: "ANONYMOUS".into(),
            auth_method: crabka_security::AuthMethod::Anonymous,
            groups: vec![],
        };
        let client_addr = std::net::SocketAddr::from(([127, 0, 0, 1], 9092));
        let denied = topic_describe_denied(
            &acl_authorizer,
            &fresh_image,
            &anonymous,
            &client_addr,
            "t",
        );
        assert!(denied);

        // --- wire round trip of a denied row -------------------------------
        let denied_row = EpochEndOffset {
            partition: 0,
            leader_epoch: 0,
            end_offset: -1,
            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
            ..Default::default()
        };
        let topic_result = OffsetForLeaderTopicResult {
            topic: "t".into(),
            partitions: vec![denied_row],
            ..Default::default()
        };
        let response = OffsetForLeaderEpochResponse {
            throttle_time_ms: 0,
            topics: vec![topic_result],
            ..Default::default()
        };

        let version = offset_for_leader_epoch_response::MAX_VERSION;
        let mut wire = BytesMut::with_capacity(response.encoded_len(version));
        response.encode(&mut wire, version).expect("encode");

        let mut remaining: &[u8] = &wire;
        let round_tripped =
            OffsetForLeaderEpochResponse::decode(&mut remaining, version).unwrap();
        assert!(
            round_tripped.topics[0].partitions[0].error_code == codes::TOPIC_AUTHORIZATION_FAILED
        );
    }
}