use crate::sharding::node_ring::NodeRing;
/// A single key that has to be moved from one node to another.
///
/// Equality and hashing cover all three fields, so tasks can be compared in
/// assertions and deduplicated in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MigrationTask {
    /// Raw bytes of the key being migrated.
    pub key: Vec<u8>,
    /// Node the key is read from (and deleted from once copied).
    pub from_node: usize,
    /// Node the key is written to.
    pub to_node: usize,
}
/// Plans and carries out key migrations between two ring configurations.
///
/// Both rings are only borrowed, so the rebalancer cannot outlive either of
/// them.
pub struct Rebalancer<'a> {
    /// Ring describing key ownership before the topology change.
    old_ring: &'a NodeRing,
    /// Ring describing key ownership after the topology change.
    new_ring: &'a NodeRing,
}
impl<'a> Rebalancer<'a> {
    /// Creates a rebalancer that compares key ownership in `old_ring` against
    /// `new_ring`.
    pub fn new(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Self {
        Self { old_ring, new_ring }
    }

    /// Determines which of `keys` must move to a different node.
    ///
    /// Every key is hashed with `fnv1a_hash` and looked up in both rings. A
    /// task is produced only when both rings return a node and the two nodes
    /// differ; keys for which either lookup returns `None` are skipped.
    ///
    /// Tasks are returned in the order the keys were given. A key that occurs
    /// more than once in `keys` yields one task per occurrence.
    pub fn compute_tasks(&self, keys: &[Vec<u8>]) -> Vec<MigrationTask> {
        keys.iter()
            .filter_map(|key| {
                let hash = fnv1a_hash(key);
                let old_owner = self.old_ring.get_node(hash);
                let new_owner = self.new_ring.get_node(hash);
                match (old_owner, new_owner) {
                    (Some(from_node), Some(to_node)) if from_node != to_node => {
                        Some(MigrationTask {
                            key: key.clone(),
                            from_node,
                            to_node,
                        })
                    }
                    _ => None,
                }
            })
            .collect()
    }

    /// Carries out `tasks` one after another using the supplied storage
    /// callbacks.
    ///
    /// For each task the value is read from `from_node`; if it exists it is
    /// written to `to_node` and then deleted from `from_node`. Tasks whose key
    /// is absent on the source node are skipped.
    ///
    /// * `read_fn(node, key)` returns the stored value, or `None` if absent.
    /// * `write_fn(node, key, value)` stores the value on `node`.
    /// * `delete_fn(node, key)` removes the key from `node`.
    ///
    /// Tasks with `from_node == to_node` are ignored: copying a value onto
    /// the node it already lives on and then deleting it from that same node
    /// would destroy the data.
    ///
    /// The callbacks return no status, so a failed write cannot be detected
    /// here; the delete always follows the write.
    pub fn execute<R, W, D>(
        &self,
        tasks: &[MigrationTask],
        mut read_fn: R,
        mut write_fn: W,
        mut delete_fn: D,
    ) where
        R: FnMut(usize, &[u8]) -> Option<Vec<u8>>,
        W: FnMut(usize, &[u8], &[u8]),
        D: FnMut(usize, &[u8]),
    {
        for task in tasks {
            // Guard against self-migration: write-then-delete on one node
            // would leave the key stored nowhere.
            if task.from_node == task.to_node {
                continue;
            }
            if let Some(data) = read_fn(task.from_node, &task.key) {
                write_fn(task.to_node, &task.key, &data);
                delete_fn(task.from_node, &task.key);
            }
        }
    }
}
/// Computes the 64-bit FNV-1a hash of `data`.
///
/// Starting from the offset basis, each byte is XOR-ed into the running hash,
/// which is then multiplied by the FNV prime with wrapping arithmetic. Empty
/// input returns the offset basis unchanged.
fn fnv1a_hash(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
pub fn rebalancer_on_add<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
Rebalancer::new(old_ring, new_ring)
}
pub fn rebalancer_on_remove<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
Rebalancer::new(old_ring, new_ring)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sharding::router::ShardNode;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory stand-in for the cluster's storage, keyed by `(node id, key)`.
    type Store = Mutex<HashMap<(usize, Vec<u8>), Vec<u8>>>;

    /// Builds a ring via `NodeRing::new(50)` containing nodes 0 ("A") and 1 ("B").
    fn two_node_ring() -> NodeRing {
        let mut ring = NodeRing::new(50);
        ring.add_node(&ShardNode {
            id: 0,
            address: "A".into(),
            weight: 1.0,
        });
        ring.add_node(&ShardNode {
            id: 1,
            address: "B".into(),
            weight: 1.0,
        });
        ring
    }

    /// Builds the two-node ring and then adds node 2 ("C") to it.
    fn three_node_ring() -> NodeRing {
        let mut ring = two_node_ring();
        ring.add_node(&ShardNode {
            id: 2,
            address: "C".into(),
            weight: 1.0,
        });
        ring
    }

    /// Produces the keys `"<prefix>-0"` .. `"<prefix>-<count - 1>"` as byte vectors.
    fn numbered_keys(prefix: &str, count: u32) -> Vec<Vec<u8>> {
        (0..count)
            .map(|i| format!("{}-{}", prefix, i).into_bytes())
            .collect()
    }

    /// Adding a third node must produce migrations, all targeting the new node.
    #[test]
    fn test_rebalance_on_add_node() {
        let old_ring = two_node_ring();
        let new_ring = three_node_ring();
        let keys = numbered_keys("key", 100);

        let tasks = Rebalancer::new(&old_ring, &new_ring).compute_tasks(&keys);

        assert!(!tasks.is_empty(), "노드 추가 시 이관 태스크가 있어야 함");
        for task in &tasks {
            assert_eq!(task.to_node, 2, "새 노드(2)로 이관되어야 함");
        }
    }

    /// Executing the computed tasks moves each value to its new node and
    /// removes it from the old one.
    #[test]
    fn test_rebalance_execute() {
        let old_ring = two_node_ring();
        let new_ring = three_node_ring();
        let keys = numbered_keys("key", 20);

        // Seed every key on the node that owns it in the old ring.
        let store: Store = Mutex::new(HashMap::new());
        {
            let mut seeded = store.lock().unwrap();
            for key in &keys {
                let owner = old_ring.get_node(fnv1a_hash(key)).unwrap();
                seeded.insert((owner, key.clone()), b"value".to_vec());
            }
        }

        let rebalancer = Rebalancer::new(&old_ring, &new_ring);
        let tasks = rebalancer.compute_tasks(&keys);

        // Each callback takes the lock only for the duration of its own call.
        rebalancer.execute(
            &tasks,
            |node_id, key| {
                store
                    .lock()
                    .unwrap()
                    .get(&(node_id, key.to_vec()))
                    .cloned()
            },
            |node_id, key, value| {
                store
                    .lock()
                    .unwrap()
                    .insert((node_id, key.to_vec()), value.to_vec());
            },
            |node_id, key| {
                store.lock().unwrap().remove(&(node_id, key.to_vec()));
            },
        );

        let snapshot = store.lock().unwrap();
        for task in &tasks {
            let moved_to = (task.to_node, task.key.clone());
            let moved_from = (task.from_node, task.key.clone());
            assert!(
                snapshot.contains_key(&moved_to),
                "이관된 키는 새 노드에 있어야 함"
            );
            assert!(
                !snapshot.contains_key(&moved_from),
                "이관된 키는 이전 노드에서 삭제되어야 함"
            );
        }
    }

    /// Comparing a ring with itself must not schedule any migration.
    #[test]
    fn test_no_migration_same_ring() {
        let ring = two_node_ring();
        let keys = numbered_keys("k", 50);

        let tasks = Rebalancer::new(&ring, &ring).compute_tasks(&keys);

        assert!(tasks.is_empty(), "동일한 링이면 이관 없음");
    }
}