use crate::version::WIRE_FORMAT_VERSION;
#[derive(Debug, Clone)]
pub struct ClusterVersionState {
pub min_version: u16,
pub max_version: u16,
pub node_count: usize,
pub node_versions: Vec<(u64, u16)>,
}
impl ClusterVersionState {
pub fn new() -> Self {
Self {
min_version: WIRE_FORMAT_VERSION,
max_version: WIRE_FORMAT_VERSION,
node_count: 0,
node_versions: Vec::new(),
}
}
pub fn report_version(&mut self, node_id: u64, wire_version: u16) {
if let Some(entry) = self.node_versions.iter_mut().find(|(id, _)| *id == node_id) {
entry.1 = wire_version;
} else {
self.node_versions.push((node_id, wire_version));
}
self.recalculate();
}
pub fn remove_node(&mut self, node_id: u64) {
self.node_versions.retain(|(id, _)| *id != node_id);
self.recalculate();
}
fn recalculate(&mut self) {
self.node_count = self.node_versions.len();
if self.node_versions.is_empty() {
self.min_version = WIRE_FORMAT_VERSION;
self.max_version = WIRE_FORMAT_VERSION;
} else {
self.min_version = self
.node_versions
.iter()
.map(|(_, v)| *v)
.min()
.unwrap_or(WIRE_FORMAT_VERSION);
self.max_version = self
.node_versions
.iter()
.map(|(_, v)| *v)
.max()
.unwrap_or(WIRE_FORMAT_VERSION);
}
}
pub fn is_mixed_version(&self) -> bool {
self.min_version != self.max_version
}
pub fn all_upgraded(&self) -> bool {
self.min_version == WIRE_FORMAT_VERSION
}
pub fn can_activate_feature(&self, required_version: u16) -> bool {
self.min_version >= required_version
}
pub fn version_spread(&self) -> u16 {
self.max_version.saturating_sub(self.min_version)
}
pub fn is_supported_spread(&self) -> bool {
self.version_spread() <= 1
}
}
impl Default for ClusterVersionState {
fn default() -> Self {
Self::new()
}
}
pub fn accept_message(remote_version: u16) -> crate::Result<()> {
crate::version::check_wire_compatibility(remote_version)?;
if WIRE_FORMAT_VERSION.saturating_sub(remote_version) > 1 {
return Err(crate::Error::VersionCompat {
detail: format!(
"message version {remote_version} is more than one generation behind {WIRE_FORMAT_VERSION}: N-2 not supported"
),
});
}
Ok(())
}
pub fn should_compat_mode(cluster_state: &ClusterVersionState) -> bool {
cluster_state.is_mixed_version()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn single_version_cluster() {
let mut state = ClusterVersionState::new();
state.report_version(1, WIRE_FORMAT_VERSION);
state.report_version(2, WIRE_FORMAT_VERSION);
state.report_version(3, WIRE_FORMAT_VERSION);
assert!(!state.is_mixed_version());
assert!(state.all_upgraded());
assert!(state.is_supported_spread());
assert!(!should_compat_mode(&state));
}
#[test]
fn mixed_version_n_minus_1() {
let mut state = ClusterVersionState::new();
state.report_version(1, WIRE_FORMAT_VERSION);
state.report_version(2, WIRE_FORMAT_VERSION);
state.report_version(3, WIRE_FORMAT_VERSION.saturating_sub(1));
assert!(state.is_mixed_version());
assert!(!state.all_upgraded());
assert!(state.is_supported_spread()); assert!(should_compat_mode(&state));
}
#[test]
fn accept_same_version() {
assert!(accept_message(WIRE_FORMAT_VERSION).is_ok());
}
#[test]
fn accept_n_minus_1() {
if WIRE_FORMAT_VERSION > 1 {
assert!(accept_message(WIRE_FORMAT_VERSION - 1).is_ok());
}
}
#[test]
fn reject_newer() {
assert!(accept_message(WIRE_FORMAT_VERSION + 1).is_err());
}
#[test]
fn feature_activation() {
let mut state = ClusterVersionState::new();
state.report_version(1, 2);
state.report_version(2, 2);
state.report_version(3, 1);
assert!(!state.can_activate_feature(2));
state.report_version(3, 2); assert!(state.can_activate_feature(2)); }
#[test]
fn node_removal() {
let mut state = ClusterVersionState::new();
state.report_version(1, 2);
state.report_version(2, 1);
assert!(state.is_mixed_version());
state.remove_node(2);
assert!(!state.is_mixed_version());
}
}