crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! KIP-584 supported-feature surface for the broker. Re-exports the
//! `crabka_metadata` feature registry and derives the `ApiVersions`
//! advertisement rows from it, so the advertised and validated feature sets
//! can never disagree. Behavioral gating helpers (`require_feature`) live here
//! because they return broker error codes.

pub(crate) use crabka_metadata::metadata_version::METADATA_VERSION_FEATURE as METADATA_VERSION;
// Re-exported for `ApiVersions` tests / range-bound assertions; consumed only
// from `#[cfg(test)]` modules, so the non-test lib target sees them as unused.
#[allow(unused_imports)]
pub(crate) use crabka_metadata::metadata_version::METADATA_VERSION_MAX;
#[allow(unused_imports)]
pub(crate) use crabka_metadata::metadata_version::METADATA_VERSION_MIN;

use crabka_metadata::MetadataImage;
/// The `share.version` feature name (KIP-932). Consumed only from the
/// `#[cfg(test)]` module that asserts share.version is advertised.
#[allow(unused_imports)]
pub(crate) use crabka_metadata::metadata_version::SHARE_VERSION_FEATURE as SHARE_VERSION;
/// The `streams.version` feature name (KIP-1071). Gates `StreamsGroupHeartbeat`
/// / `StreamsGroupDescribe`; read by those handlers via `feature_enabled`.
pub(crate) use crabka_metadata::metadata_version::STREAMS_VERSION_FEATURE as STREAMS_VERSION;

/// One row of the `ApiVersions.supported_features` advertisement.
///
/// A plain value type: a feature name plus the inclusive range of levels this
/// broker is able to finalize for it. Derives `PartialEq`/`Eq` so rows can be
/// compared directly (e.g. `assert!(row == expected)`) instead of field by
/// field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SupportedFeature {
    /// Feature name as advertised on the wire (e.g. `metadata.version`).
    pub name: &'static str,
    /// Lowest feature level this broker supports.
    pub min_version: i16,
    /// Highest feature level this broker supports.
    pub max_version: i16,
}

/// Every feature this broker supports finalizing, as advertisement rows.
///
/// Walks `crabka_metadata::feature_registry()` and emits one
/// `SupportedFeature` per entry, in registry order, so the registry stays the
/// single source of truth for both the advertised and the validated sets.
pub(crate) fn supported_features() -> Vec<SupportedFeature> {
    let mut rows = Vec::new();
    for entry in crabka_metadata::feature_registry().iter() {
        // `supported_range` yields `(min, max)`.
        let range = entry.supported_range();
        rows.push(SupportedFeature {
            name: entry.name(),
            min_version: range.0,
            max_version: range.1,
        });
    }
    rows
}

/// Resolve a single supported feature by its name.
///
/// Returns `None` when `crabka_metadata::feature` does not know `name`;
/// otherwise the feature's name and supported `(min, max)` range packaged as a
/// `SupportedFeature`. Companion to `supported_features` in this module's
/// feature-surface API.
// The `UpdateFeatures` handler resolves the registry feature directly, so only
// `#[cfg(test)]` code calls this and the non-test lib target sees it as unused.
#[allow(dead_code)]
pub(crate) fn lookup(name: &str) -> Option<SupportedFeature> {
    let entry = crabka_metadata::feature(name)?;
    let (min_version, max_version) = entry.supported_range();
    Some(SupportedFeature {
        name: entry.name(),
        min_version,
        max_version,
    })
}

/// KIP-584 admission gate for a feature-gated request.
///
/// Reads the finalized level of `name` from `image` and compares it against
/// `required_level`:
///
/// * finalized below `required_level` → `Err(UNSUPPORTED_VERSION)`;
/// * finalized at or above `required_level` → `Ok(())`;
/// * not finalized at all → `Ok(())`. With no level to gate against the check
///   is permissive, matching the range guard's treatment of a missing level.
///
/// The error value is the broker error code from `crate::codes`.
pub(crate) fn require_feature(
    image: &MetadataImage,
    name: &str,
    required_level: i16,
) -> Result<(), i16> {
    match image.finalized_features().get(name).copied() {
        Some(finalized) if finalized < required_level => Err(crate::codes::UNSUPPORTED_VERSION),
        // Either high enough, or unfinalized (permissive).
        _ => Ok(()),
    }
}

/// Whether `name` is finalized in `image` at `level` or higher.
///
/// An UNFINALIZED feature counts as level 0, i.e. disabled for any positive
/// `level`. Use this for features where absence means "off" (e.g.
/// `group.version` → next-gen disabled). This is the opposite default from
/// `require_feature`, which lets requests through when the feature is absent
/// (used for metadata.version-gated RPCs on legacy images).
pub(crate) fn feature_enabled(
    image: &crabka_metadata::MetadataImage,
    name: &str,
    level: i16,
) -> bool {
    let finalized_level = match image.finalized_features().get(name).copied() {
        Some(found) => found,
        // Never finalized: treat as level 0.
        None => 0,
    };
    finalized_level >= level
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_metadata::{FeatureLevelRecord, MetadataRecord};

    /// A feature that was never finalized reads as disabled; finalizing it at
    /// the requested level flips it to enabled.
    #[test]
    fn feature_enabled_treats_absence_as_disabled() {
        let mut image = MetadataImage::new(uuid::Uuid::nil());

        // Nothing finalized yet → disabled.
        let enabled_before = feature_enabled(&image, "group.version", 1);
        assert!(!enabled_before);

        let finalize_at_one = FeatureLevelRecord {
            name: "group.version".into(),
            level: 1,
        };
        image.apply(&MetadataRecord::V1FeatureLevel(finalize_at_one));

        // Finalized at exactly the requested level → enabled.
        let enabled_after = feature_enabled(&image, "group.version", 1);
        assert!(enabled_after);
    }

    /// `lookup` resolves metadata.version with the registry's bounds and
    /// rejects names the registry does not know.
    #[test]
    fn supported_features_include_metadata_version() {
        let metadata_version = lookup(METADATA_VERSION).expect("metadata.version supported");
        assert!(metadata_version.min_version == METADATA_VERSION_MIN);
        assert!(metadata_version.max_version == METADATA_VERSION_MAX);

        let unknown = lookup("not.a.feature");
        assert!(unknown.is_none());
    }

    /// share.version is resolvable and advertised with range 0..=1.
    #[test]
    fn share_version_is_supported() {
        let share = lookup(SHARE_VERSION).expect("share.version supported");
        assert!(share.min_version == 0);
        assert!(share.max_version == 1);

        // Advertised via the registry-derived supported-feature table.
        let advertised = supported_features()
            .into_iter()
            .any(|row| row.name == SHARE_VERSION && (row.min_version, row.max_version) == (0, 1));
        assert!(advertised);
    }

    /// streams.version is resolvable and advertised with range 0..=1.
    #[test]
    fn streams_version_is_supported() {
        let streams = lookup(STREAMS_VERSION).expect("streams.version supported");
        assert!(streams.min_version == 0);
        assert!(streams.max_version == 1);

        // Advertised via the registry-derived supported-feature table.
        let advertised = supported_features()
            .into_iter()
            .any(|row| row.name == STREAMS_VERSION && (row.min_version, row.max_version) == (0, 1));
        assert!(advertised);
    }

    /// With nothing finalized there is no level to gate against, so the gate
    /// lets the request through.
    #[test]
    fn require_feature_is_permissive_on_unfinalized() {
        let empty_image = MetadataImage::new(uuid::Uuid::nil());
        let outcome = require_feature(&empty_image, METADATA_VERSION, 11);
        assert!(outcome.is_ok());
    }

    /// Finalized at 10: requiring 11 is rejected, requiring 10 or 7 passes.
    #[test]
    fn require_feature_gates_below_level() {
        let mut image = MetadataImage::new(uuid::Uuid::nil());
        let finalize_at_ten = FeatureLevelRecord {
            name: METADATA_VERSION.to_string(),
            level: 10,
        };
        image.apply(&MetadataRecord::V1FeatureLevel(finalize_at_ten));

        // One above the finalized level → rejected with the broker error code.
        let too_high = require_feature(&image, METADATA_VERSION, 11);
        assert!(too_high == Err(crate::codes::UNSUPPORTED_VERSION));

        // At or below the finalized level → admitted.
        for satisfied_level in [10, 7] {
            assert!(require_feature(&image, METADATA_VERSION, satisfied_level).is_ok());
        }
    }
}