Skip to main content

dynomite/hashkit/
modula.rs

1//! Modula distribution: each live server contributes a number of
2//! continuum slots equal to its weight, and dispatch is `hash %
3//! ncontinuum`.
4//!
5//! # Examples
6//!
7//! ```
8//! use dynomite::hashkit::modula::{Continuum, ServerSpec};
9//! let c = Continuum::build(&[
10//!     ServerSpec { name: "a".into(), weight: 1 },
11//!     ServerSpec { name: "b".into(), weight: 1 },
12//! ]).unwrap();
13//! assert_eq!(c.dispatch(0).unwrap(), 0);
14//! assert_eq!(c.dispatch(1).unwrap(), 1);
15//! ```
16
17use crate::core::types::DynError;
18
/// A single position on the continuum, owned by exactly one server.
///
/// A server occupies one `Slot` per unit of its weight.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::modula::Slot;
/// let s = Slot { server: 0 };
/// assert_eq!(s.server, 0);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Slot {
    /// Position of the owning server in the list the continuum was built from.
    pub server: usize,
}
33
34/// Sorted-by-server-order continuum.
35///
36/// # Examples
37///
38/// ```
39/// use dynomite::hashkit::modula::{Continuum, ServerSpec};
40/// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 2 }]).unwrap();
41/// assert_eq!(c.len(), 2);
42/// ```
43#[derive(Clone, Debug, Default)]
44pub struct Continuum {
45    slots: Vec<Slot>,
46}
47
/// Describes one server taking part in modula distribution.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::modula::ServerSpec;
/// let s = ServerSpec { name: "redis-a".into(), weight: 3 };
/// assert_eq!(s.weight, 3);
/// ```
#[derive(Debug, Clone)]
pub struct ServerSpec {
    /// Identifier for the server; expected to be stable and unique.
    pub name: String,
    /// How many continuum slots this server is given.
    pub weight: u32,
}
64
65impl Continuum {
66    /// Build the continuum from `servers`. Every server contributes
67    /// `weight` consecutive slots in declaration order.
68    ///
69    /// # Errors
70    ///
71    /// Currently never fails; the signature returns `Result` so the
72    /// distribution interface stays consistent with `ketama`.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use dynomite::hashkit::modula::{Continuum, ServerSpec};
78    /// let c = Continuum::build(&[
79    ///     ServerSpec { name: "a".into(), weight: 3 },
80    ///     ServerSpec { name: "b".into(), weight: 1 },
81    /// ]).unwrap();
82    /// assert_eq!(c.len(), 4);
83    /// ```
84    pub fn build(servers: &[ServerSpec]) -> Result<Self, DynError> {
85        let total: usize = servers.iter().map(|s| s.weight as usize).sum();
86        let mut slots = Vec::with_capacity(total);
87        for (idx, server) in servers.iter().enumerate() {
88            for _ in 0..server.weight {
89                slots.push(Slot { server: idx });
90            }
91        }
92        Ok(Self { slots })
93    }
94
95    /// Read-only view of the slots in their canonical order.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// use dynomite::hashkit::modula::{Continuum, ServerSpec};
101    /// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 2 }]).unwrap();
102    /// assert_eq!(c.slots().len(), 2);
103    /// ```
104    #[must_use]
105    pub fn slots(&self) -> &[Slot] {
106        &self.slots
107    }
108
109    /// Number of slots.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use dynomite::hashkit::modula::Continuum;
115    /// assert_eq!(Continuum::default().len(), 0);
116    /// ```
117    #[must_use]
118    pub fn len(&self) -> usize {
119        self.slots.len()
120    }
121
122    /// Whether the continuum is empty.
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use dynomite::hashkit::modula::Continuum;
128    /// assert!(Continuum::default().is_empty());
129    /// ```
130    #[must_use]
131    pub fn is_empty(&self) -> bool {
132        self.slots.is_empty()
133    }
134
135    /// Map a 32-bit hash to a server index using `hash % len`.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error when the continuum is empty.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use dynomite::hashkit::modula::{Continuum, ServerSpec};
145    /// let c = Continuum::build(&[
146    ///     ServerSpec { name: "a".into(), weight: 1 },
147    ///     ServerSpec { name: "b".into(), weight: 1 },
148    /// ]).unwrap();
149    /// assert_eq!(c.dispatch(2).unwrap(), 0);
150    /// assert_eq!(c.dispatch(3).unwrap(), 1);
151    /// ```
152    pub fn dispatch(&self, hash: u32) -> Result<usize, DynError> {
153        if self.slots.is_empty() {
154            return Err(DynError::Generic("empty modula continuum".into()));
155        }
156        let i = (hash as usize) % self.slots.len();
157        Ok(self.slots[i].server)
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Make `n` servers of weight 1, named `s-0`, `s-1`, and so on.
    fn equal_servers(n: usize) -> Vec<ServerSpec> {
        let mut specs = Vec::with_capacity(n);
        for i in 0..n {
            specs.push(ServerSpec {
                name: format!("s-{i}"),
                weight: 1,
            });
        }
        specs
    }

    /// No servers in means no slots out, and dispatch has nowhere to go.
    #[test]
    fn empty_input_yields_empty_continuum() {
        let continuum = Continuum::build(&[]).unwrap();
        assert!(continuum.is_empty());
        assert!(continuum.dispatch(0).is_err());
    }

    /// With unit weights the chosen server is simply `hash % server_count`.
    #[test]
    fn equal_weight_dispatches_modulo() {
        let continuum = Continuum::build(&equal_servers(4)).unwrap();
        for hash in 0u32..32 {
            let expected = (hash as usize) % 4;
            assert_eq!(continuum.dispatch(hash).unwrap(), expected);
        }
    }

    /// A weight of 3 claims three consecutive slots before the next server.
    #[test]
    fn weights_expand_slots() {
        let heavy = ServerSpec {
            name: "a".into(),
            weight: 3,
        };
        let light = ServerSpec {
            name: "b".into(),
            weight: 1,
        };
        let continuum = Continuum::build(&[heavy, light]).unwrap();
        assert_eq!(continuum.len(), 4);

        // (hash, expected server index); hash 4 wraps back to slot 0.
        let expectations = [(0u32, 0usize), (1, 0), (2, 0), (3, 1), (4, 0)];
        for (hash, expected) in expectations {
            assert_eq!(continuum.dispatch(hash).unwrap(), expected);
        }
    }

    /// Asking twice with the same hash gives the same server.
    #[test]
    fn dispatch_is_deterministic() {
        let continuum = Continuum::build(&equal_servers(3)).unwrap();
        for hash in [0xdead_beef_u32, 1, 0, u32::MAX] {
            let first = continuum.dispatch(hash).unwrap();
            let second = continuum.dispatch(hash).unwrap();
            assert_eq!(first, second);
        }
    }
}