use std::collections::HashMap;
use std::time::Instant;
/// Describes one requested data migration between two named endpoints.
///
/// The struct is a plain value holder: nothing in it is validated on
/// construction.
#[derive(Debug, Clone)]
pub struct MigrationPlan {
    /// Name of the endpoint the data is moved away from.
    pub source: String,
    /// Name of the endpoint the data is moved to.
    pub target: String,
    /// Pairs of key bounds covered by the plan.
    ///
    /// NOTE(review): presumably `(start, end)` with `start <= end`; no code in
    /// this file inspects or validates the pairs — confirm with callers.
    pub key_ranges: Vec<(String, String)>,
    /// Caller-supplied priority value; stored as-is.
    pub priority: u8,
}
impl MigrationPlan {
pub fn new(
source: impl Into<String>,
target: impl Into<String>,
key_ranges: Vec<(String, String)>,
priority: u8,
) -> Self {
Self {
source: source.into(),
target: target.into(),
key_ranges,
priority,
}
}
}
/// Lifecycle state of a single migration.
///
/// `Eq` is derived alongside `PartialEq` because every payload (`usize`,
/// `String`) has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationStatus {
    /// Created but not yet started.
    Pending,
    /// Chunks are being accepted.
    InProgress {
        /// Number of chunks accepted so far.
        transferred: usize,
        /// Expected number of chunks.
        ///
        /// NOTE(review): every construction site visible in this file sets
        /// this to `0`; confirm whether a real total is ever supplied.
        total: usize,
    },
    /// Finished successfully; no further chunks are accepted.
    Completed,
    /// Terminated without completing; the string describes the reason.
    Failed(String),
}
/// One keyed payload together with the checksum recorded for it.
///
/// All fields are public, so `checksum` can drift out of sync with `value`
/// after construction.
#[derive(Debug, Clone)]
pub struct DataChunk {
    /// Key the payload belongs to.
    pub key: String,
    /// Raw payload bytes.
    pub value: Vec<u8>,
    /// Checksum recorded for `value`.
    pub checksum: u32,
}
impl DataChunk {
pub fn new(key: impl Into<String>, value: Vec<u8>) -> Self {
let checksum = Self::compute_checksum(&value);
Self {
key: key.into(),
value,
checksum,
}
}
pub fn compute_checksum(data: &[u8]) -> u32 {
crc32(data)
}
pub fn is_valid(&self) -> bool {
Self::compute_checksum(&self.value) == self.checksum
}
}
/// Summary figures reported for a migration.
///
/// `Eq` is derived alongside `PartialEq` because every field (`String` and
/// integers) has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStats {
    /// Identifier of the migration the figures belong to.
    pub id: String,
    /// Total payload bytes transferred.
    pub bytes_transferred: u64,
    /// Number of chunks transferred.
    pub chunks_transferred: usize,
    /// Duration of the migration in milliseconds.
    pub duration_ms: u64,
}
/// Errors reported by migration operations.
///
/// `Eq` is derived alongside `PartialEq` because the only payload is a
/// `String`, which has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationError {
    /// No migration is registered under the contained id.
    NotFound(String),
    /// The migration has already reached a terminal state.
    AlreadyComplete,
    /// A chunk's recorded checksum does not match its data.
    ChecksumMismatch,
    /// A key range is invalid (start > end).
    InvalidRange,
}
impl std::fmt::Display for MigrationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MigrationError::NotFound(id) => write!(f, "Migration not found: {id}"),
MigrationError::AlreadyComplete => {
write!(f, "Migration has already been completed")
}
MigrationError::ChecksumMismatch => {
write!(f, "Chunk checksum does not match data")
}
MigrationError::InvalidRange => {
write!(f, "Key range is invalid (start > end)")
}
}
}
}
impl std::error::Error for MigrationError {}
/// Internal bookkeeping kept for one registered migration.
struct MigrationState {
    /// The plan the migration was started from.
    plan: MigrationPlan,
    /// Current lifecycle status.
    status: MigrationStatus,
    /// Moment this state was created; used to measure duration.
    start_time: Instant,
    /// Sum of the payload sizes of all accepted chunks.
    bytes_transferred: u64,
    /// Number of accepted chunks.
    chunks_transferred: usize,
}
impl std::fmt::Debug for MigrationState {
    /// Renders a condensed view of the state.
    ///
    /// Only the plan's `source` and `target`, the status and the two
    /// counters are printed; `start_time` and the remaining plan fields are
    /// left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            plan,
            status,
            start_time: _,
            bytes_transferred,
            chunks_transferred,
        } = self;

        let mut out = f.debug_struct("MigrationState");
        out.field("source", &plan.source);
        out.field("target", &plan.target);
        out.field("status", status);
        out.field("bytes_transferred", bytes_transferred);
        out.field("chunks_transferred", chunks_transferred);
        out.finish()
    }
}
impl MigrationState {
fn new(plan: MigrationPlan) -> Self {
Self {
plan,
status: MigrationStatus::Pending,
start_time: Instant::now(),
bytes_transferred: 0,
chunks_transferred: 0,
}
}
}
/// Registry of migrations, keyed by generated id.
#[derive(Debug)]
pub struct DataMigrator {
    /// State of every migration started through this migrator, by id.
    migrations: HashMap<String, MigrationState>,
    /// Numeric suffix that will be used for the next generated id.
    next_id: u64,
}

impl Default for DataMigrator {
    /// Returns an empty migrator in the same initial state as
    /// `DataMigrator::new()`.
    ///
    /// A derived `Default` would zero `next_id`, so default-constructed
    /// migrators would start numbering at `mig-0` while `new()` starts at
    /// `mig-1`. Keep these initial values in sync with `new()`.
    fn default() -> Self {
        Self {
            migrations: HashMap::new(),
            next_id: 1,
        }
    }
}
impl DataMigrator {
pub fn new() -> Self {
Self {
migrations: HashMap::new(),
next_id: 1,
}
}
pub fn create_plan(
&mut self,
source: &str,
target: &str,
ranges: Vec<(String, String)>,
) -> MigrationPlan {
MigrationPlan::new(source, target, ranges, 128)
}
pub fn start_migration(&mut self, plan: MigrationPlan) -> String {
let id = format!("mig-{}", self.next_id);
self.next_id += 1;
let mut state = MigrationState::new(plan);
state.status = MigrationStatus::InProgress {
transferred: 0,
total: 0,
};
self.migrations.insert(id.clone(), state);
id
}
pub fn get_status(&self, id: &str) -> Option<&MigrationStatus> {
self.migrations.get(id).map(|s| &s.status)
}
pub fn transfer_chunk(&mut self, id: &str, chunk: DataChunk) -> Result<(), MigrationError> {
let state = self
.migrations
.get_mut(id)
.ok_or_else(|| MigrationError::NotFound(id.to_string()))?;
match &state.status {
MigrationStatus::Completed | MigrationStatus::Failed(_) => {
return Err(MigrationError::AlreadyComplete);
}
MigrationStatus::Pending => {
state.status = MigrationStatus::InProgress {
transferred: 0,
total: 0,
};
}
MigrationStatus::InProgress { .. } => {}
}
if !chunk.is_valid() {
return Err(MigrationError::ChecksumMismatch);
}
state.bytes_transferred += chunk.value.len() as u64;
state.chunks_transferred += 1;
if let MigrationStatus::InProgress { transferred, .. } = &mut state.status {
*transferred = state.chunks_transferred;
}
Ok(())
}
pub fn complete_migration(&mut self, id: &str) -> Result<MigrationStats, MigrationError> {
let state = self
.migrations
.get_mut(id)
.ok_or_else(|| MigrationError::NotFound(id.to_string()))?;
match &state.status {
MigrationStatus::Completed | MigrationStatus::Failed(_) => {
return Err(MigrationError::AlreadyComplete);
}
_ => {}
}
let duration_ms = state.start_time.elapsed().as_millis() as u64;
let stats = MigrationStats {
id: id.to_string(),
bytes_transferred: state.bytes_transferred,
chunks_transferred: state.chunks_transferred,
duration_ms,
};
state.status = MigrationStatus::Completed;
Ok(stats)
}
pub fn cancel_migration(&mut self, id: &str) -> Result<(), MigrationError> {
let state = self
.migrations
.get_mut(id)
.ok_or_else(|| MigrationError::NotFound(id.to_string()))?;
match &state.status {
MigrationStatus::Completed | MigrationStatus::Failed(_) => {
return Err(MigrationError::AlreadyComplete);
}
_ => {}
}
state.status = MigrationStatus::Failed("Cancelled".to_string());
Ok(())
}
}
/// Computes a bitwise CRC-32 of `data` using the reflected polynomial
/// `0xEDB88320`, an all-ones initial register and a final bit inversion.
///
/// Empty input yields `0`.
fn crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        // Mix the byte into the low bits, then shift it out one bit at a time.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let feedback = if bits & 1 == 1 { POLY } else { 0 };
            (bits >> 1) ^ feedback
        })
    });

    !register
}
// Unit tests for the migration types and `DataMigrator`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh migrator for each test.
    fn migrator() -> DataMigrator {
        DataMigrator::new()
    }

    /// Chunk whose checksum matches `value`.
    fn simple_chunk(key: &str, value: &[u8]) -> DataChunk {
        DataChunk::new(key, value.to_vec())
    }

    #[test]
    fn test_create_plan_basic() {
        let mut m = migrator();
        let plan = m.create_plan("node-1", "node-2", vec![("a".to_string(), "z".to_string())]);
        assert_eq!(plan.source, "node-1");
        assert_eq!(plan.target, "node-2");
        assert_eq!(plan.key_ranges.len(), 1);
    }

    #[test]
    fn test_plan_multiple_ranges() {
        let mut m = migrator();
        let plan = m.create_plan(
            "src",
            "dst",
            vec![("a".into(), "m".into()), ("n".into(), "z".into())],
        );
        assert_eq!(plan.key_ranges.len(), 2);
    }

    #[test]
    fn test_start_migration_returns_id() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        assert!(!id.is_empty());
    }

    #[test]
    fn test_start_migration_status_in_progress() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        match m.get_status(&id) {
            Some(MigrationStatus::InProgress { .. }) => {}
            other => panic!("Expected InProgress, got {other:?}"),
        }
    }

    #[test]
    fn test_start_migration_unique_ids() {
        let mut m = migrator();
        let plan1 = m.create_plan("s", "t", vec![]);
        let id1 = m.start_migration(plan1);
        let plan2 = m.create_plan("s", "t", vec![]);
        let id2 = m.start_migration(plan2);
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_get_status_unknown_id() {
        let m = migrator();
        assert!(m.get_status("no-such-id").is_none());
    }

    #[test]
    fn test_transfer_chunk_valid() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        let chunk = simple_chunk("key1", b"hello world");
        assert!(m.transfer_chunk(&id, chunk).is_ok());
    }

    #[test]
    fn test_transfer_chunk_invalid_checksum() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        let mut chunk = simple_chunk("key1", b"hello");
        // Overwrite the recorded checksum so it no longer matches the payload.
        chunk.checksum = 0;
        assert_eq!(
            m.transfer_chunk(&id, chunk),
            Err(MigrationError::ChecksumMismatch)
        );
    }

    #[test]
    fn test_transfer_chunk_unknown_migration() {
        let mut m = migrator();
        let chunk = simple_chunk("k", b"v");
        assert_eq!(
            m.transfer_chunk("bad-id", chunk),
            Err(MigrationError::NotFound("bad-id".to_string()))
        );
    }

    #[test]
    fn test_transfer_chunk_to_completed_fails() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.complete_migration(&id).expect("complete ok");
        let chunk = simple_chunk("k", b"v");
        assert_eq!(
            m.transfer_chunk(&id, chunk),
            Err(MigrationError::AlreadyComplete)
        );
    }

    #[test]
    fn test_transfer_multiple_chunks_updates_count() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        for i in 0..5u8 {
            let chunk = simple_chunk(&format!("k{i}"), &[i]);
            m.transfer_chunk(&id, chunk).expect("ok");
        }
        if let Some(MigrationStatus::InProgress { transferred, .. }) = m.get_status(&id) {
            assert_eq!(*transferred, 5);
        } else {
            panic!("Expected InProgress");
        }
    }

    #[test]
    fn test_complete_migration_returns_stats() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.transfer_chunk(&id, simple_chunk("k1", b"abc"))
            .expect("ok");
        let stats = m.complete_migration(&id).expect("complete ok");
        assert_eq!(stats.id, id);
        assert_eq!(stats.chunks_transferred, 1);
        assert_eq!(stats.bytes_transferred, 3);
    }

    #[test]
    fn test_complete_migration_twice_fails() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.complete_migration(&id).expect("first ok");
        assert_eq!(
            m.complete_migration(&id),
            Err(MigrationError::AlreadyComplete)
        );
    }

    #[test]
    fn test_complete_migration_unknown() {
        let mut m = migrator();
        assert_eq!(
            m.complete_migration("nope"),
            Err(MigrationError::NotFound("nope".to_string()))
        );
    }

    #[test]
    fn test_complete_marks_completed() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.complete_migration(&id).expect("ok");
        assert_eq!(m.get_status(&id), Some(&MigrationStatus::Completed));
    }

    #[test]
    fn test_cancel_migration_ok() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        assert!(m.cancel_migration(&id).is_ok());
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(
            m.get_status(&id),
            Some(MigrationStatus::Failed(_))
        ));
    }

    #[test]
    fn test_cancel_completed_migration_fails() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.complete_migration(&id).expect("ok");
        assert_eq!(
            m.cancel_migration(&id),
            Err(MigrationError::AlreadyComplete)
        );
    }

    #[test]
    fn test_cancel_unknown_migration_fails() {
        let mut m = migrator();
        assert_eq!(
            m.cancel_migration("ghost"),
            Err(MigrationError::NotFound("ghost".to_string()))
        );
    }

    #[test]
    fn test_chunk_new_valid_checksum() {
        let chunk = DataChunk::new("k", b"hello".to_vec());
        assert!(chunk.is_valid());
    }

    #[test]
    fn test_chunk_corrupted_checksum() {
        let mut chunk = DataChunk::new("k", b"hello".to_vec());
        chunk.checksum = chunk.checksum.wrapping_add(1);
        assert!(!chunk.is_valid());
    }

    #[test]
    fn test_chunk_empty_value() {
        let chunk = DataChunk::new("k", vec![]);
        assert!(chunk.is_valid());
    }

    #[test]
    fn test_error_not_found_display() {
        let e = MigrationError::NotFound("x".into());
        assert!(e.to_string().contains("x"));
    }

    #[test]
    fn test_error_already_complete_display() {
        let e = MigrationError::AlreadyComplete;
        assert!(e.to_string().contains("completed"));
    }

    #[test]
    fn test_error_checksum_mismatch_display() {
        let e = MigrationError::ChecksumMismatch;
        assert!(e.to_string().contains("checksum"));
    }

    #[test]
    fn test_error_invalid_range_display() {
        let e = MigrationError::InvalidRange;
        assert!(
            e.to_string().contains("invalid")
                || e.to_string().contains("range")
                || e.to_string().contains("Key")
        );
    }

    #[test]
    fn test_error_is_std_error() {
        let e: Box<dyn std::error::Error> = Box::new(MigrationError::AlreadyComplete);
        assert!(e.to_string().contains("completed"));
    }

    #[test]
    fn test_stats_bytes_accumulated() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.transfer_chunk(&id, DataChunk::new("k1", vec![0u8; 100]))
            .expect("ok");
        m.transfer_chunk(&id, DataChunk::new("k2", vec![0u8; 200]))
            .expect("ok");
        let stats = m.complete_migration(&id).expect("ok");
        assert_eq!(stats.bytes_transferred, 300);
    }

    #[test]
    fn test_migrator_default() {
        let _m: DataMigrator = DataMigrator::default();
    }

    #[test]
    fn test_plan_new_constructor() {
        let plan = MigrationPlan::new("src", "dst", vec![("a".into(), "z".into())], 200);
        assert_eq!(plan.source, "src");
        assert_eq!(plan.target, "dst");
        assert_eq!(plan.priority, 200);
    }

    #[test]
    fn test_data_chunk_compute_checksum_stable() {
        let data = b"hello world";
        let c1 = DataChunk::compute_checksum(data);
        let c2 = DataChunk::compute_checksum(data);
        assert_eq!(c1, c2);
    }

    #[test]
    fn test_data_chunk_different_data_different_checksum() {
        let c1 = DataChunk::compute_checksum(b"aaa");
        let c2 = DataChunk::compute_checksum(b"bbb");
        assert_ne!(c1, c2);
    }

    #[test]
    fn test_cancel_after_transfer_ok() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        m.transfer_chunk(&id, DataChunk::new("k", b"data".to_vec()))
            .expect("ok");
        m.cancel_migration(&id).expect("cancel ok");
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(
            m.get_status(&id),
            Some(MigrationStatus::Failed(_))
        ));
    }

    #[test]
    fn test_stats_id_matches_migration() {
        let mut m = migrator();
        let plan = m.create_plan("s", "t", vec![]);
        let id = m.start_migration(plan);
        let stats = m.complete_migration(&id).expect("ok");
        assert_eq!(stats.id, id);
    }
}