Skip to main content

dbx_core/sharding/
rebalancer.rs

1//! 데이터 리밸런싱 (Data Rebalancing)
2//!
3//! 노드가 추가/제거될 때 영향받는 해시 범위의 데이터를 새 담당 노드로 이관합니다.
4//!
5//! ## 동작 원리
6//! 1. `NodeRing`에서 노드 추가/제거 후 영향받는 해시 범위 계산
7//! 2. `Rebalancer`는 해당 범위의 키를 이전 담당 노드에서 읽어 새 담당 노드로 이관
8//! 3. 이관 완료 전까지 읽기는 old/new 양쪽 모두 시도 (double-read 전략)
9
10use crate::sharding::node_ring::NodeRing;
11
12/// 키 이관 작업 단위
13#[derive(Debug, Clone)]
14pub struct MigrationTask {
15    /// 이관 대상 키
16    pub key: Vec<u8>,
17    /// 기존 담당 노드 ID
18    pub from_node: usize,
19    /// 새 담당 노드 ID
20    pub to_node: usize,
21}
22
23/// 리밸런서
24///
25/// 노드 추가/제거 시 영향받는 키 집합을 계산하고 이관 태스크를 생성합니다.
26pub struct Rebalancer<'a> {
27    old_ring: &'a NodeRing,
28    new_ring: &'a NodeRing,
29}
30
31impl<'a> Rebalancer<'a> {
32    /// 이전 링과 새 링 비교로 리밸런서 생성
33    pub fn new(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Self {
34        Self { old_ring, new_ring }
35    }
36
37    /// 주어진 키 목록 중 이관이 필요한 키를 찾아 `MigrationTask` 목록으로 반환
38    ///
39    /// 동일 해시에 대해 old/new 담당 노드가 다를 경우 이관 대상으로 판단합니다.
40    pub fn compute_tasks(&self, keys: &[Vec<u8>]) -> Vec<MigrationTask> {
41        let mut tasks = Vec::new();
42
43        for key in keys {
44            let hash = fnv1a_hash(key);
45            let old_node = self.old_ring.get_node(hash);
46            let new_node = self.new_ring.get_node(hash);
47
48            match (old_node, new_node) {
49                (Some(old), Some(new)) if old != new => {
50                    tasks.push(MigrationTask {
51                        key: key.clone(),
52                        from_node: old,
53                        to_node: new,
54                    });
55                }
56                _ => {} // 담당 노드가 그대로이거나 링이 비어있는 경우 스킵
57            }
58        }
59
60        tasks
61    }
62
63    /// 이관 태스크를 실행합니다.
64    ///
65    /// `read_fn(node_id, key)` → 데이터 조회
66    /// `write_fn(node_id, key, value)` → 이관 대상에 저장
67    /// `delete_fn(node_id, key)` → 이전 노드에서 삭제
68    pub fn execute<R, W, D>(
69        &self,
70        tasks: &[MigrationTask],
71        mut read_fn: R,
72        mut write_fn: W,
73        mut delete_fn: D,
74    ) where
75        R: FnMut(usize, &[u8]) -> Option<Vec<u8>>,
76        W: FnMut(usize, &[u8], &[u8]),
77        D: FnMut(usize, &[u8]),
78    {
79        for task in tasks {
80            if let Some(data) = read_fn(task.from_node, &task.key) {
81                write_fn(task.to_node, &task.key, &data);
82                delete_fn(task.from_node, &task.key);
83            }
84        }
85    }
86}
87
88/// FNV-1a 해시 (node_ring과 동일)
89fn fnv1a_hash(data: &[u8]) -> u64 {
90    const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
91    const FNV_PRIME: u64 = 1_099_511_628_211;
92    let mut hash = FNV_OFFSET;
93    for &byte in data {
94        hash ^= byte as u64;
95        hash = hash.wrapping_mul(FNV_PRIME);
96    }
97    hash
98}
99
100/// 새 노드 추가 시 리밸런서를 생성하는 헬퍼
101pub fn rebalancer_on_add<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
102    Rebalancer::new(old_ring, new_ring)
103}
104
105/// 노드 제거 시 리밸런서를 생성하는 헬퍼
106pub fn rebalancer_on_remove<'a>(old_ring: &'a NodeRing, new_ring: &'a NodeRing) -> Rebalancer<'a> {
107    Rebalancer::new(old_ring, new_ring)
108}
109
110#[cfg(test)]
111mod tests {
112    use super::*;
113    use crate::sharding::router::ShardNode;
114    use std::collections::HashMap;
115    use std::sync::{Arc, Mutex};
116
117    #[test]
118    fn test_rebalance_on_add_node() {
119        // 초기: 노드 0, 1
120        let mut old_ring = NodeRing::new(50);
121        old_ring.add_node(&ShardNode {
122            id: 0,
123            address: "A".into(),
124            weight: 1.0,
125        });
126        old_ring.add_node(&ShardNode {
127            id: 1,
128            address: "B".into(),
129            weight: 1.0,
130        });
131
132        // 새 노드 2 추가
133        let mut new_ring = NodeRing::new(50);
134        new_ring.add_node(&ShardNode {
135            id: 0,
136            address: "A".into(),
137            weight: 1.0,
138        });
139        new_ring.add_node(&ShardNode {
140            id: 1,
141            address: "B".into(),
142            weight: 1.0,
143        });
144        new_ring.add_node(&ShardNode {
145            id: 2,
146            address: "C".into(),
147            weight: 1.0,
148        });
149
150        let rebalancer = Rebalancer::new(&old_ring, &new_ring);
151
152        // 테스트용 키 목록
153        let keys: Vec<Vec<u8>> = (0..100u32)
154            .map(|i| format!("key-{}", i).into_bytes())
155            .collect();
156        let tasks = rebalancer.compute_tasks(&keys);
157
158        // 노드가 추가되었으므로 일부 키는 이관 대상이어야 함
159        assert!(!tasks.is_empty(), "노드 추가 시 이관 태스크가 있어야 함");
160
161        // 이관 태스크는 to_node = 2 (새 노드)여야 함
162        for task in &tasks {
163            assert_eq!(task.to_node, 2, "새 노드(2)로 이관되어야 함");
164        }
165    }
166
167    #[test]
168    fn test_rebalance_execute() {
169        let mut old_ring = NodeRing::new(50);
170        old_ring.add_node(&ShardNode {
171            id: 0,
172            address: "A".into(),
173            weight: 1.0,
174        });
175        old_ring.add_node(&ShardNode {
176            id: 1,
177            address: "B".into(),
178            weight: 1.0,
179        });
180
181        let mut new_ring = NodeRing::new(50);
182        new_ring.add_node(&ShardNode {
183            id: 0,
184            address: "A".into(),
185            weight: 1.0,
186        });
187        new_ring.add_node(&ShardNode {
188            id: 1,
189            address: "B".into(),
190            weight: 1.0,
191        });
192        new_ring.add_node(&ShardNode {
193            id: 2,
194            address: "C".into(),
195            weight: 1.0,
196        });
197
198        // 시뮬레이션 스토어 (노드 ID → 키-값 맵)
199        let store: Arc<Mutex<HashMap<(usize, Vec<u8>), Vec<u8>>>> =
200            Arc::new(Mutex::new(HashMap::new()));
201
202        // 초기 데이터 삽입
203        let keys: Vec<Vec<u8>> = (0..20u32)
204            .map(|i| format!("key-{}", i).into_bytes())
205            .collect();
206        for key in &keys {
207            let hash = fnv1a_hash(key);
208            let node = old_ring.get_node(hash).unwrap();
209            store
210                .lock()
211                .unwrap()
212                .insert((node, key.clone()), b"value".to_vec());
213        }
214
215        let rebalancer = Rebalancer::new(&old_ring, &new_ring);
216        let tasks = rebalancer.compute_tasks(&keys);
217
218        let store_r = Arc::clone(&store);
219        let store_w = Arc::clone(&store);
220        let store_d = Arc::clone(&store);
221
222        rebalancer.execute(
223            &tasks,
224            move |node_id, key| {
225                store_r
226                    .lock()
227                    .unwrap()
228                    .get(&(node_id, key.to_vec()))
229                    .cloned()
230            },
231            move |node_id, key, value| {
232                store_w
233                    .lock()
234                    .unwrap()
235                    .insert((node_id, key.to_vec()), value.to_vec());
236            },
237            move |node_id, key| {
238                store_d.lock().unwrap().remove(&(node_id, key.to_vec()));
239            },
240        );
241
242        // 이관된 키들은 이제 새 담당 노드에 있어야 함
243        let snapshot = store.lock().unwrap();
244        for task in &tasks {
245            assert!(
246                snapshot.contains_key(&(task.to_node, task.key.clone())),
247                "이관된 키는 새 노드에 있어야 함"
248            );
249            assert!(
250                !snapshot.contains_key(&(task.from_node, task.key.clone())),
251                "이관된 키는 이전 노드에서 삭제되어야 함"
252            );
253        }
254    }
255
256    #[test]
257    fn test_no_migration_same_ring() {
258        let mut ring = NodeRing::new(50);
259        ring.add_node(&ShardNode {
260            id: 0,
261            address: "A".into(),
262            weight: 1.0,
263        });
264        ring.add_node(&ShardNode {
265            id: 1,
266            address: "B".into(),
267            weight: 1.0,
268        });
269
270        let rebalancer = Rebalancer::new(&ring, &ring);
271        let keys: Vec<Vec<u8>> = (0..50u32)
272            .map(|i| format!("k-{}", i).into_bytes())
273            .collect();
274        let tasks = rebalancer.compute_tasks(&keys);
275        assert!(tasks.is_empty(), "동일한 링이면 이관 없음");
276    }
277}