use std::sync::Arc;
use super::snapshot::{MigrationPhaseSnapshot, MigrationSnapshot};
/// A provider of [`MigrationSnapshot`] values.
///
/// Implementors must be `Send + Sync + 'static`, which lets a source be
/// shared behind an `Arc<dyn MigrationSnapshotSource>`.
pub trait MigrationSnapshotSource: Send + Sync + 'static {
    /// Returns the snapshots this source currently has to report.
    ///
    /// An empty vector means there is nothing to report.
    fn list(&self) -> Vec<MigrationSnapshot>;
}
/// A [`MigrationSnapshotSource`] that never reports any migrations.
#[derive(Debug, Default)]
pub struct NoOpMigrationSnapshotSource;

impl MigrationSnapshotSource for NoOpMigrationSnapshotSource {
    /// Always yields an empty list.
    fn list(&self) -> Vec<MigrationSnapshot> {
        vec![]
    }
}
/// A snapshot source backed by a `MigrationOrchestrator`.
///
/// A `MigrationSourceHandler` may optionally be attached; when present it is
/// consulted for per-daemon buffered event counts while listing.
pub struct OrchestratorMigrationSnapshotSource {
    /// Orchestrator whose migrations are listed.
    orchestrator: Arc<crate::adapter::net::compute::MigrationOrchestrator>,
    /// Optional handler queried for buffered event counts; `None` until
    /// [`Self::with_source_handler`] is called.
    source_handler: Option<Arc<crate::adapter::net::compute::MigrationSourceHandler>>,
}

impl OrchestratorMigrationSnapshotSource {
    /// Creates a source over `orchestrator` with no source handler attached.
    pub fn new(orchestrator: Arc<crate::adapter::net::compute::MigrationOrchestrator>) -> Self {
        let source_handler = None;
        Self {
            orchestrator,
            source_handler,
        }
    }

    /// Returns this source with `handler` attached, replacing any handler
    /// that was set before.
    #[must_use]
    pub fn with_source_handler(
        self,
        handler: Arc<crate::adapter::net::compute::MigrationSourceHandler>,
    ) -> Self {
        let mut configured = self;
        configured.source_handler = Some(handler);
        configured
    }
}
impl MigrationSnapshotSource for OrchestratorMigrationSnapshotSource {
fn list(&self) -> Vec<MigrationSnapshot> {
self.orchestrator
.list_migrations()
.into_iter()
.map(|item| {
let phase = MigrationPhaseSnapshot::from(item.phase);
let buffered_events = self
.source_handler
.as_ref()
.and_then(|sh| sh.buffered_event_count(item.daemon_origin))
.unwrap_or(0) as u32;
MigrationSnapshot {
daemon_origin: item.daemon_origin,
phase,
elapsed_ms: item.elapsed_ms,
source_node: item.source_node,
target_node: item.target_node,
age_in_phase_ms: item.age_in_phase_ms,
snapshot_bytes: item.snapshot_bytes,
retries: item.retries,
progress_pct: phase_progress_pct(phase),
buffered_events,
}
})
.collect()
}
}
fn phase_progress_pct(phase: MigrationPhaseSnapshot) -> Option<u8> {
#[allow(unreachable_patterns)]
Some(match phase {
MigrationPhaseSnapshot::Snapshot => 10,
MigrationPhaseSnapshot::Transfer => 30,
MigrationPhaseSnapshot::Restore => 50,
MigrationPhaseSnapshot::Replay => 70,
MigrationPhaseSnapshot::Cutover => 90,
MigrationPhaseSnapshot::Complete => 100,
_ => return None,
})
}
/// Returns a shared, type-erased [`NoOpMigrationSnapshotSource`].
pub(crate) fn no_op_arc() -> Arc<dyn MigrationSnapshotSource> {
    let source: Arc<dyn MigrationSnapshotSource> = Arc::new(NoOpMigrationSnapshotSource);
    source
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The no-op source reports nothing.
    #[test]
    fn no_op_returns_empty_vec() {
        let source = NoOpMigrationSnapshotSource;
        assert!(source.list().is_empty());
    }

    /// A freshly built orchestrator yields an empty listing through the
    /// adapter.
    #[test]
    fn orchestrator_adapter_returns_empty_for_idle_orchestrator() {
        use crate::adapter::net::compute::{DaemonRegistry, MigrationOrchestrator};

        let registry = Arc::new(DaemonRegistry::new());
        let orchestrator = Arc::new(MigrationOrchestrator::new(registry, 7));
        let source = OrchestratorMigrationSnapshotSource::new(orchestrator);
        assert!(source.list().is_empty());
    }

    /// Each listed phase maps to its expected percentage, and the
    /// percentages strictly increase in the listed order.
    #[test]
    fn phase_progress_pct_returns_known_percentages_for_every_phase() {
        let phases = [
            MigrationPhaseSnapshot::Snapshot,
            MigrationPhaseSnapshot::Transfer,
            MigrationPhaseSnapshot::Restore,
            MigrationPhaseSnapshot::Replay,
            MigrationPhaseSnapshot::Cutover,
            MigrationPhaseSnapshot::Complete,
        ];
        let expected: [u8; 6] = [10, 30, 50, 70, 90, 100];

        // Exact value per phase.
        for (phase, want) in phases.iter().zip(expected.iter()) {
            assert_eq!(phase_progress_pct(*phase), Some(*want));
        }

        // Strictly increasing across consecutive phases.
        let pcts: Vec<u8> = phases
            .iter()
            .map(|phase| phase_progress_pct(*phase).unwrap())
            .collect();
        for pair in pcts.windows(2) {
            assert!(
                pair[0] < pair[1],
                "phase progress regressed: {} ≥ {}",
                pair[0],
                pair[1]
            );
        }
    }
}