#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use std::time::Duration;
use osproxy_control::{ControlError, ControlPlane, MigrationStore};
use osproxy_core::{ClusterId, Epoch, ManualClock, PartitionId};
use osproxy_spi::Placement;
use osproxy_tenancy::{MigrationError, PartitionState, Phase, PlacementTable, WriteAdmission};
/// Barrier length (30 seconds) that the clock-driven tests in this file pass to
/// `ControlPlane::with_barrier`.
const BARRIER: Duration = Duration::from_secs(30);
/// Builds a `Placement::DedicatedCluster` whose cluster id is derived from `name`.
fn cluster(name: &str) -> Placement {
    let cluster = ClusterId::from(name);
    Placement::DedicatedCluster { cluster }
}
/// Creates a fresh placement table containing the single partition `"acme"`,
/// placed on dedicated cluster `"a"`.
///
/// Returns the table behind an `Arc` together with the partition id.
fn backend() -> (Arc<PlacementTable>, PartitionId) {
    let partition = PartitionId::from("acme");
    let table = PlacementTable::new();
    table.set(partition.clone(), cluster("a"));
    (Arc::new(table), partition)
}
/// After cutover, `complete_migration` is refused with `BarrierPending` until the
/// manual clock has been advanced by the full `BARRIER`; once it has, completion
/// succeeds and the partition is placed on cluster `"b"`.
#[tokio::test]
async fn complete_is_held_until_the_drain_barrier_elapses() {
    let (table, partition) = backend();
    let clock = Arc::new(ManualClock::new());
    let control = ControlPlane::new(Arc::clone(&table))
        .with_clock(clock.clone())
        .with_barrier(BARRIER);
    let one_second = Duration::from_secs(1);

    control.begin_migration(&partition, cluster("b")).await.unwrap();
    control.enter_cutover(&partition).await.unwrap();

    // The clock has not moved since cutover: completion must be held back and
    // the partition must still be reported as migrating.
    let immediately = control.complete_migration(&partition).await;
    assert!(matches!(
        immediately,
        Err(ControlError::BarrierPending { .. })
    ));
    let snapshot = table.state(&partition).await.unwrap();
    assert!(matches!(snapshot.0, PartitionState::Migrating { .. }));

    // One second short of the barrier is still too early.
    clock.advance(BARRIER.saturating_sub(one_second));
    let just_before = control.complete_migration(&partition).await;
    assert!(matches!(
        just_before,
        Err(ControlError::BarrierPending { .. })
    ));

    // The final second brings the elapsed time to exactly BARRIER.
    clock.advance(one_second);
    control.complete_migration(&partition).await.unwrap();
    assert_eq!(table.get(&partition).unwrap().placement, cluster("b"));
}
#[tokio::test]
async fn every_instance_polling_fresh_sees_one_consistent_state() {
let (backend, p) = backend();
let instance_a = Arc::clone(&backend);
let instance_b = Arc::clone(&backend);
let clock = Arc::new(ManualClock::new());
let cp = ControlPlane::new(Arc::clone(&backend))
.with_clock(clock.clone())
.with_barrier(BARRIER);
let epoch_active = backend.state(&p).await.unwrap().1;
assert_eq!(
instance_a.admit_write(&p, epoch_active),
WriteAdmission::Admit
);
assert_eq!(
instance_b.admit_write(&p, epoch_active),
WriteAdmission::Admit
);
cp.begin_migration(&p, cluster("b")).await.unwrap();
let epoch_cutover = cp.enter_cutover(&p).await.unwrap();
assert_eq!(
instance_a.admit_write(&p, epoch_cutover),
WriteAdmission::Reject
);
assert_eq!(
instance_b.admit_write(&p, epoch_cutover),
WriteAdmission::Reject
);
clock.advance(BARRIER);
let epoch_b = cp.complete_migration(&p).await.unwrap();
for instance in [&instance_a, &instance_b] {
assert_eq!(instance.get(&p).unwrap().placement, cluster("b"));
assert_eq!(instance.admit_write(&p, epoch_b), WriteAdmission::Admit);
assert_eq!(
instance.admit_write(&p, epoch_cutover),
WriteAdmission::Reject
);
}
}
/// Aborting after cutover leaves the partition on its original cluster `"a"`, and
/// a later `complete_migration` fails with a `Transition` error.
#[tokio::test]
async fn abort_clears_the_barrier_and_returns_to_origin() {
    let (table, partition) = backend();
    let clock = Arc::new(ManualClock::new());
    let control = ControlPlane::new(Arc::clone(&table))
        .with_clock(clock.clone())
        .with_barrier(BARRIER);

    control.begin_migration(&partition, cluster("b")).await.unwrap();
    control.enter_cutover(&partition).await.unwrap();
    control.abort_migration(&partition).await.unwrap();

    // The placement is back on the origin cluster.
    assert_eq!(table.get(&partition).unwrap().placement, cluster("a"));

    // Completing now is a transition error, not a pending barrier.
    let after_abort = control.complete_migration(&partition).await;
    assert!(matches!(after_abort, Err(ControlError::Transition(_))));
}
/// Requesting a transition in the wrong phase yields `ControlError::Transition`:
/// entering cutover before any migration has begun, and beginning a migration
/// while one is already in progress.
#[tokio::test]
async fn out_of_phase_transitions_surface_as_control_errors() {
    let (table, partition) = backend();
    let control = ControlPlane::new(Arc::clone(&table));

    // Cutover with no migration started.
    let premature_cutover = control.enter_cutover(&partition).await;
    assert!(matches!(
        premature_cutover,
        Err(ControlError::Transition(_))
    ));

    // A second begin while the first migration is still open.
    control.begin_migration(&partition, cluster("b")).await.unwrap();
    let duplicate_begin = control.begin_migration(&partition, cluster("b")).await;
    assert!(matches!(duplicate_begin, Err(ControlError::Transition(_))));
}
/// Test double wrapping a real `PlacementTable`. Its `MigrationStore` impl
/// forwards every operation to the wrapped table except `enter_cutover`, which
/// always returns a backend error.
struct FlakyCutover {
// The real table that the forwarded operations act on.
inner: PlacementTable,
}
/// Store implementation that behaves like the wrapped `PlacementTable` for every
/// operation except `enter_cutover`, which always fails with
/// `MigrationError::Backend`.
impl MigrationStore for FlakyCutover {
    /// Forwards to `PlacementTable::begin_migration` on the wrapped table.
    async fn begin_migration(
        &self,
        partition: &PartitionId,
        to: Placement,
    ) -> Result<Epoch, MigrationError> {
        let Self { inner } = self;
        PlacementTable::begin_migration(inner, partition, to)
    }

    /// Always fails without touching the wrapped table.
    async fn enter_cutover(&self, _partition: &PartitionId) -> Result<Epoch, MigrationError> {
        let failure = MigrationError::Backend {
            reason: "store unreachable",
        };
        Err(failure)
    }

    /// Forwards to `PlacementTable::complete_migration` on the wrapped table.
    async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
        let Self { inner } = self;
        PlacementTable::complete_migration(inner, partition)
    }

    /// Forwards to `PlacementTable::abort_migration` on the wrapped table.
    async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
        let Self { inner } = self;
        PlacementTable::abort_migration(inner, partition)
    }

    /// Forwards to `PlacementTable::state` on the wrapped table.
    async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
        let Self { inner } = self;
        PlacementTable::state(inner, partition)
    }
}
/// When the store fails during `enter_cutover`, the control plane reports the
/// failure as `ControlError::Transition(MigrationError::Backend { .. })` and the
/// partition is still reported as migrating in the `Draining` phase.
#[tokio::test]
async fn a_backend_failure_surfaces_and_leaves_the_partition_unchanged() {
    let partition = PartitionId::from("acme");
    let inner = PlacementTable::new();
    inner.set(partition.clone(), cluster("a"));
    let control = ControlPlane::new(FlakyCutover { inner });

    control.begin_migration(&partition, cluster("b")).await.unwrap();

    // The store's cutover always fails; the error must come through wrapped.
    let cutover = control.enter_cutover(&partition).await;
    assert!(matches!(
        cutover,
        Err(ControlError::Transition(MigrationError::Backend { .. }))
    ));

    // The failed cutover must not have moved the partition out of Draining.
    assert!(matches!(
        control.state(&partition).await.unwrap().0,
        PartitionState::Migrating {
            phase: Phase::Draining,
            ..
        }
    ));
}