use super::error::VersionError;
use super::event::VersionedEvent;
use super::gate::VersionGate;
use super::schema::{expected_schema_version, SchemaVersion};
/// Stateless namespace for version-compatibility decisions.
///
/// The type carries no data; every check is an associated function that takes
/// the [`VersionGate`] (and, where relevant, the event) it should consult.
pub struct VersionPolicy;

impl VersionPolicy {
    /// Decides whether `event` may be applied on this node.
    ///
    /// Two checks run in order:
    ///
    /// 1. The event's wire version is handed to `check_can_replay` on the
    ///    gate's local wire version.
    /// 2. If the event reports a schema requirement `(table, required)`, it is
    ///    checked with `SchemaVersion::check_can_apply` against the version
    ///    returned by `expected_schema_version(table)`. When that lookup
    ///    yields no value, `SchemaVersion::new(0)` is used instead.
    ///
    /// Events without a schema requirement skip the second check entirely.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by either check.
    pub fn can_apply<E: VersionedEvent>(event: &E, gate: &VersionGate) -> Result<(), VersionError> {
        gate.local_wire().check_can_replay(event.wire_version())?;

        // Nothing more to verify when the event does not touch a versioned table.
        let (table, required) = match event.schema_version() {
            Some(requirement) => requirement,
            None => return Ok(()),
        };

        let known = expected_schema_version(table);
        let local_max = known.unwrap_or(SchemaVersion::new(0));
        SchemaVersion::check_can_apply(local_max, required, table)?;
        Ok(())
    }

    /// Decides whether the feature named `feature_id` may be emitted.
    ///
    /// This is a direct delegation to [`VersionGate::can_use_feature`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the gate reports for `feature_id`.
    pub fn can_emit(feature_id: &'static str, gate: &VersionGate) -> Result<(), VersionError> {
        gate.can_use_feature(feature_id)
    }

    /// Decides whether `event` may be emitted given the gate's cluster minimum.
    ///
    /// The event is rejected when its wire version compares greater than
    /// `gate.cluster_min()`; otherwise it is accepted.
    ///
    /// # Errors
    ///
    /// Returns [`VersionError::ClusterTooOld`] carrying the event's wire
    /// version and the cluster minimum it was compared against.
    pub fn can_emit_at_version<E: VersionedEvent>(
        event: &E,
        gate: &VersionGate,
    ) -> Result<(), VersionError> {
        let cluster_min = gate.cluster_min();
        let event_version = event.wire_version();

        if event_version > cluster_min {
            Err(VersionError::ClusterTooOld {
                event: event_version,
                cluster_min,
            })
        } else {
            Ok(())
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::version::wire::WireVersion;

    /// Minimal [`VersionedEvent`] that reports exactly the versions it was built with.
    struct TestEvent {
        wire: WireVersion,
        schema: Option<(&'static str, SchemaVersion)>,
    }

    impl VersionedEvent for TestEvent {
        fn wire_version(&self) -> WireVersion {
            self.wire
        }

        fn schema_version(&self) -> Option<(&'static str, SchemaVersion)> {
            self.schema
        }
    }

    /// Builds an event at `wire` that carries no schema requirement.
    fn schemaless(wire: WireVersion) -> TestEvent {
        TestEvent { wire, schema: None }
    }

    /// Builds an event at `wire` that requires `version` of `table`.
    fn with_schema(wire: WireVersion, table: &'static str, version: SchemaVersion) -> TestEvent {
        TestEvent {
            wire,
            schema: Some((table, version)),
        }
    }

    /// An event at 1.0 requiring schema version 1 is accepted by a 1.5 gate.
    #[test]
    fn can_apply_accepts_compatible_event() {
        let gate = VersionGate::new(WireVersion::new(1, 5));
        let event = with_schema(
            WireVersion::new(1, 0),
            "memory_commit_log",
            SchemaVersion::new(1),
        );

        let outcome = VersionPolicy::can_apply(&event, &gate);
        assert!(outcome.is_ok());
    }

    /// An event at wire 2.0 is rejected by a gate at wire 1.5.
    #[test]
    fn can_apply_rejects_wire_major_mismatch() {
        let gate = VersionGate::new(WireVersion::new(1, 5));
        let event = schemaless(WireVersion::new(2, 0));

        let outcome = VersionPolicy::can_apply(&event, &gate);
        let err = outcome.unwrap_err();
        assert!(matches!(err, VersionError::WireMajorMismatch { .. }));
    }

    /// A schema requirement of 999 is rejected and the error reports both versions.
    #[test]
    fn can_apply_rejects_schema_too_new() {
        let gate = VersionGate::new(WireVersion::new(1, 0));
        let event = with_schema(
            WireVersion::new(1, 0),
            "memory_commit_log",
            SchemaVersion::new(999),
        );

        match VersionPolicy::can_apply(&event, &gate).unwrap_err() {
            VersionError::SchemaTooNew {
                table,
                node_max,
                event_required,
            } => {
                assert_eq!(table, "memory_commit_log");
                assert_eq!(node_max, SchemaVersion::new(1));
                assert_eq!(event_required, SchemaVersion::new(999));
            }
            other => panic!("wrong error: {other:?}"),
        }
    }

    /// With no schema requirement, only the wire check decides the outcome.
    #[test]
    fn can_apply_event_without_schema_skips_schema_check() {
        let gate = VersionGate::new(WireVersion::new(1, 0));
        let event = schemaless(WireVersion::new(1, 0));

        let outcome = VersionPolicy::can_apply(&event, &gate);
        assert!(outcome.is_ok());
    }

    /// After observing a peer at 1.0, emitting a 1.5 event is rejected.
    #[test]
    fn can_emit_at_version_rejects_when_cluster_too_old() {
        let gate = VersionGate::new(WireVersion::new(1, 5));
        gate.observe_peer(2, WireVersion::new(1, 0));
        let event = schemaless(WireVersion::new(1, 5));

        match VersionPolicy::can_emit_at_version(&event, &gate).unwrap_err() {
            VersionError::ClusterTooOld {
                event: e,
                cluster_min,
            } => {
                assert_eq!(e, WireVersion::new(1, 5));
                assert_eq!(cluster_min, WireVersion::new(1, 0));
            }
            other => panic!("wrong error: {other:?}"),
        }
    }

    /// After observing a peer at 1.5, emitting a 1.5 event is accepted.
    #[test]
    fn can_emit_at_version_accepts_when_cluster_caught_up() {
        let gate = VersionGate::new(WireVersion::new(1, 5));
        gate.observe_peer(2, WireVersion::new(1, 5));
        let event = schemaless(WireVersion::new(1, 5));

        let outcome = VersionPolicy::can_emit_at_version(&event, &gate);
        assert!(outcome.is_ok());
    }

    /// The `baseline_v1_0` feature is usable on a gate constructed at 1.5.
    #[test]
    fn can_emit_feature_check_uses_gate_floor() {
        let gate = VersionGate::new(WireVersion::new(1, 5));

        let outcome = VersionPolicy::can_emit("baseline_v1_0", &gate);
        assert!(outcome.is_ok());
    }
}