Skip to main content

dynomite/cluster/
datacenter.rs

1//! Cluster topology: datacenters, racks, and the per-rack token
2//! continuum.
3//!
//! A [`Datacenter`] owns a list of [`Rack`]s; a `Rack` owns a vector
//! of [`Continuum`] points that map a [`DynToken`] to the index of
//! the [`crate::cluster::peer::Peer`] that owns the token. The shape
//! mirrors the reference engine's `struct datacenter` /
//! `struct rack` / `struct continuum`; in addition, a rack may carry
//! a random-slicing table (see [`RackRing`]), which has no
//! counterpart in the reference structs.
9//!
10//! Token ring lookups use the same upper-bound search as the
11//! reference engine's `vnode_dispatch` (the search lives in
12//! [`crate::cluster::vnode`]). The data shape stays here so the
13//! lookup can be tested against curated continua without spinning
14//! up a full pool.
15//!
16//! # Examples
17//!
18//! ```
19//! use dynomite::cluster::datacenter::{Datacenter, Rack};
20//! let mut dc = Datacenter::new("dc1".into());
21//! dc.upsert_rack("rack1".into());
22//! assert_eq!(dc.racks().len(), 1);
23//! ```
24
25use crate::hashkit::{random_slicing::RandomSlices, DynToken};
26
27/// Per-rack ring storage. Either the historical token continuum
28/// (a sorted [`Vec<Continuum>`]) or a [`RandomSlices`] table.
29/// The dispatcher consults whichever variant is present without
30/// caring which one it is; the engine produces only one shape
31/// per rack at a time.
32#[derive(Clone, Debug, Default)]
33pub enum RackRing {
34    /// Historical per-peer token continuum.
35    #[default]
36    Continuum,
37    /// Random-slicing partition table.
38    RandomSlicing(RandomSlices),
39}
40
41/// One ring point: a `(token, peer_idx)` mapping.
42#[derive(Clone, Debug)]
43pub struct Continuum {
44    /// Token at this ring position.
45    pub token: DynToken,
46    /// Index into the pool's peer array.
47    pub peer_idx: u32,
48}
49
50impl Continuum {
51    /// Construct a continuum point.
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// use dynomite::cluster::datacenter::Continuum;
57    /// use dynomite::hashkit::DynToken;
58    /// let c = Continuum::new(DynToken::from_u32(7), 3);
59    /// assert_eq!(c.peer_idx, 3);
60    /// ```
61    #[must_use]
62    pub fn new(token: DynToken, peer_idx: u32) -> Self {
63        Self { token, peer_idx }
64    }
65}
66
67/// One rack within a datacenter.
68///
69/// `continuums` is sorted by token in ascending order to support
70/// `vnode_dispatch`'s binary search; callers append continuum
71/// points and call [`Rack::sort_continuums`] once after a batch of
72/// updates.
73#[derive(Clone, Debug)]
74pub struct Rack {
75    name: String,
76    dc: String,
77    nserver_continuum: u32,
78    ncontinuum: u32,
79    continuums: Vec<Continuum>,
80    /// Optional [`RandomSlices`] table; populated when the
81    /// pool's distribution is
82    /// [`crate::conf::Distribution::RandomSlicing`].
83    /// [`Self::continuums`] stays in sync with the peer set so
84    /// the shadow distribution path can binary-search the same
85    /// rack without a second build.
86    ring: RackRing,
87}
88
89impl Rack {
90    /// Build an empty rack.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use dynomite::cluster::datacenter::Rack;
96    /// let r = Rack::new("rack1".into(), "dc1".into());
97    /// assert_eq!(r.name(), "rack1");
98    /// assert_eq!(r.dc(), "dc1");
99    /// assert!(r.continuums().is_empty());
100    /// ```
101    #[must_use]
102    pub fn new(name: String, dc: String) -> Self {
103        Self {
104            name,
105            dc,
106            nserver_continuum: 0,
107            ncontinuum: 0,
108            continuums: Vec::new(),
109            ring: RackRing::Continuum,
110        }
111    }
112
113    /// Rack name.
114    #[must_use]
115    pub fn name(&self) -> &str {
116        &self.name
117    }
118
119    /// Owning datacenter name.
120    #[must_use]
121    pub fn dc(&self) -> &str {
122        &self.dc
123    }
124
125    /// Borrow the continuum points (sorted by token).
126    #[must_use]
127    pub fn continuums(&self) -> &[Continuum] {
128        &self.continuums
129    }
130
131    /// Number of distinct servers ever added (mirrors the C
132    /// reference's `nserver_continuum`).
133    #[must_use]
134    pub fn nserver_continuum(&self) -> u32 {
135        self.nserver_continuum
136    }
137
138    /// Number of continuum points (mirrors `ncontinuum`).
139    #[must_use]
140    pub fn ncontinuum(&self) -> u32 {
141        self.ncontinuum
142    }
143
144    /// Append continuum points produced from one peer's tokens.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use dynomite::cluster::datacenter::{Continuum, Rack};
150    /// use dynomite::hashkit::DynToken;
151    /// let mut r = Rack::new("rack1".into(), "dc1".into());
152    /// r.add_peer_tokens(0, &[DynToken::from_u32(2), DynToken::from_u32(5)]);
153    /// assert_eq!(r.ncontinuum(), 2);
154    /// assert_eq!(r.nserver_continuum(), 2);
155    /// ```
156    pub fn add_peer_tokens(&mut self, peer_idx: u32, tokens: &[DynToken]) {
157        for tok in tokens {
158            self.continuums.push(Continuum::new(tok.clone(), peer_idx));
159            self.ncontinuum = self.ncontinuum.saturating_add(1);
160            self.nserver_continuum = self.nserver_continuum.saturating_add(1);
161        }
162    }
163
164    /// Sort the continuum by token (ascending). Callers must invoke
165    /// this once after a batch of [`Rack::add_peer_tokens`] calls so
166    /// that [`crate::cluster::vnode::dispatch`] can binary-search
167    /// the ring.
168    ///
169    /// # Examples
170    ///
171    /// ```
172    /// use dynomite::cluster::datacenter::Rack;
173    /// use dynomite::hashkit::DynToken;
174    /// let mut r = Rack::new("r".into(), "d".into());
175    /// r.add_peer_tokens(0, &[DynToken::from_u32(5)]);
176    /// r.add_peer_tokens(1, &[DynToken::from_u32(2)]);
177    /// r.sort_continuums();
178    /// assert_eq!(r.continuums()[0].peer_idx, 1);
179    /// ```
180    pub fn sort_continuums(&mut self) {
181        self.continuums.sort_by(|a, b| a.token.cmp(&b.token));
182    }
183
184    /// Reset all continuum state for a fresh rebuild.
185    pub fn clear_continuums(&mut self) {
186        self.continuums.clear();
187        self.ncontinuum = 0;
188        self.nserver_continuum = 0;
189        self.ring = RackRing::Continuum;
190    }
191
192    /// Borrow the rack's ring representation.
193    #[must_use]
194    pub fn ring(&self) -> &RackRing {
195        &self.ring
196    }
197
198    /// Install a [`RandomSlices`] table on this rack. The
199    /// continuum stays populated so the shadow-distribution
200    /// path (and any operator-side dump) can still walk the
201    /// vnode view.
202    pub fn set_random_slices(&mut self, slices: RandomSlices) {
203        self.ring = RackRing::RandomSlicing(slices);
204    }
205
206    /// True when the rack's live distribution is random
207    /// slicing.
208    #[must_use]
209    pub fn is_random_slicing(&self) -> bool {
210        matches!(self.ring, RackRing::RandomSlicing(_))
211    }
212
213    /// Borrow the rack's [`RandomSlices`] table when one is
214    /// installed.
215    #[must_use]
216    pub fn random_slices(&self) -> Option<&RandomSlices> {
217        match &self.ring {
218            RackRing::RandomSlicing(s) => Some(s),
219            RackRing::Continuum => None,
220        }
221    }
222}
223
224/// One datacenter.
225///
226/// Mirrors `struct datacenter`. The
227/// `preselected_rack_for_replication` field is computed by
228/// [`crate::cluster::pool::ServerPool::preselect_remote_racks`]
229/// and reproduces the reference engine's strategy of choosing one
230/// rack per remote DC for cross-DC writes.
231#[derive(Clone, Debug)]
232pub struct Datacenter {
233    name: String,
234    racks: Vec<Rack>,
235    preselected_rack_for_replication: Option<usize>,
236}
237
238impl Datacenter {
239    /// Build an empty datacenter.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// use dynomite::cluster::datacenter::Datacenter;
245    /// let dc = Datacenter::new("dc1".into());
246    /// assert_eq!(dc.name(), "dc1");
247    /// ```
248    #[must_use]
249    pub fn new(name: String) -> Self {
250        Self {
251            name,
252            racks: Vec::new(),
253            preselected_rack_for_replication: None,
254        }
255    }
256
257    /// Datacenter name.
258    #[must_use]
259    pub fn name(&self) -> &str {
260        &self.name
261    }
262
263    /// Borrow the rack list.
264    #[must_use]
265    pub fn racks(&self) -> &[Rack] {
266        &self.racks
267    }
268
269    /// Mutable rack list.
270    pub fn racks_mut(&mut self) -> &mut [Rack] {
271        &mut self.racks
272    }
273
274    /// Find a rack by name.
275    #[must_use]
276    pub fn rack(&self, name: &str) -> Option<&Rack> {
277        self.racks.iter().find(|r| r.name() == name)
278    }
279
280    /// Mutably find a rack by name.
281    pub fn rack_mut(&mut self, name: &str) -> Option<&mut Rack> {
282        self.racks.iter_mut().find(|r| r.name() == name)
283    }
284
285    /// Find a rack and return its index.
286    #[must_use]
287    pub fn rack_idx(&self, name: &str) -> Option<usize> {
288        self.racks.iter().position(|r| r.name() == name)
289    }
290
291    /// Insert a rack if absent; return a mutable handle to the
292    /// rack regardless. Mirrors `server_get_rack`.
293    pub fn upsert_rack(&mut self, name: String) -> &mut Rack {
294        if let Some(idx) = self.rack_idx(&name) {
295            return &mut self.racks[idx];
296        }
297        let dc = self.name.clone();
298        self.racks.push(Rack::new(name, dc));
299        let last = self.racks.len() - 1;
300        &mut self.racks[last]
301    }
302
303    /// Sort racks by name (ascending). Used by
304    /// [`crate::cluster::pool::ServerPool::preselect_remote_racks`]
305    /// and mirrors the `array_sort(&dc->racks, rack_name_cmp)` call
306    /// in `preselect_remote_rack_for_replication`.
307    pub fn sort_racks(&mut self) {
308        self.racks.sort_by(|a, b| a.name().cmp(b.name()));
309    }
310
311    /// Preselected rack index for replicating writes from another
312    /// DC into this DC.
313    #[must_use]
314    pub fn preselected_rack_idx(&self) -> Option<usize> {
315        self.preselected_rack_for_replication
316    }
317
318    /// Borrow the preselected rack, if any.
319    #[must_use]
320    pub fn preselected_rack(&self) -> Option<&Rack> {
321        self.preselected_rack_for_replication
322            .and_then(|i| self.racks.get(i))
323    }
324
325    /// Set the preselected rack index (used by the pool's
326    /// [`preselect_remote_racks`](crate::cluster::pool::ServerPool::preselect_remote_racks)
327    /// pass).
328    pub fn set_preselected_rack_idx(&mut self, idx: Option<usize>) {
329        self.preselected_rack_for_replication = idx;
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn upsert_is_idempotent() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("r1".into());
        dc.upsert_rack("r1".into());
        assert_eq!(dc.racks().len(), 1);
        assert_eq!(dc.rack_idx("r1"), Some(0));
        // New racks inherit the owning datacenter's name.
        assert_eq!(dc.racks()[0].dc(), "dc1");
    }

    #[test]
    fn rack_continuum_sorts_by_token() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(9)]);
        r.add_peer_tokens(1, &[DynToken::from_u32(3)]);
        r.add_peer_tokens(2, &[DynToken::from_u32(6)]);
        r.sort_continuums();
        let idxs: Vec<u32> = r.continuums().iter().map(|c| c.peer_idx).collect();
        assert_eq!(idxs, vec![1, 2, 0]);
    }

    #[test]
    fn rack_counters_count_tokens_not_peers() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(1), DynToken::from_u32(2)]);
        r.add_peer_tokens(1, &[]);
        assert_eq!(r.ncontinuum(), 2);
        assert_eq!(r.nserver_continuum(), 2);
    }

    #[test]
    fn rack_clear_resets_counters() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(1)]);
        r.clear_continuums();
        assert_eq!(r.ncontinuum(), 0);
        assert_eq!(r.nserver_continuum(), 0);
        assert!(r.continuums().is_empty());
        assert!(!r.is_random_slicing());
        assert!(r.random_slices().is_none());
    }

    #[test]
    fn sort_racks_alphabetical() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("rb".into());
        dc.upsert_rack("ra".into());
        dc.sort_racks();
        assert_eq!(dc.racks()[0].name(), "ra");
    }

    #[test]
    fn preselected_rack_out_of_range_is_none() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("r1".into());
        assert!(dc.preselected_rack().is_none());
        dc.set_preselected_rack_idx(Some(0));
        assert_eq!(dc.preselected_rack().map(Rack::name), Some("r1"));
        dc.set_preselected_rack_idx(Some(5));
        assert_eq!(dc.preselected_rack_idx(), Some(5));
        assert!(dc.preselected_rack().is_none());
    }
}