dynomite/cluster/vnode.rs
//! Token ring math: building and querying per-rack continuums.
//!
//! The reference engine's `vnode_update` walks the pool's peer list,
//! pushes each peer's tokens onto the owning rack's `continuum`
//! array, then sorts the array per rack. The dispatcher then calls
//! `vnode_dispatch(continuums, ncontinuum, token)` to find the peer
//! that owns a key. The function uses a left-leaning binary search
//! over the sorted array:
//!
//! * if the search token falls outside the ring (less than the first
//!   token or strictly greater than the last), wrap to the first
//!   continuum point;
//! * otherwise, find the smallest continuum entry whose token is
//!   greater than or equal to the search token (mirrors
//!   `(a, b]` semantics from the reference).
//!
//! Both behaviours are reproduced verbatim by [`dispatch`] below.
//!
//! # Examples
//!
//! ```
//! use dynomite::cluster::vnode::dispatch;
//! use dynomite::cluster::datacenter::{Continuum, Rack};
//! use dynomite::hashkit::DynToken;
//!
//! let mut r = Rack::new("r".into(), "d".into());
//! r.add_peer_tokens(0, &[DynToken::from_u32(10)]);
//! r.add_peer_tokens(1, &[DynToken::from_u32(20)]);
//! r.add_peer_tokens(2, &[DynToken::from_u32(30)]);
//! r.sort_continuums();
//! assert_eq!(dispatch(r.continuums(), &DynToken::from_u32(15)), Some(1));
//! assert_eq!(dispatch(r.continuums(), &DynToken::from_u32(35)), Some(0));
//! ```
33
34use std::cmp::Ordering;
35
36use crate::cluster::datacenter::Continuum;
37use crate::hashkit::DynToken;
38
39/// Run the reference engine's `vnode_dispatch` over `continuums`.
40///
41/// Returns the peer index for the continuum point that owns
42/// `token`, or `None` when the slice is empty.
43///
44/// # Examples
45///
46/// ```
47/// use dynomite::cluster::vnode::dispatch;
48/// use dynomite::cluster::datacenter::Continuum;
49/// use dynomite::hashkit::DynToken;
50/// let cs: [Continuum; 0] = [];
51/// assert_eq!(dispatch(&cs, &DynToken::from_u32(0)), None);
52/// ```
53#[must_use]
54pub fn dispatch(continuums: &[Continuum], token: &DynToken) -> Option<u32> {
55 let n = continuums.len();
56 if n == 0 {
57 return None;
58 }
59 let first = &continuums[0];
60 let last = &continuums[n - 1];
61
62 // Wraparound: token greater than the largest continuum token, or
63 // less than or equal to the first one. Reference returns
64 // `left->index` in either case.
65 if last.token.cmp(token) == Ordering::Less {
66 return Some(first.peer_idx);
67 }
68 if first.token.cmp(token) != Ordering::Less {
69 return Some(first.peer_idx);
70 }
71
72 // Binary search for the smallest continuum entry with token >=
73 // search token. Mirrors the reference engine's `vnode_dispatch`.
74 let mut left = 0usize;
75 let mut right = n - 1;
76 while left < right {
77 let middle = left + (right - left) / 2;
78 match continuums[middle].token.cmp(token) {
79 Ordering::Equal => return Some(continuums[middle].peer_idx),
80 Ordering::Less => left = middle + 1,
81 Ordering::Greater => right = middle,
82 }
83 }
84 Some(continuums[right].peer_idx)
85}
86
87/// Per-peer token-list shape consumed by the rebuild pass.
88///
89/// `peer_idx` is the index into the pool's peer array; `tokens`
90/// is the token list for that peer. Mirrors the data shape
91/// `vnode_update` walks but decoupled from the live pool so the
92/// rebuild can be unit-tested.
93#[derive(Clone, Debug)]
94pub struct PeerTokens<'a> {
95 /// Peer index in the pool's peer array.
96 pub peer_idx: u32,
97 /// Datacenter name.
98 pub dc: &'a str,
99 /// Rack name.
100 pub rack: &'a str,
101 /// Peer's token list.
102 pub tokens: &'a [DynToken],
103}
104
105/// Walk a list of [`PeerTokens`] and append continuum entries to
106/// the matching rack inside `dcs`.
107///
108/// Caller is responsible for invoking
109/// [`crate::cluster::datacenter::Rack::sort_continuums`] on each
110/// touched rack once the rebuild is complete (this matches the
111/// reference engine's `vnode_rack_verify_continuum`).
112///
113/// Returns the count of peers actually applied (a peer whose
114/// `(dc, rack)` is missing from `dcs` is skipped, which mirrors
115/// the reference engine's behaviour of populating dc / rack tables
116/// before calling `vnode_update`).
117///
118/// # Examples
119///
120/// ```
121/// use dynomite::cluster::datacenter::Datacenter;
122/// use dynomite::cluster::vnode::{rebuild_continuums, PeerTokens};
123/// use dynomite::hashkit::DynToken;
124///
125/// let mut dc = Datacenter::new("d".into());
126/// dc.upsert_rack("r".into());
127/// let toks = [DynToken::from_u32(7)];
128/// let count = rebuild_continuums(
129/// &mut [dc],
130/// &[PeerTokens { peer_idx: 0, dc: "d", rack: "r", tokens: &toks }],
131/// );
132/// assert_eq!(count, 1);
133/// ```
134pub fn rebuild_continuums(
135 dcs: &mut [crate::cluster::datacenter::Datacenter],
136 peers: &[PeerTokens<'_>],
137) -> usize {
138 // First, clear every rack's continuum so the walk produces a
139 // deterministic result on each call.
140 for dc in dcs.iter_mut() {
141 for rack in dc.racks_mut().iter_mut() {
142 rack.clear_continuums();
143 }
144 }
145 let mut applied = 0usize;
146 let mut touched: Vec<(usize, usize)> = Vec::new();
147 for peer in peers {
148 let Some(dc_idx) = dcs.iter().position(|d| d.name() == peer.dc) else {
149 continue;
150 };
151 let Some(rack_idx) = dcs[dc_idx].rack_idx(peer.rack) else {
152 continue;
153 };
154 let dc = &mut dcs[dc_idx];
155 let rack = &mut dc.racks_mut()[rack_idx];
156 rack.add_peer_tokens(peer.peer_idx, peer.tokens);
157 applied += 1;
158 if !touched.contains(&(dc_idx, rack_idx)) {
159 touched.push((dc_idx, rack_idx));
160 }
161 }
162 for (di, ri) in touched {
163 dcs[di].racks_mut()[ri].sort_continuums();
164 }
165 applied
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::datacenter::Datacenter;

    /// Build a continuum list from `(peer_idx, token)` pairs, in the
    /// order given (callers pass them already sorted by token).
    fn ring(pairs: &[(u32, u32)]) -> Vec<Continuum> {
        pairs
            .iter()
            .map(|&(idx, tok)| Continuum::new(DynToken::from_u32(tok), idx))
            .collect()
    }

    /// A pool with one datacenter `"d"` holding a single rack `"r"`.
    fn single_rack_pool() -> Vec<Datacenter> {
        let mut dc = Datacenter::new("d".into());
        dc.upsert_rack("r".into());
        vec![dc]
    }

    #[test]
    fn empty_ring_returns_none() {
        let cs: [Continuum; 0] = [];
        assert_eq!(dispatch(&cs, &DynToken::from_u32(0)), None);
    }

    #[test]
    fn single_token_always_resolves() {
        let cs = ring(&[(7, 100)]);
        assert_eq!(dispatch(&cs, &DynToken::from_u32(0)), Some(7));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(100)), Some(7));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(101)), Some(7));
    }

    #[test]
    fn dispatch_wraps_on_overflow() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        assert_eq!(dispatch(&cs, &DynToken::from_u32(35)), Some(0));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(0)), Some(0));
    }

    #[test]
    fn dispatch_first_token_maps_to_first_peer() {
        // A probe equal to the first token takes the wraparound guard,
        // not the bisection, and must still land on the first entry.
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        assert_eq!(dispatch(&cs, &DynToken::from_u32(10)), Some(0));
    }

    #[test]
    fn dispatch_finds_upper_bound() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        assert_eq!(dispatch(&cs, &DynToken::from_u32(11)), Some(1));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(20)), Some(1));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(21)), Some(2));
        assert_eq!(dispatch(&cs, &DynToken::from_u32(30)), Some(2));
    }

    #[test]
    fn rebuild_skips_unknown_dc() {
        let mut dcs = single_rack_pool();
        let toks = [DynToken::from_u32(1)];
        let known = PeerTokens {
            peer_idx: 0,
            dc: "d",
            rack: "r",
            tokens: &toks,
        };
        let unknown = PeerTokens {
            peer_idx: 1,
            dc: "ghost",
            rack: "r",
            tokens: &toks,
        };
        assert_eq!(rebuild_continuums(&mut dcs, &[known, unknown]), 1);
    }

    #[test]
    fn rebuild_skips_unknown_rack() {
        let mut dcs = single_rack_pool();
        let toks = [DynToken::from_u32(1)];
        let known = PeerTokens {
            peer_idx: 0,
            dc: "d",
            rack: "r",
            tokens: &toks,
        };
        let unknown = PeerTokens {
            peer_idx: 1,
            dc: "d",
            rack: "ghost",
            tokens: &toks,
        };
        assert_eq!(rebuild_continuums(&mut dcs, &[known, unknown]), 1);
        assert_eq!(dcs[0].racks()[0].ncontinuum(), 1);
    }

    #[test]
    fn rebuild_clears_before_repopulating() {
        let mut dcs = single_rack_pool();
        let toks = [DynToken::from_u32(1)];
        let p = PeerTokens {
            peer_idx: 0,
            dc: "d",
            rack: "r",
            tokens: &toks,
        };
        rebuild_continuums(&mut dcs, std::slice::from_ref(&p));
        rebuild_continuums(&mut dcs, &[p]);
        assert_eq!(dcs[0].racks()[0].ncontinuum(), 1);
    }

    #[test]
    fn rebuild_sorts_touched_racks() {
        // Peers arrive with the larger token first; the rebuild must
        // leave the rack in an order `dispatch` can search.
        let mut dcs = single_rack_pool();
        let high = [DynToken::from_u32(30)];
        let low = [DynToken::from_u32(10)];
        let peers = [
            PeerTokens {
                peer_idx: 0,
                dc: "d",
                rack: "r",
                tokens: &high,
            },
            PeerTokens {
                peer_idx: 1,
                dc: "d",
                rack: "r",
                tokens: &low,
            },
        ];
        assert_eq!(rebuild_continuums(&mut dcs, &peers), 2);
        assert_eq!(dcs[0].racks()[0].ncontinuum(), 2);
        assert_eq!(
            dispatch(dcs[0].racks()[0].continuums(), &DynToken::from_u32(5)),
            Some(1)
        );
        assert_eq!(
            dispatch(dcs[0].racks()[0].continuums(), &DynToken::from_u32(20)),
            Some(0)
        );
    }
}
247}