crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! `Metadata` (`api_key=3`). Returns all registered brokers and the
//! requested topics' (or all topics, if `topics: None`) partitions.
//! Metadata is sourced from `controller.current_image()` — the
//! quorum-replicated snapshot — rather than a local in-memory struct.

use bytes::{Bytes, BytesMut};

use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::metadata_request::MetadataRequest;
use crabka_protocol::owned::metadata_response::{
    MetadataResponse, MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic,
};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use crabka_protocol::{Decode, Encode};

use crate::authorizer::{AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::handlers::authorized_operations::authorized_operations_bits;

/// Handle a `Metadata` (`api_key=3`) request.
///
/// Decodes `req_bytes` at `version`, reads brokers and topics from the
/// controller's current metadata image, applies `Describe` ACLs for
/// `ctx.principal`, and returns the encoded `MetadataResponse` body.
///
/// # Errors
/// Propagates [`BrokerError`] from request decoding or response encoding.
#[allow(clippy::too_many_lines)] // ACL preamble + asymmetric loop
#[allow(clippy::unused_async)] // Handler is wholly sync but we keep the
// `async fn` shape so it mirrors the other inline-intercept handlers
// (produce/fetch/etc) and lets future Metadata work (e.g. waiting on
// topic creation) add `.await`s without changing the signature.
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let controller = broker.controller.clone();
    let inter_broker_name = broker.config.inter_broker_listener_name.clone();

    let mut cur: &[u8] = req_bytes;
    let req = MetadataRequest::decode(&mut cur, version)?;

    let image = controller.current_image();

    // ── Request scope ───────────────────────────────────────
    // Kafka semantics: a null topic list means "all topics". In v0 the
    // array is not nullable, so there an *empty* list also means "all
    // topics"; from v1 on an empty list means "no topics".
    let named = match &req.topics {
        Some(list) => version != 0 || !list.is_empty(),
        None => false,
    };

    // ── ACL preamble ────────────────────────────────────────
    // Metadata has asymmetric authorization semantics for `Describe`:
    //   • Named-topic request: every requested topic appears in the
    //     response — Allow rows carry `error_code = 0`, Deny rows carry
    //     `error_code = TOPIC_AUTHORIZATION_FAILED (29)`.
    //   • Fetch-all (`req.topics = None`, or an empty list at v0): only
    //     `Allow` topics appear in the response. Deny topics are silently
    //     omitted so the broker doesn't leak their existence to
    //     unauthorized clients.
    //
    // For a named request we resolve each requested `(name, topic_id)`
    // pair up front via the KIP-516 strict resolver, carrying the outcome
    // (`Ok(record)` or an error wire code) per request entry so the
    // response loop below can echo errors without collapsing an unknown
    // id to an empty name. The set of names we authorize is sourced from
    // the *resolved* records (plus the requested name for the
    // name-only-miss case), so a topic requested by id is still
    // ACL-checked under its real name — but a Deny row never echoes that
    // real name back to the client.
    let resolved: Vec<(
        &crabka_protocol::owned::metadata_request::MetadataRequestTopic,
        Result<&crabka_metadata::TopicRecord, i16>,
    )> = match &req.topics {
        Some(list) if named => list
            .iter()
            .map(|t| {
                let name_str = t.name.as_deref().unwrap_or("");
                (
                    t,
                    crate::topic_resolve::resolve(&image, name_str, t.topic_id),
                )
            })
            .collect(),
        _ => Vec::new(),
    };
    // Names to batch-authorize: resolved records' real names, plus the
    // requested name for a name-only miss (so the UNKNOWN_TOPIC_OR_PARTITION
    // row still respects Deny → omit / auth-failed semantics). Topic-id
    // errors carry no trustworthy name and are surfaced unconditionally.
    let candidate_topics: Vec<String> = if named {
        resolved
            .iter()
            .filter_map(|(t, r)| match r {
                Ok(rec) => Some(rec.name.clone()),
                Err(code) if *code == codes::UNKNOWN_TOPIC_OR_PARTITION => {
                    t.name.clone().filter(|n| !n.is_empty())
                }
                Err(_) => None,
            })
            .collect()
    } else {
        image.topics().map(|t| t.name.clone()).collect()
    };
    let acl_by_name = authorize_topics(
        broker.config.authorizer.as_ref(),
        &*image,
        ctx.principal,
        ctx.peer,
        AclOperation::Describe,
        candidate_topics.iter().map(String::as_str),
    );

    // Brokers: enumerate all registered nodes from the metadata image.
    let brokers: Vec<MetadataResponseBroker> = image
        .brokers()
        .map(|b| project_broker(b, &inter_broker_name))
        .collect();

    // A name missing from the ACL result map is treated as Deny.
    let allowed = |name: &str| {
        acl_by_name
            .get(name)
            .copied()
            .unwrap_or(AuthorizationResult::Deny)
            == AuthorizationResult::Allow
    };
    // Build a fully-populated success row for a known topic by name.
    let success_row = |name: &str, rec: &crabka_metadata::TopicRecord| {
        // Partitions are stored in a `HashMap`; sort by index so clients
        // (and tests) see a deterministic ordering.
        let mut sorted: Vec<_> = image.partitions_of(name).collect();
        sorted.sort_by_key(|p| p.partition);
        let partitions: Vec<MetadataResponsePartition> = sorted
            .into_iter()
            .map(|p| MetadataResponsePartition {
                error_code: codes::NONE,
                partition_index: p.partition,
                leader_id: i32::try_from(p.leader).unwrap_or(i32::MAX),
                leader_epoch: p.leader_epoch,
                replica_nodes: p
                    .replicas
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                isr_nodes: p
                    .isr
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                ..Default::default()
            })
            .collect();
        // KIP-430: per-topic bitfield, only when the client opted in.
        // Schema gates the field on version (v8+) so the value is
        // harmlessly dropped on the wire below v8.
        let topic_authorized_operations = if req.include_topic_authorized_operations {
            authorized_operations_bits(
                broker.config.authorizer.as_ref(),
                &image,
                ctx.principal,
                ctx.peer,
                ResourceType::Topic,
                name,
            )
        } else {
            i32::MIN
        };
        MetadataResponseTopic {
            error_code: codes::NONE,
            name: Some(rec.name.clone()),
            topic_id: WireUuid(rec.topic_id.into_bytes()),
            partitions,
            is_internal: false,
            topic_authorized_operations,
            ..Default::default()
        }
    };

    // A named request yields exactly one row per requested entry.
    let expected_rows = if named {
        resolved.len()
    } else {
        candidate_topics.len()
    };
    let mut topics_out: Vec<MetadataResponseTopic> = Vec::with_capacity(expected_rows);
    if named {
        // Named request: drive off the per-entry resolution outcome so
        // KIP-516 topic-id errors echo the requested id rather than
        // collapsing to an empty name.
        for (t, outcome) in &resolved {
            match outcome {
                Ok(rec) => {
                    if !allowed(&rec.name) {
                        // Named-topic Deny: surface an explicit auth-failed
                        // row that echoes only what the client sent. For a
                        // by-id request that is `name: None` plus the id,
                        // so an unauthorized client never learns the
                        // topic's real name (matching Kafka's behavior).
                        topics_out.push(MetadataResponseTopic {
                            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
                            name: t.name.clone(),
                            topic_id: t.topic_id,
                            ..Default::default()
                        });
                        continue;
                    }
                    topics_out.push(success_row(&rec.name, rec));
                }
                Err(code) if *code == codes::UNKNOWN_TOPIC_OR_PARTITION => {
                    // Name-only miss. Preserve the existing behavior: a
                    // Deny on the requested name yields an auth-failed row
                    // (don't reveal whether the topic exists); otherwise
                    // surface UNKNOWN_TOPIC_OR_PARTITION.
                    let name_str = t.name.as_deref().unwrap_or("");
                    if !name_str.is_empty() && !allowed(name_str) {
                        topics_out.push(MetadataResponseTopic {
                            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
                            name: t.name.clone(),
                            topic_id: WireUuid::ZERO,
                            ..Default::default()
                        });
                        continue;
                    }
                    topics_out.push(MetadataResponseTopic {
                        error_code: codes::UNKNOWN_TOPIC_OR_PARTITION,
                        name: t.name.clone(),
                        topic_id: t.topic_id,
                        ..Default::default()
                    });
                }
                Err(code) => {
                    // KIP-516: UNKNOWN_TOPIC_ID / INCONSISTENT_TOPIC_ID.
                    // Echo the requested name (may be `None`) and id.
                    topics_out.push(MetadataResponseTopic {
                        error_code: *code,
                        name: t.name.clone(),
                        topic_id: t.topic_id,
                        ..Default::default()
                    });
                }
            }
        }
    } else {
        // Fetch-all: only `Allow` topics appear; Deny topics are silently
        // omitted so the broker doesn't leak their existence.
        for name in &candidate_topics {
            if !allowed(name) {
                continue;
            }
            if let Some(rec) = image.topic(name) {
                topics_out.push(success_row(name, rec));
            }
        }
    }

    // controller_id: the current Raft leader, or -1 when unknown.
    let controller_id: i32 = controller
        .watch_leader()
        .borrow()
        .and_then(|id| i32::try_from(id).ok())
        .unwrap_or(-1);

    // KIP-430: the cluster-level field only exists on the wire for v8-10;
    // the codegen drops it on other versions. Compute when the opt-in
    // flag is set so the response carries the value on the in-range
    // versions, leaving the default `i32::MIN` otherwise.
    let cluster_authorized_operations = if req.include_cluster_authorized_operations {
        authorized_operations_bits(
            broker.config.authorizer.as_ref(),
            &image,
            ctx.principal,
            ctx.peer,
            ResourceType::Cluster,
            "kafka-cluster",
        )
    } else {
        i32::MIN
    };

    let resp = MetadataResponse {
        throttle_time_ms: 0,
        brokers,
        cluster_id: Some(image.cluster_id().to_string()),
        controller_id,
        topics: topics_out,
        cluster_authorized_operations,
        ..Default::default()
    };
    // Metadata is polled by every client, so per-request detail is logged at
    // debug level to keep info-level logs quiet.
    tracing::debug!(
        version,
        req_topics = ?req.topics.as_ref().map(|ts| ts.iter().filter_map(|t| t.name.clone()).collect::<Vec<_>>()),
        resp_brokers = ?resp.brokers.iter().map(|b| format!("{}@{}:{}", b.node_id, b.host, b.port)).collect::<Vec<_>>(),
        resp_controller_id = resp.controller_id,
        resp_cluster_id = ?resp.cluster_id,
        resp_topics = ?resp.topics.iter().map(|t| format!("{}={:?}/p{}", t.name.as_deref().unwrap_or("?"), t.error_code, t.partitions.len())).collect::<Vec<_>>(),
        "metadata response"
    );
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}

/// Collapse a stored [`crabka_metadata::BrokerRegistrationRecord`] into the
/// single address entry carried by a wire [`MetadataResponseBroker`].
///
/// `MetadataResponseBroker` (wire versions v0..v12 when this was written)
/// has one `host`/`port`/`rack` tuple and no `endpoints[]` array, so exactly
/// one address must be chosen. Preference order:
/// 1. the registered endpoint whose name equals `inter_broker_name`;
/// 2. otherwise the first registered endpoint;
/// 3. otherwise, when no endpoints are registered, the record's legacy
///    top-level `host`/`port`.
///
/// A `node_id` that does not fit in `i32` is clamped to `i32::MAX`; real
/// broker ids are small, so this is purely defensive.
fn project_broker(
    b: &crabka_metadata::BrokerRegistrationRecord,
    inter_broker_name: &str,
) -> MetadataResponseBroker {
    let endpoints = &b.endpoints;
    let chosen = endpoints
        .iter()
        .find(|ep| ep.name == inter_broker_name)
        .or_else(|| endpoints.first());

    let (host, port) = if let Some(ep) = chosen {
        (ep.host.clone(), i32::from(ep.port))
    } else {
        // No endpoints registered: fall back to the legacy fields.
        (b.host.clone(), i32::from(b.port))
    };

    let node_id = i32::try_from(b.node_id).unwrap_or(i32::MAX);
    MetadataResponseBroker {
        node_id,
        host,
        port,
        rack: b.rack.clone(),
        ..Default::default()
    }
}

/// Split an advertised `host:port` string into its host and numeric port.
///
/// The split happens at the *last* `:`. A bracketed IPv6 literal such as
/// `[::1]:9092` has its brackets removed, yielding host `::1`. That is the
/// bare form Kafka places in a `MetadataResponse` broker `host` field.
///
/// If there is no `:` or the text after it is not a valid `u16`, a warning
/// is logged and `("localhost", 9092)` is returned.
#[cfg_attr(not(test), allow(dead_code))] // Only referenced from this module's unit tests.
fn parse_host_port(addr: &str) -> (String, i32) {
    if let Some((host, port)) = addr.rsplit_once(':')
        && let Ok(port) = port.parse::<u16>()
    {
        // `[::1]` → `::1`; a host without brackets is returned unchanged.
        let host = host
            .strip_prefix('[')
            .and_then(|h| h.strip_suffix(']'))
            .unwrap_or(host);
        return (host.to_string(), i32::from(port));
    }
    tracing::warn!(
        addr,
        "advertised_listener not host:port; falling back to localhost:9092"
    );
    ("localhost".into(), 9092)
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A well-formed `host:port` string splits into host and port.
    #[test]
    fn parse_host_port_ok() {
        let expected: (String, i32) = ("foo".to_string(), 1234);
        assert!(parse_host_port("foo:1234") == expected);
    }

    /// Input with no port falls back to the hard-coded `localhost:9092`.
    #[test]
    fn parse_host_port_falls_back() {
        let expected: (String, i32) = ("localhost".to_string(), 9092);
        assert!(parse_host_port("not-an-addr") == expected);
    }
}