use std::collections::BTreeMap;
use crate::MetadataImage;
/// A versioned, finalizable cluster feature known to the registry.
///
/// Implementors must be `Sync`; the registry exposes them as shared
/// `&'static dyn Feature` references.
pub trait Feature: Sync {
    /// The feature's name, used as the lookup key by [`feature`].
    fn name(&self) -> &'static str;

    /// The `(min, max)` levels this build supports.
    ///
    /// [`is_supported_level`] treats both ends as inclusive.
    fn supported_range(&self) -> (i16, i16);

    /// The level chosen for this feature when bootstrapping at metadata level
    /// `bootstrap_mv` and no explicit override is supplied.
    fn default_level(&self, bootstrap_mv: i16) -> i16;

    /// The lowest level permitted for the given image.
    ///
    /// The default ignores the image and returns the lower bound of
    /// [`Feature::supported_range`].
    // NOTE(review): presumably the floor below which a downgrade is refused;
    // confirm against the callers, which are outside this file section.
    fn min_required_floor(&self, _image: &MetadataImage) -> i16 {
        let (lowest, _highest) = self.supported_range();
        lowest
    }

    /// `(feature name, minimum level)` pairs that must hold when this feature
    /// is at `_level`. The default declares no dependencies.
    fn dependencies(&self, _level: i16) -> &'static [(&'static str, i16)] {
        &[]
    }

    /// A display name for `_level`, if the feature has one.
    /// The default has none.
    fn level_name(&self, _level: i16) -> Option<&'static str> {
        None
    }
}
pub struct MetadataVersionFeature;
impl Feature for MetadataVersionFeature {
fn name(&self) -> &'static str {
crate::metadata_version::METADATA_VERSION_FEATURE
}
fn supported_range(&self) -> (i16, i16) {
(
crate::metadata_version::METADATA_VERSION_MIN,
crate::metadata_version::METADATA_VERSION_MAX,
)
}
fn default_level(&self, bootstrap_mv: i16) -> i16 {
bootstrap_mv.clamp(
crate::metadata_version::METADATA_VERSION_MIN,
crate::metadata_version::METADATA_VERSION_MAX,
)
}
fn min_required_floor(&self, image: &MetadataImage) -> i16 {
image.min_required_metadata_version()
}
fn level_name(&self, level: i16) -> Option<&'static str> {
crate::metadata_version::from_feature_level(level)
.map(crate::metadata_version::MetadataVersion::ivn)
}
}
pub struct GroupVersionFeature;
impl Feature for GroupVersionFeature {
fn name(&self) -> &'static str {
crate::group_version::GROUP_VERSION_FEATURE
}
fn supported_range(&self) -> (i16, i16) {
(
crate::group_version::GROUP_VERSION_MIN,
crate::group_version::GROUP_VERSION_MAX,
)
}
fn default_level(&self, bootstrap_mv: i16) -> i16 {
if bootstrap_mv >= crate::group_version::GROUP_VERSION_GA_METADATA_LEVEL {
crate::group_version::GROUP_VERSION_MAX
} else {
crate::group_version::GROUP_VERSION_MIN
}
}
}
/// Registry entry for the transaction-version feature.
pub struct TransactionVersionFeature;

impl Feature for TransactionVersionFeature {
    /// Returns `TRANSACTION_VERSION_FEATURE`.
    fn name(&self) -> &'static str {
        use crate::transaction_version::TRANSACTION_VERSION_FEATURE;
        TRANSACTION_VERSION_FEATURE
    }

    /// Returns `(TRANSACTION_VERSION_MIN, TRANSACTION_VERSION_MAX)`.
    fn supported_range(&self) -> (i16, i16) {
        use crate::transaction_version::{TRANSACTION_VERSION_MAX, TRANSACTION_VERSION_MIN};
        (TRANSACTION_VERSION_MIN, TRANSACTION_VERSION_MAX)
    }

    /// Maps the bootstrap metadata level onto a transaction version:
    /// 2 at or above `TV2_METADATA_LEVEL`, 1 at or above `TV1_METADATA_LEVEL`,
    /// and 0 below both.
    fn default_level(&self, bootstrap_mv: i16) -> i16 {
        use crate::transaction_version::{TV1_METADATA_LEVEL, TV2_METADATA_LEVEL};
        // Arms are tried top to bottom, so the TV2 threshold wins when both hold.
        match bootstrap_mv {
            mv if mv >= TV2_METADATA_LEVEL => 2,
            mv if mv >= TV1_METADATA_LEVEL => 1,
            _ => 0,
        }
    }
}
pub struct ShareVersionFeature;
impl Feature for ShareVersionFeature {
fn name(&self) -> &'static str {
crate::metadata_version::SHARE_VERSION_FEATURE
}
fn supported_range(&self) -> (i16, i16) {
(
crate::metadata_version::SHARE_VERSION_MIN,
crate::metadata_version::SHARE_VERSION_MAX,
)
}
fn default_level(&self, _bootstrap_mv: i16) -> i16 {
crate::metadata_version::SHARE_VERSION_MIN
}
}
pub struct StreamsVersionFeature;
impl Feature for StreamsVersionFeature {
fn name(&self) -> &'static str {
crate::metadata_version::STREAMS_VERSION_FEATURE
}
fn supported_range(&self) -> (i16, i16) {
(
crate::metadata_version::STREAMS_VERSION_MIN,
crate::metadata_version::STREAMS_VERSION_MAX,
)
}
fn default_level(&self, _bootstrap_mv: i16) -> i16 {
crate::metadata_version::STREAMS_VERSION_MIN
}
}
/// Every feature known to this build, in a fixed order: metadata, group,
/// transaction, share, streams.
///
/// [`bootstrap_feature_records_with_overrides`] emits its records in this
/// same order.
#[must_use]
pub fn feature_registry() -> &'static [&'static dyn Feature] {
    static ALL_FEATURES: &[&dyn Feature] = &[
        &MetadataVersionFeature,
        &GroupVersionFeature,
        &TransactionVersionFeature,
        &ShareVersionFeature,
        &StreamsVersionFeature,
    ];
    ALL_FEATURES
}
/// Looks up a registered feature by its exact name.
///
/// Returns `None` when no entry in [`feature_registry`] reports that name.
#[must_use]
pub fn feature(name: &str) -> Option<&'static dyn Feature> {
    let registry = feature_registry();
    registry
        .iter()
        .find(|candidate| candidate.name() == name)
        .copied()
}
/// Builds the bootstrap feature-level records for `bootstrap_mv`, using every
/// feature's default level (no overrides).
#[must_use]
pub fn bootstrap_feature_records(bootstrap_mv: i16) -> Vec<crate::MetadataRecord> {
    let no_overrides = BTreeMap::new();
    bootstrap_feature_records_with_overrides(bootstrap_mv, &no_overrides)
}
/// Builds the bootstrap feature-level records for `bootstrap_mv`.
///
/// For each registered feature, the level is the entry in `overrides` keyed by
/// the feature's name, or the feature's default for `bootstrap_mv` when there
/// is no such entry. A `V1FeatureLevel` record is emitted only for levels
/// greater than zero, in registry order. Entries in `overrides` whose key is
/// not a registered feature name are never consulted.
#[must_use]
pub fn bootstrap_feature_records_with_overrides(
    bootstrap_mv: i16,
    overrides: &BTreeMap<String, i16>,
) -> Vec<crate::MetadataRecord> {
    let mut records = Vec::new();
    for feat in feature_registry() {
        let level = match overrides.get(feat.name()) {
            Some(&pinned) => pinned,
            None => feat.default_level(bootstrap_mv),
        };
        // Levels of zero or below produce no record at all.
        if level <= 0 {
            continue;
        }
        let record = crate::FeatureLevelRecord {
            name: feat.name().to_string(),
            level,
        };
        records.push(crate::MetadataRecord::V1FeatureLevel(record));
    }
    records
}
/// Checks that every dependency declared by the registered features is met by
/// the levels in `resolved`.
///
/// Names in `resolved` that are not registered features are treated as having
/// no dependencies.
///
/// # Errors
///
/// Returns a message describing the first unmet dependency found.
pub fn validate_feature_dependencies(resolved: &BTreeMap<String, i16>) -> Result<(), String> {
    check_deps(resolved, |name, level| match feature(name) {
        Some(registered) => registered.dependencies(level),
        None => &[],
    })
}
/// Verifies every `(dependency, minimum level)` pair that `deps_of` reports
/// for the entries of `resolved`.
///
/// Entries are visited in the map's key order and each entry's dependencies in
/// slice order. A dependency that is absent from `resolved` counts as level 0.
///
/// # Errors
///
/// Returns a message for the first dependency whose level is below its
/// required minimum.
fn check_deps(
    resolved: &BTreeMap<String, i16>,
    deps_of: impl Fn(&str, i16) -> &'static [(&'static str, i16)],
) -> Result<(), String> {
    for (name, &level) in resolved {
        // Pair each requirement with the level actually present, then pick the
        // first one that falls short.
        let unmet = deps_of(name, level)
            .iter()
            .map(|&(dep, min)| (dep, min, resolved.get(dep).copied().unwrap_or(0)))
            .find(|&(_, min, have)| have < min);
        if let Some((dep, min, have)) = unmet {
            return Err(format!(
                "feature {name}={level} requires {dep}>={min}, but {dep} is finalized at {have}"
            ));
        }
    }
    Ok(())
}
/// Reports whether `level` lies within the inclusive supported range of the
/// feature called `name`.
///
/// Unknown feature names are never supported.
#[must_use]
pub fn is_supported_level(name: &str, level: i16) -> bool {
    let Some(registered) = feature(name) else {
        return false;
    };
    let (lowest, highest) = registered.supported_range();
    (lowest..=highest).contains(&level)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::collections::BTreeMap;

    /// Collects the `V1FeatureLevel` records of `recs` into a name -> level
    /// map, ignoring every other record kind.
    fn levels_of(recs: &[crate::MetadataRecord]) -> BTreeMap<String, i16> {
        let mut levels = BTreeMap::new();
        for rec in recs {
            if let crate::MetadataRecord::V1FeatureLevel(entry) = rec {
                levels.insert(entry.name.clone(), entry.level);
            }
        }
        levels
    }

    // ---- metadata.version ----

    #[test]
    fn registry_contains_metadata_version() {
        let metadata = feature("metadata.version").expect("registered");
        assert!(metadata.supported_range() == (7, 25));
        assert!(feature("not.a.feature").is_none());
    }

    #[test]
    fn metadata_version_default_is_the_bootstrap_level_clamped() {
        let metadata = feature("metadata.version").unwrap();
        // In-range levels pass through unchanged.
        assert!(metadata.default_level(25) == 25);
        assert!(metadata.default_level(7) == 7);
        // Out-of-range levels are pulled back to the nearest bound.
        assert!(metadata.default_level(99) == 25);
        assert!(metadata.default_level(1) == 7);
    }

    #[test]
    fn is_supported_level_checks_range() {
        // Both ends of the range are inclusive.
        assert!(is_supported_level("metadata.version", 7));
        assert!(is_supported_level("metadata.version", 25));
        assert!(!is_supported_level("metadata.version", 6));
        assert!(!is_supported_level("metadata.version", 26));
        assert!(!is_supported_level("not.a.feature", 1));
    }

    #[test]
    fn metadata_version_level_name() {
        let metadata = feature("metadata.version").unwrap();
        assert!(metadata.level_name(25) == Some("4.0-IV3"));
        assert!(metadata.level_name(7) == Some("3.3-IV3"));
        assert!(metadata.level_name(99).is_none());
    }

    // ---- group.version ----

    #[test]
    fn group_version_registered_with_range() {
        let group = feature("group.version").expect("registered");
        assert!(group.supported_range() == (0, 1));
    }

    #[test]
    fn group_version_default_follows_release() {
        let group = feature("group.version").unwrap();
        assert!(group.default_level(crate::group_version::GROUP_VERSION_GA_METADATA_LEVEL - 1) == 0);
        assert!(group.default_level(crate::group_version::GROUP_VERSION_GA_METADATA_LEVEL) == 1);
        assert!(group.default_level(25) == 1);
    }

    #[test]
    fn group_version_declares_no_hard_dependencies() {
        let group = feature("group.version").unwrap();
        assert!(group.dependencies(0).is_empty());
        assert!(group.dependencies(1).is_empty());
    }

    #[test]
    fn group_version_ga_threshold_is_4_0_iv0() {
        assert!(
            crate::metadata_version::from_feature_level(
                crate::group_version::GROUP_VERSION_GA_METADATA_LEVEL
            )
            .unwrap()
            .ivn()
                == "4.0-IV0"
        );
    }

    // ---- transaction.version ----

    #[test]
    fn transaction_version_registered() {
        let txn = feature("transaction.version").expect("registered");
        assert!(txn.supported_range() == (0, 2));
    }

    #[test]
    fn transaction_version_default_jumps_to_two_at_4_0_iv2() {
        let txn = feature("transaction.version").unwrap();
        assert!(txn.default_level(23) == 0);
        assert!(txn.default_level(24) == 2);
        assert!(txn.default_level(25) == 2);
    }

    #[test]
    fn transaction_version_declares_no_hard_dependencies() {
        let txn = feature("transaction.version").unwrap();
        assert!(txn.dependencies(0).is_empty());
        assert!(txn.dependencies(1).is_empty());
        assert!(txn.dependencies(2).is_empty());
    }

    #[test]
    fn transaction_version_ga_threshold_is_4_0_iv2() {
        assert!(
            crate::metadata_version::from_feature_level(
                crate::transaction_version::TV2_METADATA_LEVEL
            )
            .unwrap()
            .ivn()
                == "4.0-IV2"
        );
    }

    // ---- streams.version ----

    #[test]
    fn streams_version_registered_opt_in() {
        let streams = feature("streams.version").expect("registered");
        assert!(streams.supported_range() == (0, 1));
        assert!(streams.default_level(25) == 0);
        assert!(streams.dependencies(1).is_empty());
    }

    // ---- bootstrap records ----

    #[test]
    fn bootstrap_omits_level_zero_features() {
        let levels = levels_of(&bootstrap_feature_records(25));
        assert!(levels.get("metadata.version") == Some(&25));
        assert!(levels.get("group.version") == Some(&1));
        assert!(levels.get("transaction.version") == Some(&2));
        assert!(!levels.contains_key("share.version"));
        assert!(!levels.contains_key("streams.version"));
    }

    #[test]
    fn bootstrap_with_overrides_applies_explicit_levels() {
        // Pinning group.version to 0 suppresses its record entirely.
        let pinned = BTreeMap::from([("group.version".to_string(), 0i16)]);
        let levels = levels_of(&bootstrap_feature_records_with_overrides(25, &pinned));
        assert!(levels.get("metadata.version") == Some(&25));
        assert!(levels.get("transaction.version") == Some(&2));
        assert!(!levels.contains_key("group.version"));
    }

    #[test]
    fn bootstrap_override_can_enable_an_opt_in_feature() {
        let pinned = BTreeMap::from([("streams.version".to_string(), 1i16)]);
        let levels = levels_of(&bootstrap_feature_records_with_overrides(25, &pinned));
        assert!(levels.get("streams.version") == Some(&1));
    }

    #[test]
    fn bootstrap_unlisted_feature_follows_bootstrap_mv() {
        let records = bootstrap_feature_records_with_overrides(23, &BTreeMap::new());
        let levels = levels_of(&records);
        assert!(levels.get("metadata.version") == Some(&23));
        assert!(levels.get("group.version") == Some(&1));
        assert!(!levels.contains_key("transaction.version"));
    }

    // ---- dependency validation ----

    #[test]
    fn validate_dependencies_ok_for_real_registry() {
        let resolved = BTreeMap::from([
            ("metadata.version".to_string(), 25i16),
            ("group.version".to_string(), 1i16),
            ("transaction.version".to_string(), 2i16),
        ]);
        assert!(validate_feature_dependencies(&resolved).is_ok());
    }

    #[test]
    fn check_deps_enforces_minimum_dependency_levels() {
        // Synthetic dependency table: "b" at level 1 needs "a" at level >= 2.
        fn deps_of(name: &str, level: i16) -> &'static [(&'static str, i16)] {
            match (name, level) {
                ("b", 1) => &[("a", 2)],
                _ => &[],
            }
        }

        let satisfied = BTreeMap::from([("a".to_string(), 2i16), ("b".to_string(), 1i16)]);
        assert!(check_deps(&satisfied, deps_of).is_ok());

        let too_low = BTreeMap::from([("a".to_string(), 1i16), ("b".to_string(), 1i16)]);
        assert!(check_deps(&too_low, deps_of).is_err());

        // A dependency that is absent altogether counts as unmet.
        let missing = BTreeMap::from([("b".to_string(), 1i16)]);
        assert!(check_deps(&missing, deps_of).is_err());
    }
}