1use crate::sharding::node_ring::NodeRing;
11
12#[derive(Debug, Clone)]
14pub struct MigrationTask {
15 pub key: Vec<u8>,
17 pub from_node: usize,
19 pub to_node: usize,
21}
22
23pub struct Rebalancer<'a> {
27 old_ring: &'a NodeRing,
28 new_ring: &'a NodeRing,
29}
30
31impl<'a> Rebalancer<'a> {
32 pub fn new(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Self {
34 Self { old_ring, new_ring }
35 }
36
37 pub fn compute_tasks(&self, keys: &[Vec<u8>]) -> Vec<MigrationTask> {
41 let mut tasks = Vec::new();
42
43 for key in keys {
44 let hash = fnv1a_hash(key);
45 let old_node = self.old_ring.get_node(hash);
46 let new_node = self.new_ring.get_node(hash);
47
48 match (old_node, new_node) {
49 (Some(old), Some(new)) if old != new => {
50 tasks.push(MigrationTask {
51 key: key.clone(),
52 from_node: old,
53 to_node: new,
54 });
55 }
56 _ => {} }
58 }
59
60 tasks
61 }
62
63 pub fn execute<R, W, D>(
69 &self,
70 tasks: &[MigrationTask],
71 mut read_fn: R,
72 mut write_fn: W,
73 mut delete_fn: D,
74 ) where
75 R: FnMut(usize, &[u8]) -> Option<Vec<u8>>,
76 W: FnMut(usize, &[u8], &[u8]),
77 D: FnMut(usize, &[u8]),
78 {
79 for task in tasks {
80 if let Some(data) = read_fn(task.from_node, &task.key) {
81 write_fn(task.to_node, &task.key, &data);
82 delete_fn(task.from_node, &task.key);
83 }
84 }
85 }
86}
87
88fn fnv1a_hash(data: &[u8]) -> u64 {
90 const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
91 const FNV_PRIME: u64 = 1_099_511_628_211;
92 let mut hash = FNV_OFFSET;
93 for &byte in data {
94 hash ^= byte as u64;
95 hash = hash.wrapping_mul(FNV_PRIME);
96 }
97 hash
98}
99
100pub fn rebalancer_on_add<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
102 Rebalancer::new(old_ring, new_ring)
103}
104
105pub fn rebalancer_on_remove<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
107 Rebalancer::new(old_ring, new_ring)
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use crate::sharding::router::ShardNode;
114 use std::collections::HashMap;
115 use std::sync::{Arc, Mutex};
116
117 #[test]
118 fn test_rebalance_on_add_node() {
119 let mut old_ring = NodeRing::new(50);
121 old_ring.add_node(&ShardNode {
122 id: 0,
123 address: "A".into(),
124 weight: 1.0,
125 });
126 old_ring.add_node(&ShardNode {
127 id: 1,
128 address: "B".into(),
129 weight: 1.0,
130 });
131
132 let mut new_ring = NodeRing::new(50);
134 new_ring.add_node(&ShardNode {
135 id: 0,
136 address: "A".into(),
137 weight: 1.0,
138 });
139 new_ring.add_node(&ShardNode {
140 id: 1,
141 address: "B".into(),
142 weight: 1.0,
143 });
144 new_ring.add_node(&ShardNode {
145 id: 2,
146 address: "C".into(),
147 weight: 1.0,
148 });
149
150 let rebalancer = Rebalancer::new(&old_ring, &new_ring);
151
152 let keys: Vec<Vec<u8>> = (0..100u32)
154 .map(|i| format!("key-{}", i).into_bytes())
155 .collect();
156 let tasks = rebalancer.compute_tasks(&keys);
157
158 assert!(!tasks.is_empty(), "노드 추가 시 이관 태스크가 있어야 함");
160
161 for task in &tasks {
163 assert_eq!(task.to_node, 2, "새 노드(2)로 이관되어야 함");
164 }
165 }
166
167 #[test]
168 fn test_rebalance_execute() {
169 let mut old_ring = NodeRing::new(50);
170 old_ring.add_node(&ShardNode {
171 id: 0,
172 address: "A".into(),
173 weight: 1.0,
174 });
175 old_ring.add_node(&ShardNode {
176 id: 1,
177 address: "B".into(),
178 weight: 1.0,
179 });
180
181 let mut new_ring = NodeRing::new(50);
182 new_ring.add_node(&ShardNode {
183 id: 0,
184 address: "A".into(),
185 weight: 1.0,
186 });
187 new_ring.add_node(&ShardNode {
188 id: 1,
189 address: "B".into(),
190 weight: 1.0,
191 });
192 new_ring.add_node(&ShardNode {
193 id: 2,
194 address: "C".into(),
195 weight: 1.0,
196 });
197
198 let store: Arc<Mutex<HashMap<(usize, Vec<u8>), Vec<u8>>>> =
200 Arc::new(Mutex::new(HashMap::new()));
201
202 let keys: Vec<Vec<u8>> = (0..20u32)
204 .map(|i| format!("key-{}", i).into_bytes())
205 .collect();
206 for key in &keys {
207 let hash = fnv1a_hash(key);
208 let node = old_ring.get_node(hash).unwrap();
209 store
210 .lock()
211 .unwrap()
212 .insert((node, key.clone()), b"value".to_vec());
213 }
214
215 let rebalancer = Rebalancer::new(&old_ring, &new_ring);
216 let tasks = rebalancer.compute_tasks(&keys);
217
218 let store_r = Arc::clone(&store);
219 let store_w = Arc::clone(&store);
220 let store_d = Arc::clone(&store);
221
222 rebalancer.execute(
223 &tasks,
224 move |node_id, key| {
225 store_r
226 .lock()
227 .unwrap()
228 .get(&(node_id, key.to_vec()))
229 .cloned()
230 },
231 move |node_id, key, value| {
232 store_w
233 .lock()
234 .unwrap()
235 .insert((node_id, key.to_vec()), value.to_vec());
236 },
237 move |node_id, key| {
238 store_d.lock().unwrap().remove(&(node_id, key.to_vec()));
239 },
240 );
241
242 let snapshot = store.lock().unwrap();
244 for task in &tasks {
245 assert!(
246 snapshot.contains_key(&(task.to_node, task.key.clone())),
247 "이관된 키는 새 노드에 있어야 함"
248 );
249 assert!(
250 !snapshot.contains_key(&(task.from_node, task.key.clone())),
251 "이관된 키는 이전 노드에서 삭제되어야 함"
252 );
253 }
254 }
255
256 #[test]
257 fn test_no_migration_same_ring() {
258 let mut ring = NodeRing::new(50);
259 ring.add_node(&ShardNode {
260 id: 0,
261 address: "A".into(),
262 weight: 1.0,
263 });
264 ring.add_node(&ShardNode {
265 id: 1,
266 address: "B".into(),
267 weight: 1.0,
268 });
269
270 let rebalancer = Rebalancer::new(&ring, &ring);
271 let keys: Vec<Vec<u8>> = (0..50u32)
272 .map(|i| format!("k-{}", i).into_bytes())
273 .collect();
274 let tasks = rebalancer.compute_tasks(&keys);
275 assert!(tasks.is_empty(), "동일한 링이면 이관 없음");
276 }
277}