atomr_cluster_sharding/
allocation.rs1use std::collections::HashMap;
13
/// Policy that decides which region hosts a shard and which shards should move.
///
/// The `Send + Sync + 'static` bounds require implementors to be usable from
/// several threads and to own all of their data.
pub trait ShardAllocationStrategy: Send + Sync + 'static {
    /// Chooses the region that should host `shard_id`.
    ///
    /// `regions` maps a region name to a `usize` load figure; the strategies and
    /// tests in this file treat it as the number of shards the region currently
    /// hosts. Returns `None` when the strategy has no region to offer.
    fn allocate_shard(
        &self,
        shard_id: &str,
        regions: &HashMap<String, usize>,
    ) -> Option<String>;

    /// Nominates shards for rebalancing.
    ///
    /// * `current_allocations` maps shard id -> name of the region hosting it.
    /// * `regions` maps region name -> load figure (see [`allocate_shard`]).
    ///
    /// Returns the ids of the nominated shards; an empty vector means "leave
    /// everything where it is". What happens to the returned shards is up to the
    /// caller and is not visible from this file.
    ///
    /// [`allocate_shard`]: ShardAllocationStrategy::allocate_shard
    fn rebalance(
        &self,
        current_allocations: &HashMap<String, String>,
        regions: &HashMap<String, usize>,
    ) -> Vec<String>;
}
31
/// Strategy that places new shards on the region with the lowest count and
/// nominates shards hosted by the region(s) with the highest count for
/// rebalancing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeastShardAllocationStrategy {
    /// Upper bound on the number of shard ids a single `rebalance` call returns;
    /// the sorted candidate list is truncated to this length.
    pub max_simultaneous_rebalance: usize,
    /// `rebalance` returns nothing while the difference between the highest and
    /// the lowest region count is strictly below this value.
    pub rebalance_threshold: usize,
}
41
42impl Default for LeastShardAllocationStrategy {
43 fn default() -> Self {
44 Self { max_simultaneous_rebalance: 3, rebalance_threshold: 1 }
45 }
46}
47
48impl ShardAllocationStrategy for LeastShardAllocationStrategy {
49 fn allocate_shard(&self, _shard_id: &str, regions: &HashMap<String, usize>) -> Option<String> {
50 let mut entries: Vec<(&String, &usize)> = regions.iter().collect();
51 entries.sort_by(|a, b| a.1.cmp(b.1).then_with(|| a.0.cmp(b.0)));
52 entries.first().map(|(k, _)| (*k).clone())
53 }
54
55 fn rebalance(&self, current: &HashMap<String, String>, regions: &HashMap<String, usize>) -> Vec<String> {
56 if regions.len() < 2 {
57 return Vec::new();
58 }
59 let max = regions.values().max().copied().unwrap_or(0);
60 let min = regions.values().min().copied().unwrap_or(0);
61 if max.saturating_sub(min) < self.rebalance_threshold {
62 return Vec::new();
63 }
64 let mut max_regions: Vec<&String> =
66 regions.iter().filter(|(_, c)| **c == max).map(|(k, _)| k).collect();
67 max_regions.sort();
68 let mut out: Vec<String> = current
69 .iter()
70 .filter(|(_, region)| max_regions.iter().any(|r| **r == **region))
71 .map(|(shard, _)| shard.clone())
72 .collect();
73 out.sort();
74 out.truncate(self.max_simultaneous_rebalance);
75 out
76 }
77}
78
/// Strategy that sends every shard to one fixed region.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PinnedAllocationStrategy {
    /// Name of the region returned for every allocation request.
    pub region: String,
}
85
86impl ShardAllocationStrategy for PinnedAllocationStrategy {
87 fn allocate_shard(&self, _shard_id: &str, _regions: &HashMap<String, usize>) -> Option<String> {
88 Some(self.region.clone())
89 }
90
91 fn rebalance(
92 &self,
93 _current: &HashMap<String, String>,
94 _regions: &HashMap<String, usize>,
95 ) -> Vec<String> {
96 Vec::new()
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a region-name -> shard-count map from literal pairs.
    fn regions(pairs: &[(&str, usize)]) -> HashMap<String, usize> {
        pairs.iter().map(|(k, v)| (k.to_string(), *v)).collect()
    }

    /// Builds a shard-id -> region-name map from literal pairs.
    fn allocs(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
    }

    #[test]
    fn least_shard_picks_emptiest_region() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r1", 5), ("r2", 1), ("r3", 3)]);
        assert_eq!(s.allocate_shard("x", &r), Some("r2".into()));
    }

    #[test]
    fn least_shard_picks_no_region_when_empty() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[]);
        assert!(s.allocate_shard("x", &r).is_none());
    }

    #[test]
    fn least_shard_breaks_ties_lexicographically() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r2", 1), ("r1", 1)]);
        assert_eq!(s.allocate_shard("x", &r), Some("r1".into()));
    }

    #[test]
    fn rebalance_returns_empty_when_balanced() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r1", 3), ("r2", 3)]);
        // Real allocations are supplied on purpose: with an empty map the result
        // would be empty no matter what the threshold check does.
        let a = allocs(&[
            ("s1", "r1"),
            ("s2", "r1"),
            ("s3", "r1"),
            ("s4", "r2"),
            ("s5", "r2"),
            ("s6", "r2"),
        ]);
        assert!(s.rebalance(&a, &r).is_empty());
    }

    #[test]
    fn rebalance_returns_empty_with_single_region() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r1", 2)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1")]);
        assert!(s.rebalance(&a, &r).is_empty());
    }

    #[test]
    fn rebalance_returns_empty_below_threshold() {
        let s = LeastShardAllocationStrategy { max_simultaneous_rebalance: 3, rebalance_threshold: 3 };
        let r = regions(&[("r1", 3), ("r2", 1)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1"), ("s3", "r1"), ("s4", "r2")]);
        assert!(s.rebalance(&a, &r).is_empty());
    }

    #[test]
    fn rebalance_returns_shards_from_loaded_region() {
        let s = LeastShardAllocationStrategy { max_simultaneous_rebalance: 2, rebalance_threshold: 2 };
        let r = regions(&[("r1", 5), ("r2", 1)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1"), ("s3", "r1"), ("s4", "r1"), ("s5", "r1"), ("s6", "r2")]);
        let out = s.rebalance(&a, &r);
        assert_eq!(out.len(), 2);
        for shard in &out {
            assert_eq!(a.get(shard), Some(&"r1".to_string()));
        }
        // Candidates are sorted before truncation, so the exact picks are stable.
        assert_eq!(out, vec!["s1".to_string(), "s2".to_string()]);
    }

    #[test]
    fn rebalance_considers_every_region_tied_for_most_loaded() {
        let s = LeastShardAllocationStrategy { max_simultaneous_rebalance: 10, rebalance_threshold: 1 };
        let r = regions(&[("r1", 2), ("r2", 2), ("r3", 1)]);
        let a = allocs(&[("s1", "r2"), ("s2", "r1"), ("s3", "r2"), ("s4", "r1"), ("s5", "r3")]);
        assert_eq!(
            s.rebalance(&a, &r),
            vec!["s1".to_string(), "s2".to_string(), "s3".to_string(), "s4".to_string()]
        );
    }

    #[test]
    fn pinned_always_picks_same_region() {
        let s = PinnedAllocationStrategy { region: "fixed".into() };
        let r = regions(&[("r1", 0), ("r2", 0)]);
        assert_eq!(s.allocate_shard("a", &r), Some("fixed".into()));
        assert_eq!(s.allocate_shard("b", &r), Some("fixed".into()));
    }

    #[test]
    fn pinned_never_rebalances() {
        let s = PinnedAllocationStrategy { region: "fixed".into() };
        let r = regions(&[("fixed", 4), ("r2", 0)]);
        let a = allocs(&[("s1", "fixed"), ("s2", "fixed"), ("s3", "fixed"), ("s4", "fixed")]);
        assert!(s.rebalance(&a, &r).is_empty());
    }
}