#![allow(clippy::unwrap_used)]
use osproxy_core::{Epoch, PartitionId};
use osproxy_spi::Placement;
use osproxy_tenancy::{MigrationError, PartitionState, PlacementTable, WriteAdmission};
/// Builds a `Placement::DedicatedCluster` whose cluster id is converted from `name`.
fn cluster(name: &str) -> Placement {
    let cluster = osproxy_core::ClusterId::from(name);
    Placement::DedicatedCluster { cluster }
}
/// Shared fixture for the tests in this file.
///
/// Creates a new table, registers partition `"acme"` on cluster `"a"`, and
/// returns, in order: the table, the partition id, the epoch returned by that
/// `set` call, and a placement for cluster `"b"` to use as a migration target.
fn registered() -> (PlacementTable, PartitionId, Epoch, Placement) {
    let table = PlacementTable::new();
    let partition = PartitionId::from("acme");
    let initial_epoch = table.set(partition.clone(), cluster("a"));
    let target = cluster("b");
    (table, partition, initial_epoch, target)
}
/// After `begin_migration` followed by `enter_cutover`, a write presented with
/// any epoch handed out so far (cutover, draining, or the original) is
/// rejected, and `get` still reports the origin placement `"a"`.
#[test]
fn inv_m1_no_write_commits_during_cutover() {
    let (table, partition, active_epoch, target) = registered();
    let drain_epoch = table.begin_migration(&partition, target).unwrap();
    let cutover_epoch = table.enter_cutover(&partition).unwrap();

    // Newest epoch first, same order as the individual checks would run.
    for epoch in [cutover_epoch, drain_epoch, active_epoch] {
        assert_eq!(table.admit_write(&partition, epoch), WriteAdmission::Reject);
    }

    assert_eq!(table.get(&partition).unwrap().placement, cluster("a"));
}
/// Once the migration completes, the pre-migration and draining epochs are
/// rejected, only the epoch returned by `complete_migration` is admitted, the
/// placement reads as `"b"`, and the partition state is `Active`.
#[test]
fn inv_m2_after_cutover_a_stale_write_never_admits() {
    let (table, partition, active_epoch, target) = registered();
    let drain_epoch = table.begin_migration(&partition, target).unwrap();
    table.enter_cutover(&partition).unwrap();
    let flipped_epoch = table.complete_migration(&partition).unwrap();

    // Every epoch issued before the flip is stale.
    for stale in [active_epoch, drain_epoch] {
        assert_eq!(table.admit_write(&partition, stale), WriteAdmission::Reject);
    }
    assert_eq!(
        table.admit_write(&partition, flipped_epoch),
        WriteAdmission::Admit
    );

    assert_eq!(table.get(&partition).unwrap().placement, cluster("b"));
    assert!(matches!(
        table.state(&partition).unwrap().0,
        PartitionState::Active(_)
    ));
}
/// Aborting a migration, either straight from draining or after entering
/// cutover, yields an epoch that is admitted, while the pre-migration and
/// draining epochs are rejected and the placement still reads as `"a"`.
#[test]
fn inv_m3_abort_returns_to_origin_and_admits_the_origin_epoch_again() {
    // Run the scenario twice: abort before cutover, then abort during cutover.
    for through_cutover in [false, true] {
        let (table, partition, active_epoch, target) = registered();
        let drain_epoch = table.begin_migration(&partition, target).unwrap();
        if through_cutover {
            table.enter_cutover(&partition).unwrap();
        }

        let aborted_epoch = table.abort_migration(&partition).unwrap();
        assert_eq!(
            table.admit_write(&partition, aborted_epoch),
            WriteAdmission::Admit
        );
        for stale in [active_epoch, drain_epoch] {
            assert_eq!(table.admit_write(&partition, stale), WriteAdmission::Reject);
        }

        assert_eq!(table.get(&partition).unwrap().placement, cluster("a"));
    }
}
/// Walks a full migration and checks the placement returned by `get` after
/// each step: `"a"` before migrating, while draining, and during cutover;
/// `"b"` once the migration completes.
#[test]
fn inv_m4_reads_are_always_a_single_placement() {
    let (table, partition, _initial_epoch, target) = registered();
    let origin = cluster("a");

    // Before any migration.
    assert_eq!(table.get(&partition).unwrap().placement, origin);

    // Draining.
    table.begin_migration(&partition, target.clone()).unwrap();
    assert_eq!(
        table.get(&partition).unwrap().placement,
        origin,
        "draining reads from A"
    );

    // Cutover.
    table.enter_cutover(&partition).unwrap();
    assert_eq!(
        table.get(&partition).unwrap().placement,
        origin,
        "cutover still reads A"
    );

    // Completed.
    table.complete_migration(&partition).unwrap();
    assert_eq!(
        table.get(&partition).unwrap().placement,
        target,
        "flip moves reads to B"
    );
}
/// While draining, a write carrying the epoch returned by `begin_migration`
/// is admitted and a write carrying the pre-migration epoch is rejected.
#[test]
fn draining_keeps_writes_flowing_at_the_draining_epoch() {
    let (table, partition, active_epoch, target) = registered();
    let drain_epoch = table.begin_migration(&partition, target).unwrap();

    let at_drain = table.admit_write(&partition, drain_epoch);
    assert_eq!(at_drain, WriteAdmission::Admit);

    let at_active = table.admit_write(&partition, active_epoch);
    assert_eq!(at_active, WriteAdmission::Reject);
}
/// The epochs returned by `set`, `begin_migration`, `enter_cutover`, and
/// `complete_migration` are strictly increasing in that order.
#[test]
fn epoch_advances_monotonically_across_every_transition() {
    let (table, partition, e0, target) = registered();
    let e1 = table.begin_migration(&partition, target).unwrap();
    let e2 = table.enter_cutover(&partition).unwrap();
    let e3 = table.complete_migration(&partition).unwrap();

    // Each adjacent pair must be strictly ordered.
    let strictly_increasing = [e0, e1, e2, e3]
        .windows(2)
        .all(|pair| pair[0] < pair[1]);
    assert!(strictly_increasing, "{e0:?}<{e1:?}<{e2:?}<{e3:?}");
}
/// Driving a second partition (`"other"`) into cutover does not affect the
/// first partition: a write at its original epoch is still admitted.
#[test]
fn an_unrelated_partitions_migration_does_not_stale_this_one() {
    let (table, partition, partition_epoch, _target) = registered();

    // Register a second partition and take it all the way into cutover.
    let other = PartitionId::from("other");
    table.set(other.clone(), cluster("x"));
    table.begin_migration(&other, cluster("y")).unwrap();
    table.enter_cutover(&other).unwrap();

    let admission = table.admit_write(&partition, partition_epoch);
    assert_eq!(admission, WriteAdmission::Admit);
}
/// Checks the error returned by each transition when it is called out of phase:
///
/// * with no migration in progress, `enter_cutover`, `complete_migration`, and
///   `abort_migration` return `NotMigrating`;
/// * once a migration has begun, a second `begin_migration` returns
///   `AlreadyMigrating`, `complete_migration` returns `NotCutover`, and the
///   partition still reports `is_migrating()`;
/// * `begin_migration` on an unregistered partition returns `UnknownPartition`.
#[test]
fn transitions_are_rejected_out_of_phase_and_leave_the_table_unchanged() {
    let (table, partition, _initial_epoch, target) = registered();

    // No migration has been started yet.
    assert_eq!(
        table.enter_cutover(&partition),
        Err(MigrationError::NotMigrating)
    );
    assert_eq!(
        table.complete_migration(&partition),
        Err(MigrationError::NotMigrating)
    );
    assert_eq!(
        table.abort_migration(&partition),
        Err(MigrationError::NotMigrating)
    );

    // A migration has begun but cutover has not been entered.
    table.begin_migration(&partition, target.clone()).unwrap();
    assert_eq!(
        table.begin_migration(&partition, target),
        Err(MigrationError::AlreadyMigrating)
    );
    assert_eq!(
        table.complete_migration(&partition),
        Err(MigrationError::NotCutover)
    );
    assert!(table.state(&partition).unwrap().0.is_migrating());

    // A partition that was never registered.
    let stranger = PartitionId::from("nobody");
    assert_eq!(
        table.begin_migration(&stranger, cluster("z")),
        Err(MigrationError::UnknownPartition)
    );
}
/// After a full migration, exactly one of the four epochs issued along the way
/// is admitted, and it is the one returned by `complete_migration`.
#[test]
fn at_most_one_epoch_is_admissible_per_phase_no_split_brain() {
    let (table, partition, active_epoch, target) = registered();
    let drain_epoch = table.begin_migration(&partition, target).unwrap();
    let cutover_epoch = table.enter_cutover(&partition).unwrap();
    let flipped_epoch = table.complete_migration(&partition).unwrap();

    // Count how many of the issued epochs are still admitted.
    let mut admissible = 0usize;
    for epoch in [active_epoch, drain_epoch, cutover_epoch, flipped_epoch] {
        if table.admit_write(&partition, epoch) == WriteAdmission::Admit {
            admissible += 1;
        }
    }

    assert_eq!(admissible, 1, "exactly the current epoch admits");
    assert_eq!(
        table.admit_write(&partition, flipped_epoch),
        WriteAdmission::Admit
    );
}