use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use osproxy_core::{Clock, Epoch, Instant, PartitionId, SystemClock};
use osproxy_spi::Placement;
use osproxy_tenancy::{MigrationError, PartitionState, Phase, PlacementTable};
use thiserror::Error;
/// Default minimum time (30 seconds) a partition must remain in cutover before
/// its migration may be completed.
pub const DEFAULT_DRAIN_BARRIER: Duration = Duration::from_millis(30_000);
/// Backend for the partition migration state machine consumed by
/// [`ControlPlane`].
///
/// Each transition method yields an [`Epoch`] on success or a
/// [`MigrationError`] when the transition is refused.
/// NOTE(review): the returned epoch is presumably the partition's epoch after
/// the transition — confirm against `PlacementTable`.
#[allow(
async_fn_in_trait,
reason = "driven by a single operator/automation controller, not the request \
hot path; consumed through generics in ControlPlane (no dyn), where \
Send is verified at the await site, mirroring the SPI traits (docs/02 §2). \
A distributed backend (etcd/Consul) needs async + fallible I/O here."
)]
pub trait MigrationStore {
/// Starts migrating `partition` toward the placement `to`.
async fn begin_migration(
&self,
partition: &PartitionId,
to: Placement,
) -> Result<Epoch, MigrationError>;
/// Moves `partition` into the cutover phase of its migration.
async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
/// Finishes the in-progress migration of `partition`.
async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
/// Abandons the in-progress migration of `partition`.
async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
/// Returns the current state and epoch of `partition`.
/// NOTE(review): `None` presumably means the store does not know the
/// partition — confirm.
async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)>;
}
/// [`MigrationStore`] backed directly by a [`PlacementTable`].
///
/// Every method forwards to the `PlacementTable` inherent method of the same
/// name. None of the bodies awaits, so each returned future completes on its
/// first poll.
impl MigrationStore for PlacementTable {
// The `PlacementTable::…(self, …)` type-path calls resolve to the synchronous
// inherent methods (they return a `Result`, not a future). Rewriting them as
// method calls would obscure that they are not recursive calls into this
// trait impl.
async fn begin_migration(
&self,
partition: &PartitionId,
to: Placement,
) -> Result<Epoch, MigrationError> {
PlacementTable::begin_migration(self, partition, to)
}
async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
PlacementTable::enter_cutover(self, partition)
}
async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
PlacementTable::complete_migration(self, partition)
}
async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
PlacementTable::abort_migration(self, partition)
}
async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
PlacementTable::state(self, partition)
}
}
/// Lets a shared store (`Arc<T>`) be used wherever a [`MigrationStore`] is
/// expected, so several owners can drive the same backend.
///
/// Each method names the pointee's implementation explicitly. `self`
/// (`&Arc<T>`) deref-coerces to `&T` at the call site.
impl<T: MigrationStore + Sync + ?Sized> MigrationStore for Arc<T> {
    async fn begin_migration(
        &self,
        partition: &PartitionId,
        to: Placement,
    ) -> Result<Epoch, MigrationError> {
        T::begin_migration(self, partition, to).await
    }

    async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
        T::enter_cutover(self, partition).await
    }

    async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
        T::complete_migration(self, partition).await
    }

    async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
        T::abort_migration(self, partition).await
    }

    async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
        T::state(self, partition).await
    }
}
/// Errors returned by [`ControlPlane`] operations.
#[non_exhaustive]
#[derive(Clone, PartialEq, Eq, Debug, Error)]
pub enum ControlError {
/// The underlying [`MigrationStore`] refused the state transition.
#[error("transition refused: {0}")]
Transition(#[from] MigrationError),
/// Completion was requested before the drain barrier had elapsed since the
/// partition entered cutover.
#[error("drain barrier not elapsed; wait {remaining:?} longer")]
BarrierPending {
/// Time still to wait before completion can succeed (never negative).
remaining: Duration,
},
}
/// Drives partition migrations through a [`MigrationStore`]. It enforces a
/// drain barrier: a minimum time between entering cutover and completing the
/// migration.
pub struct ControlPlane<S> {
/// Backing migration state machine.
store: S,
/// Time source used to measure the drain barrier.
clock: Arc<dyn Clock>,
/// Minimum time a partition must stay in cutover before completion.
barrier: Duration,
/// When each partition entered cutover, as observed by this control plane.
/// Held in memory only, so a new control plane starts with no entries.
cutover_at: Mutex<HashMap<PartitionId, Instant>>,
}
/// Shows the store, the barrier and the recorded cutover instants. The clock
/// is left out, and the output ends with `..` to signal the omitted field.
impl<S: std::fmt::Debug> std::fmt::Debug for ControlPlane<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            store,
            barrier,
            cutover_at,
            ..
        } = self;
        let mut out = f.debug_struct("ControlPlane");
        out.field("store", store);
        out.field("barrier", barrier);
        out.field("cutover_at", cutover_at);
        out.finish_non_exhaustive()
    }
}
impl<S: MigrationStore> ControlPlane<S> {
    /// Creates a control plane over `store` that uses [`SystemClock`] and
    /// [`DEFAULT_DRAIN_BARRIER`], with no cutovers recorded yet.
    #[must_use]
    pub fn new(store: S) -> Self {
        Self {
            store,
            clock: Arc::new(SystemClock),
            barrier: DEFAULT_DRAIN_BARRIER,
            cutover_at: Mutex::new(HashMap::new()),
        }
    }

    /// Sets the drain barrier: the minimum time a partition must remain in
    /// cutover before [`ControlPlane::complete_migration`] will proceed.
    #[must_use]
    pub fn with_barrier(mut self, barrier: Duration) -> Self {
        self.barrier = barrier;
        self
    }

    /// Replaces the clock used to measure the drain barrier.
    #[must_use]
    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
        self.clock = clock;
        self
    }

    /// Starts migrating `partition` to `to` and returns the store's epoch.
    ///
    /// A successful start discards any cutover timestamp still recorded for
    /// `partition`. Such an entry can only be left over from an earlier
    /// migration. That can happen if a store reports an error for a
    /// completion or abort it nonetheless applied. If the stale entry were
    /// kept, a later cutover that this control plane did not timestamp itself
    /// would be measured from the old instant and skip the drain barrier.
    /// For example, `enter_cutover` may return an error after the store
    /// committed the cutover.
    ///
    /// # Errors
    ///
    /// Returns [`ControlError::Transition`] if the store refuses the
    /// transition. Any recorded timestamp is kept in that case.
    pub async fn begin_migration(
        &self,
        partition: &PartitionId,
        to: Placement,
    ) -> Result<Epoch, ControlError> {
        let epoch = self.store.begin_migration(partition, to).await?;
        self.lock().remove(partition);
        Ok(epoch)
    }

    /// Moves `partition` into the cutover phase and starts its drain barrier.
    ///
    /// The barrier start is read from the clock only after the store has
    /// accepted the transition. It replaces any earlier timestamp for the
    /// partition.
    ///
    /// # Errors
    ///
    /// Returns [`ControlError::Transition`] if the store refuses the
    /// transition. No timestamp is recorded in that case.
    pub async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
        let epoch = self.store.enter_cutover(partition).await?;
        self.lock().insert(partition.clone(), self.clock.now());
        Ok(epoch)
    }

    /// Completes the migration of `partition`, enforcing the drain barrier.
    ///
    /// When the store reports the partition as migrating in
    /// [`Phase::Cutover`], at least the configured barrier must have elapsed
    /// since the recorded cutover start. If no start was recorded, the
    /// barrier starts now and this call returns
    /// [`ControlError::BarrierPending`]. That happens, for example, when this
    /// control plane was created after cutover was entered. In any other
    /// state the barrier is not checked, and the store decides whether
    /// completion is allowed.
    ///
    /// The state read and the completion are separate store calls. This
    /// therefore relies on a single controller driving the partition's
    /// transitions.
    ///
    /// # Errors
    ///
    /// - [`ControlError::BarrierPending`], carrying the remaining wait, if
    ///   the barrier has not elapsed.
    /// - [`ControlError::Transition`] if the store refuses to complete.
    pub async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
        let now = self.clock.now();
        let in_cutover = matches!(
            self.store.state(partition).await,
            Some((
                PartitionState::Migrating {
                    phase: Phase::Cutover,
                    ..
                },
                _
            ))
        );
        if in_cutover {
            // Only the first observation without a recorded start inserts
            // `now`, so repeated calls all measure from the same instant. The
            // guard is a temporary dropped at the end of this statement, so
            // it is never held across an `.await`.
            let started = *self.lock().entry(partition.clone()).or_insert(now);
            let elapsed = now.saturating_duration_since(started);
            if elapsed < self.barrier {
                return Err(ControlError::BarrierPending {
                    remaining: self.barrier.saturating_sub(elapsed),
                });
            }
        }
        let epoch = self.store.complete_migration(partition).await?;
        self.lock().remove(partition);
        Ok(epoch)
    }

    /// Aborts the in-progress migration of `partition`. On success its
    /// recorded cutover timestamp is discarded.
    ///
    /// # Errors
    ///
    /// Returns [`ControlError::Transition`] if the store refuses the abort.
    /// The recorded timestamp is kept in that case.
    pub async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
        let epoch = self.store.abort_migration(partition).await?;
        self.lock().remove(partition);
        Ok(epoch)
    }

    /// Returns the store's current state and epoch for `partition`.
    pub async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
        self.store.state(partition).await
    }

    /// Locks the cutover-timestamp map, recovering it if the mutex is poisoned.
    ///
    /// Every critical section is a single map operation (insert, entry, or
    /// remove). The map is therefore still usable after a panic in another
    /// holder.
    fn lock(&self) -> std::sync::MutexGuard<'_, HashMap<PartitionId, Instant>> {
        self.cutover_at
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}