use std::sync::Arc;
use super::event::MigrationId;
/// Error produced when a migration-abort dispatcher cannot carry out an abort.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationAbortError {
    /// Human-readable description of the failure; appended to the
    /// `Display` output after the fixed prefix.
    pub reason: String,
}

impl std::fmt::Display for MigrationAbortError {
    /// Renders the error as `migration-abort dispatcher failed: <reason>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { reason } = self;
        f.write_fmt(format_args!(
            "migration-abort dispatcher failed: {}",
            reason
        ))
    }
}

/// Lets the error travel as a `dyn std::error::Error`; no source is chained.
impl std::error::Error for MigrationAbortError {}
/// Dispatch hook for aborting a migration identified by a [`MigrationId`].
///
/// Implementors must be `Send + Sync + 'static`.
pub trait MigrationAborter: Send + Sync + 'static {
/// Requests an abort of `migration`.
///
/// # Errors
///
/// Returns a [`MigrationAbortError`] when the implementation fails to
/// carry out the abort.
fn abort(&self, migration: MigrationId) -> Result<(), MigrationAbortError>;
/// Reports whether this aborter is a no-op.
///
/// Defaults to `false`; [`NoOpMigrationAborter`] overrides it to return
/// `true`.
fn is_no_op(&self) -> bool {
false
}
}
/// Aborter that accepts every request and performs no work.
#[derive(Default, Debug)]
pub struct NoOpMigrationAborter;

impl MigrationAborter for NoOpMigrationAborter {
    /// Always `true`, marking this implementation as the no-op one.
    fn is_no_op(&self) -> bool {
        true
    }

    /// Ignores the migration id and unconditionally reports success.
    fn abort(&self, _: MigrationId) -> Result<(), MigrationAbortError> {
        Ok(())
    }
}
/// Capacity used by [`BufferingMigrationAborter::default`]: the number of
/// migration ids retained before the oldest one is evicted.
pub const DEFAULT_MIGRATION_ABORT_BUFFERING_CAPACITY: usize = 256;
/// Aborter that records the requested migration ids in a bounded in-memory
/// queue instead of acting on them.
///
/// Once the queue holds `capacity` ids, each new request evicts the oldest
/// recorded id and increments the dropped counter.
#[derive(Debug)]
pub struct BufferingMigrationAborter {
// Recorded ids, oldest at the front.
calls: parking_lot::Mutex<std::collections::VecDeque<MigrationId>>,
// Maximum number of ids retained; `with_capacity` clamps it to at least 1.
capacity: usize,
// Number of ids evicted to make room for newer ones.
dropped: std::sync::atomic::AtomicU64,
}
impl Default for BufferingMigrationAborter {
fn default() -> Self {
Self::with_capacity(DEFAULT_MIGRATION_ABORT_BUFFERING_CAPACITY)
}
}
impl BufferingMigrationAborter {
    /// Creates an empty aborter that retains at most `capacity` ids.
    ///
    /// A `capacity` of zero is raised to one, so at least the most recent
    /// id is always kept.
    pub fn with_capacity(capacity: usize) -> Self {
        use std::collections::VecDeque;
        use std::sync::atomic::AtomicU64;

        let capacity = std::cmp::max(capacity, 1);
        Self {
            calls: parking_lot::Mutex::new(VecDeque::new()),
            capacity,
            dropped: AtomicU64::new(0),
        }
    }

    /// Returns a copy of the currently recorded ids, oldest first.
    ///
    /// The internal lock is held only while the copy is made.
    pub fn captured(&self) -> Vec<MigrationId> {
        let queue = self.calls.lock();
        let mut snapshot = Vec::with_capacity(queue.len());
        snapshot.extend(queue.iter().copied());
        snapshot
    }

    /// Returns how many recorded ids have been evicted so far.
    pub fn dropped_count(&self) -> u64 {
        use std::sync::atomic::Ordering::Relaxed;

        self.dropped.load(Relaxed)
    }
}
impl MigrationAborter for BufferingMigrationAborter {
    /// Records `migration` at the back of the queue and reports success.
    ///
    /// When the queue already holds `capacity` ids, the oldest one is
    /// evicted first and the dropped counter is incremented.
    fn abort(&self, migration: MigrationId) -> Result<(), MigrationAbortError> {
        use std::sync::atomic::Ordering;

        let mut queue = self.calls.lock();
        let at_capacity = queue.len() >= self.capacity;
        if at_capacity {
            // Make room by discarding the oldest recorded id.
            queue.pop_front();
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
        queue.push_back(migration);
        Ok(())
    }
}
/// Aborter that forwards every request to a shared `MigrationOrchestrator`.
pub struct OrchestratorMigrationAborter {
// Orchestrator whose `abort_migration` is invoked for each abort request.
orchestrator: Arc<crate::adapter::net::compute::orchestrator::MigrationOrchestrator>,
// Reason string cloned into every `abort_migration` call.
reason: String,
}
impl OrchestratorMigrationAborter {
pub fn new(
orchestrator: Arc<crate::adapter::net::compute::orchestrator::MigrationOrchestrator>,
) -> Self {
Self::with_reason(orchestrator, "ICE KillMigration".to_string())
}
pub fn with_reason(
orchestrator: Arc<crate::adapter::net::compute::orchestrator::MigrationOrchestrator>,
reason: String,
) -> Self {
Self {
orchestrator,
reason,
}
}
}
impl MigrationAborter for OrchestratorMigrationAborter {
    /// Forwards the abort to the orchestrator with the configured reason.
    ///
    /// A `DaemonNotFound` error from the orchestrator is treated as success
    /// (presumably because there is nothing left to abort — confirm against
    /// the orchestrator's contract).
    ///
    /// # Errors
    ///
    /// Any other orchestrator error is converted into a
    /// [`MigrationAbortError`] carrying that error's `to_string()` text.
    fn abort(&self, migration: MigrationId) -> Result<(), MigrationAbortError> {
        use crate::adapter::net::MigrationError;

        let outcome = self
            .orchestrator
            .abort_migration(migration, self.reason.clone());
        match outcome {
            Ok(_) | Err(MigrationError::DaemonNotFound(_)) => Ok(()),
            Err(other) => {
                let reason = other.to_string();
                Err(MigrationAbortError { reason })
            }
        }
    }
}
/// Returns a freshly allocated, type-erased [`NoOpMigrationAborter`].
pub(crate) fn no_op_arc() -> Arc<dyn MigrationAborter> {
    Arc::new(NoOpMigrationAborter) as Arc<dyn MigrationAborter>
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The no-op aborter succeeds for both a small id and the largest `u64`.
    #[test]
    fn no_op_returns_ok_for_every_id() {
        let aborter = NoOpMigrationAborter;
        for id in [1, u64::MAX] {
            aborter.abort(id).expect("no_op infallible");
        }
    }

    /// Below capacity, ids are kept in arrival order and nothing is dropped.
    #[test]
    fn buffering_captures_ids_in_order() {
        let aborter = BufferingMigrationAborter::default();
        (1..=3).for_each(|id| aborter.abort(id).unwrap());
        assert_eq!(aborter.captured(), vec![1, 2, 3]);
        assert_eq!(aborter.dropped_count(), 0);
    }

    /// Past capacity, only the newest ids survive and evictions are counted.
    #[test]
    fn buffering_drops_oldest_when_over_capacity() {
        let aborter = BufferingMigrationAborter::with_capacity(2);
        (1..=5).for_each(|id| aborter.abort(id).unwrap());
        assert_eq!(aborter.captured(), vec![4, 5]);
        assert_eq!(aborter.dropped_count(), 3);
    }
}