crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! `OffsetForLeaderEpoch` (`api_key=23`). For each requested (topic,
//! partition, `leader_epoch`), returns the `end_offset` of that epoch —
//! i.e., the first offset of the *next* epoch, which is the truncation
//! point a follower should use when recovering from `FENCED_LEADER_EPOCH`.
//!
//! Protocol:
//! - `requested_epoch > current_leader_epoch` → `UNKNOWN_LEADER_EPOCH`
//! - `requested_epoch == current_leader_epoch` → `end_offset = log_end_offset`
//! - `requested_epoch < current_leader_epoch` → `end_offset` from checkpoint,
//!   or `-1` (`UNDEFINED_OFFSET`) if not in checkpoint.
//!
//! Reference: KIP-101 (Alter Replication Protocol to use Leader Epoch
//! rather than High Watermark for Truncation).

use bytes::{Bytes, BytesMut};
use std::sync::atomic::Ordering;

use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::offset_for_leader_epoch_request::OffsetForLeaderEpochRequest;
use crabka_protocol::owned::offset_for_leader_epoch_response::{
    EpochEndOffset, OffsetForLeaderEpochResponse, OffsetForLeaderTopicResult,
};
use crabka_protocol::{Decode, Encode};

use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;

/// Serves one `OffsetForLeaderEpoch` request and returns the encoded response.
///
/// The request is decoded from `req_bytes` at `version`. Every requested
/// topic yields one result entry and every requested partition one row:
///
/// - topic fails the `Describe` check → `TOPIC_AUTHORIZATION_FAILED` on each
///   of its rows;
/// - partition not found in the broker's partition set →
///   `UNKNOWN_TOPIC_OR_PARTITION`;
/// - requested epoch greater than the partition's current leader epoch →
///   `UNKNOWN_LEADER_EPOCH`;
/// - otherwise → `NONE`, with `end_offset` taken from the log's epoch
///   checkpoint.
///
/// Rows that carry an error keep the requested `leader_epoch` and an
/// `end_offset` of `-1`.
///
/// # Errors
///
/// Propagates failures from decoding the request or encoding the response.
///
/// # Panics
///
/// Panics if a partition's log mutex is poisoned.
#[allow(clippy::unused_async)] // async to match the inline-intercept handler shape.
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let partitions = broker.partitions.clone();

    // Test-only: count served OFLE requests so the KIP-320 proactive-validation
    // integration test can prove the consumer's validate pass issued an OFLE
    // RPC (vs. the reactive in-band fetch paths, which issue none). The bump
    // happens before decoding, so malformed requests are counted too.
    #[cfg(any(test, feature = "test-helpers"))]
    broker
        .offset_for_leader_epoch_requests
        .fetch_add(1, Ordering::AcqRel);

    let mut cursor: &[u8] = req_bytes;
    let request = OffsetForLeaderEpochRequest::decode(&mut cursor, version)?;

    // One metadata snapshot backs every per-topic ACL decision in this request.
    let acl_image = broker.controller.current_image();

    let topics: Vec<OffsetForLeaderTopicResult> = request
        .topics
        .into_iter()
        .map(|topic| {
            // ── ACL gate ────────────────────────────────────────────
            // `Describe` on `Topic(name)`, evaluated once per topic. A
            // denied topic still gets a row for every partition it asked
            // about, each carrying `TOPIC_AUTHORIZATION_FAILED`.
            let denied = topic_describe_denied(
                broker.config.authorizer.as_ref(),
                &acl_image,
                ctx.principal,
                ctx.peer,
                &topic.topic,
            );

            let rows: Vec<EpochEndOffset> = topic
                .partitions
                .iter()
                .map(|part| {
                    // Builds the response row for this partition.
                    let row = |error_code, leader_epoch, end_offset| EpochEndOffset {
                        partition: part.partition,
                        leader_epoch,
                        end_offset,
                        error_code,
                        ..Default::default()
                    };

                    if denied {
                        return row(codes::TOPIC_AUTHORIZATION_FAILED, part.leader_epoch, -1);
                    }

                    let Some(p) = partitions.get(&topic.topic, part.partition) else {
                        return row(codes::UNKNOWN_TOPIC_OR_PARTITION, part.leader_epoch, -1);
                    };

                    let current_epoch = p.current_leader_epoch.load(Ordering::Acquire);
                    if part.leader_epoch > current_epoch {
                        // The requester knows a newer epoch than we do —
                        // our metadata is the stale side.
                        return row(codes::UNKNOWN_LEADER_EPOCH, part.leader_epoch, -1);
                    }

                    // Ask the epoch checkpoint for the end offset, handing
                    // it the log end offset read under the same lock. The
                    // guard is released as soon as the lookup is done.
                    let log = p.log.lock().expect("log mutex poisoned");
                    let leo = log.log_end_offset();
                    let end_offset = log
                        .epoch_checkpoint()
                        .end_offset_for_epoch(part.leader_epoch, leo);
                    drop(log);

                    // The success row reports the partition's *current*
                    // leader epoch, not the requested one.
                    // NOTE(review): Kafka's reference broker reports the
                    // largest epoch <= the requested epoch here — confirm
                    // clients tolerate the current epoch for older requests.
                    row(codes::NONE, current_epoch, end_offset)
                })
                .collect();

            OffsetForLeaderTopicResult {
                topic: topic.topic,
                partitions: rows,
                ..Default::default()
            }
        })
        .collect();

    let response = OffsetForLeaderEpochResponse {
        throttle_time_ms: 0,
        topics,
        ..Default::default()
    };
    let mut encoded = BytesMut::with_capacity(response.encoded_len(version));
    response.encode(&mut encoded, version)?;
    Ok(encoded.freeze())
}

/// Checks whether `principal`, connecting from `host`, is refused `Describe`
/// on the topic named `topic`.
///
/// Builds a single [`AuthorizationRequest`] for `ResourceType::Topic` /
/// `AclOperation::Describe` and evaluates it with `authorizer` against
/// `image`.
///
/// Returns `true` only when the authorizer answers
/// `AuthorizationResult::Deny`; any other answer yields `false`.
fn topic_describe_denied(
    authorizer: &dyn crate::authorizer::Authorizer,
    image: &crabka_metadata::MetadataImage,
    principal: &crabka_security::Principal,
    host: &std::net::SocketAddr,
    topic: &str,
) -> bool {
    let describe_topic = AuthorizationRequest {
        principal,
        host,
        resource_type: ResourceType::Topic,
        resource_name: topic,
        operation: AclOperation::Describe,
    };
    let verdict = authorizer.authorize(image, &describe_topic);
    verdict == AuthorizationResult::Deny
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Two checks in one test:
    ///
    /// 1. an authorizer built from an empty set, evaluated over a freshly
    ///    created metadata image, denies `Describe` on topic `"t"` for an
    ///    anonymous principal;
    /// 2. a response row carrying `TOPIC_AUTHORIZATION_FAILED` — the shape
    ///    the handler emits for denied topics — keeps that error code
    ///    through an encode/decode round trip at the newest version.
    #[test]
    fn topic_describe_denied_yields_topic_authorization_failed_rows() {
        use crabka_protocol::owned::offset_for_leader_epoch_response::{
            self, EpochEndOffset, OffsetForLeaderEpochResponse, OffsetForLeaderTopicResult,
        };

        // ── Part 1: the ACL gate denies ─────────────────────────────
        let peer = std::net::SocketAddr::from(([127, 0, 0, 1], 9092));
        let principal = crabka_security::Principal {
            name: "ANONYMOUS".into(),
            auth_method: crabka_security::AuthMethod::Anonymous,
            groups: vec![],
        };
        let image = crabka_metadata::MetadataImage::new(uuid::Uuid::nil());
        let authorizer =
            crate::authorizer::SimpleAclAuthorizer::new(std::collections::HashSet::new());

        let denied = topic_describe_denied(&authorizer, &image, &principal, &peer, "t");
        assert!(denied);

        // ── Part 2: the denied row round-trips on the wire ──────────
        let denied_row = EpochEndOffset {
            partition: 0,
            leader_epoch: 0,
            end_offset: -1,
            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
            ..Default::default()
        };
        let denied_topic = OffsetForLeaderTopicResult {
            topic: "t".into(),
            partitions: vec![denied_row],
            ..Default::default()
        };
        let resp = OffsetForLeaderEpochResponse {
            throttle_time_ms: 0,
            topics: vec![denied_topic],
            ..Default::default()
        };

        let version = offset_for_leader_epoch_response::MAX_VERSION;
        let mut wire = BytesMut::with_capacity(resp.encoded_len(version));
        resp.encode(&mut wire, version).expect("encode");

        let mut cur: &[u8] = &wire;
        let decoded = OffsetForLeaderEpochResponse::decode(&mut cur, version).unwrap();
        let row = &decoded.topics[0].partitions[0];
        assert!(row.error_code == codes::TOPIC_AUTHORIZATION_FAILED);
    }
}