dynomite/cluster/datacenter.rs
1//! Cluster topology: datacenters, racks, and the per-rack token
2//! continuum.
3//!
4//! A [`Datacenter`] owns a list of [`Rack`]s; a `Rack` owns a vector
5//! of [`Continuum`] points that map a [`DynToken`] to the index of
6//! the [`crate::cluster::peer::Peer`] that owns the token. The shape
7//! mirrors the reference engine's `struct datacenter` /
8//! `struct rack` / `struct continuum` exactly.
9//!
10//! Token ring lookups use the same upper-bound search as the
11//! reference engine's `vnode_dispatch` (the search lives in
12//! [`crate::cluster::vnode`]). The data shape stays here so the
13//! lookup can be tested against curated continua without spinning
14//! up a full pool.
15//!
16//! # Examples
17//!
18//! ```
19//! use dynomite::cluster::datacenter::{Datacenter, Rack};
20//! let mut dc = Datacenter::new("dc1".into());
21//! dc.upsert_rack("rack1".into());
22//! assert_eq!(dc.racks().len(), 1);
23//! ```
24
25use crate::hashkit::{random_slicing::RandomSlices, DynToken};
26
27/// Per-rack ring storage. Either the historical token continuum
28/// (a sorted [`Vec<Continuum>`]) or a [`RandomSlices`] table.
29/// The dispatcher consults whichever variant is present without
30/// caring which one it is; the engine produces only one shape
31/// per rack at a time.
32#[derive(Clone, Debug, Default)]
33pub enum RackRing {
34 /// Historical per-peer token continuum.
35 #[default]
36 Continuum,
37 /// Random-slicing partition table.
38 RandomSlicing(RandomSlices),
39}
40
41/// One ring point: a `(token, peer_idx)` mapping.
42#[derive(Clone, Debug)]
43pub struct Continuum {
44 /// Token at this ring position.
45 pub token: DynToken,
46 /// Index into the pool's peer array.
47 pub peer_idx: u32,
48}
49
50impl Continuum {
51 /// Construct a continuum point.
52 ///
53 /// # Examples
54 ///
55 /// ```
56 /// use dynomite::cluster::datacenter::Continuum;
57 /// use dynomite::hashkit::DynToken;
58 /// let c = Continuum::new(DynToken::from_u32(7), 3);
59 /// assert_eq!(c.peer_idx, 3);
60 /// ```
61 #[must_use]
62 pub fn new(token: DynToken, peer_idx: u32) -> Self {
63 Self { token, peer_idx }
64 }
65}
66
67/// One rack within a datacenter.
68///
69/// `continuums` is sorted by token in ascending order to support
70/// `vnode_dispatch`'s binary search; callers append continuum
71/// points and call [`Rack::sort_continuums`] once after a batch of
72/// updates.
73#[derive(Clone, Debug)]
74pub struct Rack {
75 name: String,
76 dc: String,
77 nserver_continuum: u32,
78 ncontinuum: u32,
79 continuums: Vec<Continuum>,
80 /// Optional [`RandomSlices`] table; populated when the
81 /// pool's distribution is
82 /// [`crate::conf::Distribution::RandomSlicing`].
83 /// [`Self::continuums`] stays in sync with the peer set so
84 /// the shadow distribution path can binary-search the same
85 /// rack without a second build.
86 ring: RackRing,
87}
88
89impl Rack {
90 /// Build an empty rack.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use dynomite::cluster::datacenter::Rack;
96 /// let r = Rack::new("rack1".into(), "dc1".into());
97 /// assert_eq!(r.name(), "rack1");
98 /// assert_eq!(r.dc(), "dc1");
99 /// assert!(r.continuums().is_empty());
100 /// ```
101 #[must_use]
102 pub fn new(name: String, dc: String) -> Self {
103 Self {
104 name,
105 dc,
106 nserver_continuum: 0,
107 ncontinuum: 0,
108 continuums: Vec::new(),
109 ring: RackRing::Continuum,
110 }
111 }
112
113 /// Rack name.
114 #[must_use]
115 pub fn name(&self) -> &str {
116 &self.name
117 }
118
119 /// Owning datacenter name.
120 #[must_use]
121 pub fn dc(&self) -> &str {
122 &self.dc
123 }
124
125 /// Borrow the continuum points (sorted by token).
126 #[must_use]
127 pub fn continuums(&self) -> &[Continuum] {
128 &self.continuums
129 }
130
131 /// Number of distinct servers ever added (mirrors the C
132 /// reference's `nserver_continuum`).
133 #[must_use]
134 pub fn nserver_continuum(&self) -> u32 {
135 self.nserver_continuum
136 }
137
138 /// Number of continuum points (mirrors `ncontinuum`).
139 #[must_use]
140 pub fn ncontinuum(&self) -> u32 {
141 self.ncontinuum
142 }
143
144 /// Append continuum points produced from one peer's tokens.
145 ///
146 /// # Examples
147 ///
148 /// ```
149 /// use dynomite::cluster::datacenter::{Continuum, Rack};
150 /// use dynomite::hashkit::DynToken;
151 /// let mut r = Rack::new("rack1".into(), "dc1".into());
152 /// r.add_peer_tokens(0, &[DynToken::from_u32(2), DynToken::from_u32(5)]);
153 /// assert_eq!(r.ncontinuum(), 2);
154 /// assert_eq!(r.nserver_continuum(), 2);
155 /// ```
156 pub fn add_peer_tokens(&mut self, peer_idx: u32, tokens: &[DynToken]) {
157 for tok in tokens {
158 self.continuums.push(Continuum::new(tok.clone(), peer_idx));
159 self.ncontinuum = self.ncontinuum.saturating_add(1);
160 self.nserver_continuum = self.nserver_continuum.saturating_add(1);
161 }
162 }
163
164 /// Sort the continuum by token (ascending). Callers must invoke
165 /// this once after a batch of [`Rack::add_peer_tokens`] calls so
166 /// that [`crate::cluster::vnode::dispatch`] can binary-search
167 /// the ring.
168 ///
169 /// # Examples
170 ///
171 /// ```
172 /// use dynomite::cluster::datacenter::Rack;
173 /// use dynomite::hashkit::DynToken;
174 /// let mut r = Rack::new("r".into(), "d".into());
175 /// r.add_peer_tokens(0, &[DynToken::from_u32(5)]);
176 /// r.add_peer_tokens(1, &[DynToken::from_u32(2)]);
177 /// r.sort_continuums();
178 /// assert_eq!(r.continuums()[0].peer_idx, 1);
179 /// ```
180 pub fn sort_continuums(&mut self) {
181 self.continuums.sort_by(|a, b| a.token.cmp(&b.token));
182 }
183
184 /// Reset all continuum state for a fresh rebuild.
185 pub fn clear_continuums(&mut self) {
186 self.continuums.clear();
187 self.ncontinuum = 0;
188 self.nserver_continuum = 0;
189 self.ring = RackRing::Continuum;
190 }
191
192 /// Borrow the rack's ring representation.
193 #[must_use]
194 pub fn ring(&self) -> &RackRing {
195 &self.ring
196 }
197
198 /// Install a [`RandomSlices`] table on this rack. The
199 /// continuum stays populated so the shadow-distribution
200 /// path (and any operator-side dump) can still walk the
201 /// vnode view.
202 pub fn set_random_slices(&mut self, slices: RandomSlices) {
203 self.ring = RackRing::RandomSlicing(slices);
204 }
205
206 /// True when the rack's live distribution is random
207 /// slicing.
208 #[must_use]
209 pub fn is_random_slicing(&self) -> bool {
210 matches!(self.ring, RackRing::RandomSlicing(_))
211 }
212
213 /// Borrow the rack's [`RandomSlices`] table when one is
214 /// installed.
215 #[must_use]
216 pub fn random_slices(&self) -> Option<&RandomSlices> {
217 match &self.ring {
218 RackRing::RandomSlicing(s) => Some(s),
219 RackRing::Continuum => None,
220 }
221 }
222}
223
224/// One datacenter.
225///
226/// Mirrors `struct datacenter`. The
227/// `preselected_rack_for_replication` field is computed by
228/// [`crate::cluster::pool::ServerPool::preselect_remote_racks`]
229/// and reproduces the reference engine's strategy of choosing one
230/// rack per remote DC for cross-DC writes.
231#[derive(Clone, Debug)]
232pub struct Datacenter {
233 name: String,
234 racks: Vec<Rack>,
235 preselected_rack_for_replication: Option<usize>,
236}
237
238impl Datacenter {
239 /// Build an empty datacenter.
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// use dynomite::cluster::datacenter::Datacenter;
245 /// let dc = Datacenter::new("dc1".into());
246 /// assert_eq!(dc.name(), "dc1");
247 /// ```
248 #[must_use]
249 pub fn new(name: String) -> Self {
250 Self {
251 name,
252 racks: Vec::new(),
253 preselected_rack_for_replication: None,
254 }
255 }
256
257 /// Datacenter name.
258 #[must_use]
259 pub fn name(&self) -> &str {
260 &self.name
261 }
262
263 /// Borrow the rack list.
264 #[must_use]
265 pub fn racks(&self) -> &[Rack] {
266 &self.racks
267 }
268
269 /// Mutable rack list.
270 pub fn racks_mut(&mut self) -> &mut [Rack] {
271 &mut self.racks
272 }
273
274 /// Find a rack by name.
275 #[must_use]
276 pub fn rack(&self, name: &str) -> Option<&Rack> {
277 self.racks.iter().find(|r| r.name() == name)
278 }
279
280 /// Mutably find a rack by name.
281 pub fn rack_mut(&mut self, name: &str) -> Option<&mut Rack> {
282 self.racks.iter_mut().find(|r| r.name() == name)
283 }
284
285 /// Find a rack and return its index.
286 #[must_use]
287 pub fn rack_idx(&self, name: &str) -> Option<usize> {
288 self.racks.iter().position(|r| r.name() == name)
289 }
290
291 /// Insert a rack if absent; return a mutable handle to the
292 /// rack regardless. Mirrors `server_get_rack`.
293 pub fn upsert_rack(&mut self, name: String) -> &mut Rack {
294 if let Some(idx) = self.rack_idx(&name) {
295 return &mut self.racks[idx];
296 }
297 let dc = self.name.clone();
298 self.racks.push(Rack::new(name, dc));
299 let last = self.racks.len() - 1;
300 &mut self.racks[last]
301 }
302
303 /// Sort racks by name (ascending). Used by
304 /// [`crate::cluster::pool::ServerPool::preselect_remote_racks`]
305 /// and mirrors the `array_sort(&dc->racks, rack_name_cmp)` call
306 /// in `preselect_remote_rack_for_replication`.
307 pub fn sort_racks(&mut self) {
308 self.racks.sort_by(|a, b| a.name().cmp(b.name()));
309 }
310
311 /// Preselected rack index for replicating writes from another
312 /// DC into this DC.
313 #[must_use]
314 pub fn preselected_rack_idx(&self) -> Option<usize> {
315 self.preselected_rack_for_replication
316 }
317
318 /// Borrow the preselected rack, if any.
319 #[must_use]
320 pub fn preselected_rack(&self) -> Option<&Rack> {
321 self.preselected_rack_for_replication
322 .and_then(|i| self.racks.get(i))
323 }
324
325 /// Set the preselected rack index (used by the pool's
326 /// [`preselect_remote_racks`](crate::cluster::pool::ServerPool::preselect_remote_racks)
327 /// pass).
328 pub fn set_preselected_rack_idx(&mut self, idx: Option<usize>) {
329 self.preselected_rack_for_replication = idx;
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn upsert_is_idempotent() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("r1".into());
        dc.upsert_rack("r1".into());
        assert_eq!(dc.racks().len(), 1);
    }

    #[test]
    fn upsert_inherits_dc_name_and_is_findable() {
        let mut dc = Datacenter::new("dc1".into());
        assert_eq!(dc.upsert_rack("r1".into()).dc(), "dc1");
        assert_eq!(dc.rack_idx("r1"), Some(0));
        assert!(dc.rack("r1").is_some());
        assert!(dc.rack("missing").is_none());
    }

    #[test]
    fn rack_continuum_sorts_by_token() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(9)]);
        r.add_peer_tokens(1, &[DynToken::from_u32(3)]);
        r.add_peer_tokens(2, &[DynToken::from_u32(6)]);
        r.sort_continuums();
        let idxs: Vec<u32> = r.continuums().iter().map(|c| c.peer_idx).collect();
        assert_eq!(idxs, vec![1, 2, 0]);
    }

    #[test]
    fn add_peer_tokens_counts_tokens_not_peers() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(1), DynToken::from_u32(2)]);
        r.add_peer_tokens(1, &[DynToken::from_u32(3)]);
        r.add_peer_tokens(2, &[]);
        assert_eq!(r.continuums().len(), 3);
        assert_eq!(r.ncontinuum(), 3);
        assert_eq!(r.nserver_continuum(), 3);
    }

    #[test]
    fn rack_clear_resets_counters() {
        let mut r = Rack::new("r".into(), "d".into());
        r.add_peer_tokens(0, &[DynToken::from_u32(1)]);
        r.clear_continuums();
        assert_eq!(r.ncontinuum(), 0);
        assert_eq!(r.nserver_continuum(), 0);
        assert!(r.continuums().is_empty());
        assert!(!r.is_random_slicing());
        assert!(r.random_slices().is_none());
    }

    #[test]
    fn sort_racks_alphabetical() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("rb".into());
        dc.upsert_rack("ra".into());
        dc.sort_racks();
        assert_eq!(dc.racks()[0].name(), "ra");
    }

    #[test]
    fn preselected_rack_resolves_index() {
        let mut dc = Datacenter::new("dc1".into());
        dc.upsert_rack("ra".into());
        dc.upsert_rack("rb".into());
        assert!(dc.preselected_rack().is_none());
        dc.set_preselected_rack_idx(Some(1));
        assert_eq!(dc.preselected_rack_idx(), Some(1));
        assert_eq!(dc.preselected_rack().map(|r| r.name()), Some("rb"));
        dc.set_preselected_rack_idx(Some(7));
        assert!(dc.preselected_rack().is_none());
    }
}
373}