// atomr_cluster_sharding/allocation.rs

//! Shard allocation strategies.
//!
//! A strategy answers two questions:
//!
//! 1. **Where to place a new shard?**
//!    [`ShardAllocationStrategy::allocate_shard`] picks one of the
//!    currently-known regions to host a freshly-requested shard.
//! 2. **What to rebalance?** [`ShardAllocationStrategy::rebalance`]
//!    surfaces a list of shard ids that should migrate, given the
//!    current allocation table and the per-region shard counts.

use std::collections::HashMap;

/// Pluggable shard allocation policy.
///
/// Implementors must be `Send + Sync + 'static`, as required by the
/// trait bounds.
pub trait ShardAllocationStrategy: Send + Sync + 'static {
    /// Pick a region to host `shard_id`.
    ///
    /// `regions` lists the known candidates (region path → current
    /// shard count). Returns the chosen region's path, or `None` when
    /// no region can be chosen (for example, when `regions` is empty).
    fn allocate_shard(&self, shard_id: &str, regions: &HashMap<String, usize>) -> Option<String>;

    /// Decide which shards should migrate.
    ///
    /// `current_allocations` is the shard → region mapping and
    /// `regions` lists the per-region shard counts. Returns the shard
    /// ids to hand off; the coordinator picks the destination.
    fn rebalance(
        &self,
        current_allocations: &HashMap<String, String>,
        regions: &HashMap<String, usize>,
    ) -> Vec<String>;
}

/// Place new shards on the region with the fewest shards, breaking
/// ties lexicographically. Rebalances when the difference between the
/// most- and least-loaded regions is at least `rebalance_threshold`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeastShardAllocationStrategy {
    /// Migrate at most this many shards per rebalance call.
    pub max_simultaneous_rebalance: usize,
    /// Rebalance only if `max_count - min_count >= rebalance_threshold`.
    pub rebalance_threshold: usize,
}

42impl Default for LeastShardAllocationStrategy {
43    fn default() -> Self {
44        Self { max_simultaneous_rebalance: 3, rebalance_threshold: 1 }
45    }
46}
47
48impl ShardAllocationStrategy for LeastShardAllocationStrategy {
49    fn allocate_shard(&self, _shard_id: &str, regions: &HashMap<String, usize>) -> Option<String> {
50        let mut entries: Vec<(&String, &usize)> = regions.iter().collect();
51        entries.sort_by(|a, b| a.1.cmp(b.1).then_with(|| a.0.cmp(b.0)));
52        entries.first().map(|(k, _)| (*k).clone())
53    }
54
55    fn rebalance(&self, current: &HashMap<String, String>, regions: &HashMap<String, usize>) -> Vec<String> {
56        if regions.len() < 2 {
57            return Vec::new();
58        }
59        let max = regions.values().max().copied().unwrap_or(0);
60        let min = regions.values().min().copied().unwrap_or(0);
61        if max.saturating_sub(min) < self.rebalance_threshold {
62            return Vec::new();
63        }
64        // Pick shard ids that live on the most-loaded region(s).
65        let mut max_regions: Vec<&String> =
66            regions.iter().filter(|(_, c)| **c == max).map(|(k, _)| k).collect();
67        max_regions.sort();
68        let mut out: Vec<String> = current
69            .iter()
70            .filter(|(_, region)| max_regions.iter().any(|r| **r == **region))
71            .map(|(shard, _)| shard.clone())
72            .collect();
73        out.sort();
74        out.truncate(self.max_simultaneous_rebalance);
75        out
76    }
77}
78
/// Pin every shard to a specific region (useful for tests / static
/// allocation). Behaves like a custom strategy that always returns a
/// constant region.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PinnedAllocationStrategy {
    /// Path of the region that every shard is allocated to.
    pub region: String,
}

86impl ShardAllocationStrategy for PinnedAllocationStrategy {
87    fn allocate_shard(&self, _shard_id: &str, _regions: &HashMap<String, usize>) -> Option<String> {
88        Some(self.region.clone())
89    }
90
91    fn rebalance(
92        &self,
93        _current: &HashMap<String, String>,
94        _regions: &HashMap<String, usize>,
95    ) -> Vec<String> {
96        Vec::new()
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a region → shard-count table from literal pairs.
    fn regions(pairs: &[(&str, usize)]) -> HashMap<String, usize> {
        pairs.iter().map(|(k, v)| (k.to_string(), *v)).collect()
    }

    /// Build a shard → region allocation table from literal pairs.
    fn allocs(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
    }

    #[test]
    fn least_shard_picks_emptiest_region() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r1", 5), ("r2", 1), ("r3", 3)]);
        assert_eq!(s.allocate_shard("x", &r), Some("r2".into()));
    }

    #[test]
    fn least_shard_picks_no_region_when_empty() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[]);
        assert!(s.allocate_shard("x", &r).is_none());
    }

    #[test]
    fn least_shard_breaks_ties_lexicographically() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r2", 1), ("r1", 1)]);
        assert_eq!(s.allocate_shard("x", &r), Some("r1".into()));
    }

    #[test]
    fn rebalance_returns_empty_when_balanced() {
        let s = LeastShardAllocationStrategy::default();
        let r = regions(&[("r1", 3), ("r2", 3)]);
        let a = allocs(&[]);
        assert!(s.rebalance(&a, &r).is_empty());
    }

    #[test]
    fn rebalance_returns_empty_below_threshold() {
        let s = LeastShardAllocationStrategy { max_simultaneous_rebalance: 2, rebalance_threshold: 2 };
        let r = regions(&[("r1", 2), ("r2", 1)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1"), ("s3", "r2")]);
        assert!(s.rebalance(&a, &r).is_empty());
    }

    #[test]
    fn rebalance_returns_shards_from_loaded_region() {
        let s = LeastShardAllocationStrategy { max_simultaneous_rebalance: 2, rebalance_threshold: 2 };
        let r = regions(&[("r1", 5), ("r2", 1)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1"), ("s3", "r1"), ("s4", "r1"), ("s5", "r1"), ("s6", "r2")]);
        let out = s.rebalance(&a, &r);
        // Output is sorted by shard id and capped at
        // `max_simultaneous_rebalance`, so the result is exact.
        assert_eq!(out, vec!["s1".to_string(), "s2".to_string()]);
        for shard in &out {
            assert_eq!(a.get(shard), Some(&"r1".to_string()));
        }
    }

    #[test]
    fn pinned_always_picks_same_region() {
        let s = PinnedAllocationStrategy { region: "fixed".into() };
        let r = regions(&[("r1", 0), ("r2", 0)]);
        assert_eq!(s.allocate_shard("a", &r), Some("fixed".into()));
        assert_eq!(s.allocate_shard("b", &r), Some("fixed".into()));
    }

    #[test]
    fn pinned_never_rebalances() {
        let s = PinnedAllocationStrategy { region: "fixed".into() };
        let r = regions(&[("r1", 9), ("r2", 0)]);
        let a = allocs(&[("s1", "r1"), ("s2", "r1")]);
        assert!(s.rebalance(&a, &r).is_empty());
    }
}