crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! Handler dispatch. One module per API key implements:
//!
//!   `pub async fn handle(broker: &Broker, version: i16, req_bytes: &[u8])
//!       -> Result<bytes::Bytes, BrokerError>`
//!
//! Handlers decode the request, do their work, encode the response, and
//! return the encoded bytes ready to ship after the response header is
//! prepended in `network::dispatch`.

#![allow(dead_code)] // handler modules are registered as each API is enabled.

use bytes::Bytes;

use crate::error::BrokerError;

/// Function signature every handler in this module exports.
pub type HandlerFn = fn(
    broker: &crate::broker::Broker,
    version: i16,
    correlation_id: i32,
    req_bytes: &[u8],
) -> futures_util::future::BoxFuture<'static, Result<Bytes, BrokerError>>;

/// API key → handler function. Built by `Broker::start` from the enabled
/// per-API modules.
#[derive(Default)]
pub struct HandlerTable {
    table: std::collections::HashMap<i16, HandlerFn>,
}

impl HandlerTable {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn register(&mut self, api_key: i16, handler: HandlerFn) {
        self.table.insert(api_key, handler);
    }

    #[must_use]
    pub fn get(&self, api_key: i16) -> Option<HandlerFn> {
        self.table.get(&api_key).copied()
    }
}

pub(crate) mod context;
pub(crate) use context::RequestContext;
pub(crate) use context::TelemetryContext;

pub(crate) mod acl_wire;
// KIP-853 dynamic-quorum reconfiguration (api_keys 80/81/82).
pub(crate) mod add_raft_voter;
pub(crate) mod alter_client_quotas;
pub(crate) mod alter_configs;
pub(crate) mod alter_partition;
pub(crate) mod alter_partition_reassignments;
pub(crate) mod alter_replica_log_dirs;
pub(crate) mod alter_user_scram_credentials;
pub(crate) mod api_versions;
pub(crate) mod assign_replicas_to_dirs;
// KIP-430: authorized-operations bitfield helper used by metadata,
// describe_cluster, describe_groups when the request opts in.
pub(crate) mod authorized_operations;
pub(crate) mod broker_heartbeat;
pub(crate) mod consumer_group_describe;
pub(crate) mod consumer_group_heartbeat;
pub(crate) mod create_acls;
pub(crate) mod create_delegation_token;
pub(crate) mod create_partitions;
pub(crate) mod create_topics;
pub(crate) mod delete_acls;
pub(crate) mod delete_groups;
pub(crate) mod delete_records;
pub(crate) mod delete_topics;
pub(crate) mod describe_acls;
pub(crate) mod describe_client_quotas;
pub(crate) mod describe_cluster;
pub(crate) mod describe_configs;
pub(crate) mod describe_delegation_token;
pub(crate) mod describe_groups;
pub(crate) mod describe_log_dirs;
// KIP-664 producer-state introspection (api_key 61).
pub(crate) mod describe_producers;
// KIP-595 raft-quorum introspection (api_key 55).
pub(crate) mod describe_quorum;
// KIP-664 transaction introspection (api_key 65).
pub(crate) mod describe_transactions;
// KIP-966 paginated topic listing (api_key 75).
pub(crate) mod describe_topic_partitions;
pub(crate) mod describe_user_scram_credentials;
pub(crate) mod elect_leaders;
pub(crate) mod expire_delegation_token;
pub(crate) mod fetch;
pub(crate) mod fetch_downconvert;
// KIP-630 controller-snapshot fetch (api_key 59).
pub(crate) mod fetch_snapshot;
pub(crate) mod find_coordinator;
pub(crate) mod get_replica_log_info;
// KIP-714 client telemetry. Pair of no-op handlers — `get` advertises
// "no metrics subscribed" so well-behaved clients skip `push` entirely;
// `push` is wired defensively in case a client races the subscription
// re-fetch.
pub(crate) mod get_telemetry_subscriptions;
pub(crate) mod heartbeat;
pub(crate) mod incremental_alter_configs;
pub(crate) mod init_producer_id;
pub(crate) mod join_group;
pub(crate) mod leave_group;
// KIP-1142 list-config-resources admin RPC (api_key 74). Generalises the
// v0 ListClientMetricsResources call (KIP-714) into a typed enumeration.
pub(crate) mod list_config_resources;
pub(crate) mod list_groups;
pub(crate) mod list_offsets;
// KIP-664 transaction-summary admin RPC (api_key 66).
pub(crate) mod list_partition_reassignments;
pub(crate) mod list_transactions;
pub(crate) mod metadata;
pub(crate) mod offset_commit;
pub(crate) mod offset_delete;
pub(crate) mod offset_fetch;
pub(crate) mod offset_for_leader_epoch;
pub(crate) mod produce;
// KIP-714 client-metrics push, paired with get_telemetry_subscriptions.
pub(crate) mod push_telemetry;
pub(crate) mod remove_raft_voter;
pub(crate) mod renew_delegation_token;
// KIP-932 ShareGroupDescribe (api_key 77). Intercepted inline in
// `network::dispatch` so the handler receives the per-connection principal +
// peer `SocketAddr` for the per-group Describe ACL gate.
pub(crate) mod share_group_describe;
// KIP-932 share-group membership (api_key 76).
pub(crate) mod share_group_heartbeat;
// KIP-932 admin offset RPCs (api_key 90/91/92). Intercepted inline in
// `network::dispatch` for the per-group Describe/Alter/Delete ACL gates.
pub(crate) mod alter_share_group_offsets;
pub(crate) mod delete_share_group_offsets;
pub(crate) mod describe_share_group_offsets;
// KIP-932 ShareAcknowledge (api_key 79). Intercepted inline in
// `network::dispatch` for the per-topic Read ACL gate.
pub(crate) mod share_acknowledge;
// KIP-932 ShareFetch (api_key 78). Intercepted inline in `network::dispatch`
// for the per-topic Read ACL gate.
pub(crate) mod share_fetch;
// KIP-1071 StreamsGroupDescribe (api_key 89). Plain 4-arg handler mirroring
// consumer_group_describe; it does not apply a per-group Describe ACL gate.
pub(crate) mod streams_group_describe;
// KIP-1071 streams-group membership / rebalance protocol (api_key 88).
pub(crate) mod streams_group_heartbeat;
pub(crate) mod sync_group;
// KIP-185 admin RPC to permanently drop a broker registration (api_key 64).
pub(crate) mod unregister_broker;
// KIP-584 feature finalization (api_key 57). Intercepted inline in
// `network::dispatch` so the handler receives the per-connection principal +
// peer `SocketAddr` for the Cluster:Alter ACL gate.
pub(crate) mod update_features;
pub(crate) mod update_raft_voter;

/// Build the dispatch table for plain 4-arg handlers. Inline-intercepted
/// handlers are documented below and registered in `network::dispatch`.
#[must_use]
pub(crate) fn build_table() -> HandlerTable {
    let mut t = HandlerTable::new();
    // Produce (api_key 0) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Write ACL enforcement.
    // Fetch (api_key 1) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Read ACL enforcement.
    // Metadata (api_key 3) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Describe ACL enforcement.
    // CreateTopics (api_key 19) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for Cluster Create ACL enforcement.
    // DeleteTopics (api_key 20) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Delete ACL enforcement.
    // AlterConfigs (api_key 33) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-resource AlterConfigs ACL enforcement.
    // IncrementalAlterConfigs (api_key 44) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-resource AlterConfigs ACL enforcement.
    // DeleteRecords (api_key 21) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Delete ACL enforcement.
    // CreatePartitions (api_key 37) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-topic Alter ACL enforcement.
    // DescribeGroups (api_key 15) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-group Describe ACL enforcement.
    // ListGroups (api_key 16) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-group Describe ACL enforcement
    // (silent filter — denied groups are omitted, not error-coded).
    // DeleteGroups (api_key 42) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-group Delete ACL enforcement.
    // JoinGroup (api_key 11) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for per-group Read ACL enforcement.
    // OffsetCommit (api_key 8) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for Group Read + per-topic Read ACL
    // enforcement.
    // OffsetFetch (api_key 9) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for Group Describe + per-topic Read ACL
    // enforcement (including the fetch-all `topics: None` sentinel).
    // DescribeCluster (api_key 60) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for Cluster Describe ACL enforcement.
    // AlterUserScramCredentials (api_key 51) is intercepted inline in
    // `network::dispatch` so the handler can receive the
    // per-connection principal + peer `SocketAddr` for Cluster Alter ACL
    // enforcement.
    // InitProducerId (api_key 22) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for either `Write` on
    // `TransactionalId` (transactional path) or `IdempotentWrite` on
    // `Cluster` (idempotent-only path).
    // AddPartitionsToTxn (api_key 24) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for `Write` on `TransactionalId` and
    // per-topic `Write` on `Topic` ACL enforcement.
    // EndTxn (api_key 26) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for `Write` on `TransactionalId` ACL
    // enforcement.
    // TxnOffsetCommit (api_key 28) is intercepted inline in `network::dispatch`
    // so the handler can receive the per-connection
    // principal + peer `SocketAddr` for `Write` on `TransactionalId` +
    // `Read` on `Group` + per-topic `Read` on `Topic` ACL enforcement.
    // 2 (ListOffsets) intercepted inline — per-topic Describe ACL.
    // 10 (FindCoordinator) intercepted inline — per-key Group/TransactionalId
    // Describe ACL.
    // 12 (Heartbeat) intercepted inline — Group Read ACL.
    // 13 (LeaveGroup) intercepted inline — Group Read ACL.
    // 14 (SyncGroup) intercepted inline — Group Read ACL.
    // 15 (DescribeGroups) intercepted inline — see comment above.
    // 16 (ListGroups) intercepted inline — see comment above.
    t.register(18, api_versions::handle);
    // 21 (DeleteRecords) intercepted inline — see comment above.
    // 22 (InitProducerId) intercepted inline — see comment above.
    // 23 (OffsetForLeaderEpoch) intercepted inline — per-topic Describe ACL.
    // 24 (AddPartitionsToTxn) intercepted inline — see comment above.
    t.register(25, crate::txn::handlers::add_offset_commits_to_txn::handle);
    // 26 (EndTxn) intercepted inline — see comment above.
    t.register(27, crate::txn::handlers::write_txn_markers::handle);
    // 28 (TxnOffsetCommit) intercepted inline — see comment above.
    // 32 (DescribeConfigs) intercepted inline — per-resource DescribeConfigs ACL.
    // 33 (AlterConfigs) intercepted inline — see comment above.
    // 35 (DescribeLogDirs) intercepted inline — Cluster Describe ACL.
    // 37 (CreatePartitions) intercepted inline — see comment above.
    // 42 (DeleteGroups) intercepted inline — see comment above.
    // 44 (IncrementalAlterConfigs) intercepted inline — see comment above.
    // 56 (AlterPartition) intercepted inline — Cluster ClusterAction ACL.
    t.register(73, assign_replicas_to_dirs::handle);
    // FetchSnapshot (api_key 59, KIP-630) — controller-snapshot byte-range
    // fetch. Plain 4-arg signature: no per-connection ACL context needed.
    t.register(59, fetch_snapshot::handle);
    // 60 (DescribeCluster) intercepted inline — see comment above.
    // 63 (BrokerHeartbeat) intercepted inline — Cluster ClusterAction ACL.
    // 93 (GetReplicaLogInfo, KIP-966) intercepted inline — Cluster
    // ClusterAction ACL. Inter-broker RPC the controller's unclean recovery
    // manager uses to read each replica's LEO + leader epoch.
    // 68 (ConsumerGroupHeartbeat) intercepted inline — Group Read ACL.
    t.register(69, consumer_group_describe::handle);
    // 76 (ShareGroupHeartbeat, KIP-932) intercepted inline — Group Read ACL.
    // 88 (StreamsGroupHeartbeat, KIP-1071) intercepted inline — Group Read ACL.
    // StreamsGroupDescribe (89) stays a plain 4-arg handler and does not apply
    // a per-group Describe ACL gate.
    t.register(89, streams_group_describe::handle);
    // KIP-932 share-state persister RPCs (api keys 83–87). Inter-broker
    // handlers, gated per-partition on local share-state leadership.
    t.register(83, crate::share_coordinator::handlers::initialize::handle);
    t.register(84, crate::share_coordinator::handlers::read::handle);
    t.register(85, crate::share_coordinator::handlers::write::handle);
    t.register(86, crate::share_coordinator::handlers::delete::handle);
    t.register(87, crate::share_coordinator::handlers::read_summary::handle);
    // 71 (GetTelemetrySubscriptions) intercepted inline in `network::dispatch`
    // so the handler receives the per-connection peer SocketAddr and software
    // name/version for KIP-714 subscription matching.
    // 72 (PushTelemetry) intercepted inline in `network::dispatch` for the
    // same reason — it needs the per-connection context to authorize pushes.
    t
}