Skip to main content

dbx_core/sharding/
node_ring.rs

1//! 일관된 해싱(Consistent Hashing)을 구현하는 Hash Ring (vnode 지원)
2
3use crate::sharding::router::{ShardNode, fnv1a_hash};
4use std::collections::BTreeMap;
5
6/// Consistent Hashing 구조를 담당하는 노드 링
7#[derive(Debug, Default)]
8pub struct NodeRing {
9    /// 가상 노드 해시 -> 물리 노드 ID 매핑
10    ring: BTreeMap<u64, usize>,
11    /// 하나의 물리 노드당 생성할 가상 노드(vnode) 개수
12    vnodes_per_node: usize,
13}
14
15impl NodeRing {
16    /// 가상 노드(vnode) 수를 지정하여 새로운 링 생성
17    pub fn new(vnodes_per_node: usize) -> Self {
18        Self {
19            ring: BTreeMap::new(),
20            vnodes_per_node,
21        }
22    }
23
24    /// 링에 물리 노드 추가
25    /// `ShardNode::weight` 에 비례하여 vnode 수를 늘립니다.
26    /// `weight = 1.0` 이면 `vnodes_per_node` 개, `2.0` 이면 2배 등
27    pub fn add_node(&mut self, node: &ShardNode) {
28        let vnode_count = ((self.vnodes_per_node as f64) * node.weight.max(0.1)) as usize;
29        for i in 0..vnode_count {
30            let vnode_key = format!("node:{}-vnode:{}", node.id, i);
31            let hash = fnv1a_hash(vnode_key.as_bytes());
32            self.ring.insert(hash, node.id);
33        }
34    }
35
36    /// 링에서 물리 노드 제거
37    /// weight 기반으로 vnode가 많이 배정되었을 수 있으므로
38    /// MAX_WEIGHT(10.0 기준)까지 범위를 넓혀 결정론적으로 제거합니다.
39    pub fn remove_node(&mut self, node_id: usize) {
40        let max_vnodes = (self.vnodes_per_node as f64 * 10.0) as usize;
41        let mut to_remove = Vec::new();
42        for i in 0..max_vnodes {
43            let vnode_key = format!("node:{}-vnode:{}", node_id, i);
44            let hash = fnv1a_hash(vnode_key.as_bytes());
45            to_remove.push(hash);
46        }
47        for hash in to_remove {
48            self.ring.remove(&hash);
49        }
50    }
51
52    /// 데이터 해시값을 기반으로 적절한 물리 노드의 ID를 반환
53    pub fn get_node(&self, hash_val: u64) -> Option<usize> {
54        if self.ring.is_empty() {
55            return None;
56        }
57
58        // 해시값보다 크거나 같은 첫 번째 가상 노드를 탐색
59        if let Some((_, &node_id)) = self.ring.range(hash_val..).next() {
60            Some(node_id)
61        } else {
62            // 끝까지 도달했다면 링의 맨 첫 번째 요소로 돌아감(Consistent Hashing의 특성)
63            let (_, &node_id) = self.ring.iter().next().unwrap();
64            Some(node_id)
65        }
66    }
67
68    /// 현재 링이 비어있는지 확인
69    pub fn is_empty(&self) -> bool {
70        self.ring.is_empty()
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use super::*;
77
78    #[test]
79    fn test_node_ring_distribution() {
80        let mut ring = NodeRing::new(300); // 노드당 300개 vnode
81        let n1 = ShardNode {
82            id: 0,
83            address: "1".into(),
84            weight: 1.0,
85        };
86        let n2 = ShardNode {
87            id: 1,
88            address: "2".into(),
89            weight: 1.0,
90        };
91        let n3 = ShardNode {
92            id: 2,
93            address: "3".into(),
94            weight: 1.0,
95        };
96
97        ring.add_node(&n1);
98        ring.add_node(&n2);
99        ring.add_node(&n3);
100
101        let mut counts = vec![0; 3];
102        // 난수와 유사한 데이터 키 수만개를 배치
103        let total_keys = 30000;
104        for i in 0..total_keys {
105            let k = format!("data-{}", i);
106            let h = fnv1a_hash(k.as_bytes());
107            let node_id = ring.get_node(h).unwrap();
108            counts[node_id] += 1;
109        }
110
111        // FNV-1a 해시의 단순성으로 인해 완벽한 균등 분배는 어려움
112        // 각 노드가 최소 10%의 키는 가져가는지(비정상 쏠림 방지)를 MVP 기준으로 검증
113        let expected = total_keys as f64 / 3.0;
114        let diff = expected * 0.7; // 70% 오차 허용 (매우 느슨하게: 최소 10% 보장)
115
116        for i in 0..3 {
117            let count = counts[i] as f64;
118            assert!(
119                (count - expected).abs() <= diff,
120                "Node {} load {} is outside expected {} +- {}",
121                i,
122                count,
123                expected,
124                diff
125            );
126        }
127    }
128
129    #[test]
130    fn test_node_ring_add_remove() {
131        let mut ring = NodeRing::new(10);
132        let n1 = ShardNode {
133            id: 100,
134            address: "A".into(),
135            weight: 1.0,
136        };
137        let n2 = ShardNode {
138            id: 200,
139            address: "B".into(),
140            weight: 1.0,
141        };
142
143        ring.add_node(&n1);
144        ring.add_node(&n2);
145
146        // 100번 혹은 200번 노드만 반환되어야 함
147        let node_id = ring.get_node(12345).unwrap();
148        assert!(node_id == 100 || node_id == 200);
149
150        // 노드 100 제거
151        ring.remove_node(100);
152
153        // 이제 항상 200만 반환되어야 함
154        for i in 0..10_000 {
155            assert_eq!(ring.get_node(i).unwrap(), 200);
156        }
157
158        // 노드 200 제거
159        ring.remove_node(200);
160        assert!(ring.is_empty());
161        assert_eq!(ring.get_node(12345), None);
162    }
163}