use thiserror::Error;
use super::schema::SchemaVersion;
use super::wire::WireVersion;
/// Version-compatibility failures, one variant per kind of mismatch.
///
/// The `Display` text of each variant (generated from its `#[error]`
/// attribute) embeds the offending versions along with a remediation hint.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum VersionError {
/// The wire major version this node speaks differs from the one an event
/// was written at.
#[error(
"wire major version mismatch: this node speaks {node}, event was written at {event}. \
Cluster cannot operate in mixed-major mode. Upgrade the older node or contact ops."
)]
WireMajorMismatch {
/// Wire version this node speaks.
node: WireVersion,
/// Wire version the event was written at.
event: WireVersion,
},
/// An event requires a newer schema version for a table than this node has.
#[error(
"schema version too new for table `{table}`: this node has v{node_max}, \
event requires v{event_required}. Run pending migrations or wait for them."
)]
SchemaTooNew {
/// Name of the affected table.
table: &'static str,
/// Schema version this node has for the table.
node_max: SchemaVersion,
/// Schema version the event requires.
event_required: SchemaVersion,
},
/// A feature requires a higher cluster minimum wire version than the one
/// the cluster currently negotiates.
#[error(
"feature `{feature}` requires cluster min wire version {requires}, \
but cluster currently negotiates min {cluster_min}. \
Upgrade lagging nodes before using this feature."
)]
FeatureGated {
/// Name of the gated feature.
feature: &'static str,
/// Cluster minimum wire version the feature requires.
requires: WireVersion,
/// Minimum wire version the cluster currently negotiates.
cluster_min: WireVersion,
},
/// A write was rejected because of the gap between the wire version its
/// event would be emitted at and the cluster minimum.
#[error(
"cluster has nodes too old: write would emit event at {event}, \
but cluster minimum is {cluster_min}. Write rejected for safety."
)]
ClusterTooOld {
/// Wire version the write would emit its event at.
event: WireVersion,
/// Current cluster minimum wire version.
cluster_min: WireVersion,
},
}
impl VersionError {
pub fn metric_label(&self) -> &'static str {
match self {
VersionError::WireMajorMismatch { .. } => "wire_major_mismatch",
VersionError::SchemaTooNew { .. } => "schema_too_new",
VersionError::FeatureGated { .. } => "feature_gated",
VersionError::ClusterTooOld { .. } => "cluster_too_old",
}
}
pub fn http_status(&self) -> u16 {
409
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant must keep reporting its established label string.
    #[test]
    fn metric_labels_are_stable() {
        let expectations = [
            (
                VersionError::WireMajorMismatch {
                    node: WireVersion::new(1, 0),
                    event: WireVersion::new(2, 0),
                },
                "wire_major_mismatch",
            ),
            (
                VersionError::SchemaTooNew {
                    table: "x",
                    node_max: SchemaVersion::new(1),
                    event_required: SchemaVersion::new(2),
                },
                "schema_too_new",
            ),
            (
                VersionError::FeatureGated {
                    feature: "f",
                    requires: WireVersion::new(1, 1),
                    cluster_min: WireVersion::new(1, 0),
                },
                "feature_gated",
            ),
            (
                VersionError::ClusterTooOld {
                    event: WireVersion::new(1, 5),
                    cluster_min: WireVersion::new(1, 0),
                },
                "cluster_too_old",
            ),
        ];

        for (error, label) in &expectations {
            assert_eq!(error.metric_label(), *label, "{error:?}");
        }
    }

    /// The rendered message must mention both versions and the mixed-major hint.
    #[test]
    fn display_messages_include_offending_versions() {
        let rendered = VersionError::WireMajorMismatch {
            node: WireVersion::new(1, 5),
            event: WireVersion::new(2, 0),
        }
        .to_string();

        for needle in ["1.5", "2.0", "mixed-major"] {
            assert!(
                rendered.contains(needle),
                "expected {needle:?} in {rendered:?}"
            );
        }
    }

    /// Every variant reports HTTP status 409.
    #[test]
    fn all_version_errors_map_to_409() {
        let wire_major = VersionError::WireMajorMismatch {
            node: WireVersion::new(1, 0),
            event: WireVersion::new(2, 0),
        };
        let schema = VersionError::SchemaTooNew {
            table: "t",
            node_max: SchemaVersion::new(1),
            event_required: SchemaVersion::new(2),
        };
        let gated = VersionError::FeatureGated {
            feature: "f",
            requires: WireVersion::new(1, 5),
            cluster_min: WireVersion::new(1, 0),
        };
        let too_old = VersionError::ClusterTooOld {
            event: WireVersion::new(1, 5),
            cluster_min: WireVersion::new(1, 0),
        };

        [wire_major, schema, gated, too_old]
            .iter()
            .for_each(|case| assert_eq!(case.http_status(), 409, "{case:?}"));
    }
}