Skip to main content

dynomite/cluster/
vnode.rs

1//! Token ring math: building and querying per-rack continuums.
2//!
3//! The reference engine's `vnode_update` walks the pool's peer list,
4//! pushes each peer's tokens onto the owning rack's `continuum`
5//! array, then sorts the array per rack. The dispatcher then calls
6//! `vnode_dispatch(continuums, ncontinuum, token)` to find the peer
7//! that owns a key. The function uses a left-leaning binary search:
8//!
9//! * if the search token falls outside the ring (less than the first
10//!   token or strictly greater than the last), wrap to the first
11//!   continuum point;
12//! * otherwise, find the smallest continuum entry whose token is
13//!   greater than or equal to the search token (mirrors
14//!   `(a, b]` semantics from the reference).
15//!
16//! Both behaviours are reproduced verbatim by [`dispatch`] below.
17//!
18//! # Examples
19//!
20//! ```
21//! use dynomite::cluster::vnode::dispatch;
22//! use dynomite::cluster::datacenter::{Continuum, Rack};
23//! use dynomite::hashkit::DynToken;
24//!
25//! let mut r = Rack::new("r".into(), "d".into());
26//! r.add_peer_tokens(0, &[DynToken::from_u32(10)]);
27//! r.add_peer_tokens(1, &[DynToken::from_u32(20)]);
28//! r.add_peer_tokens(2, &[DynToken::from_u32(30)]);
29//! r.sort_continuums();
30//! assert_eq!(dispatch(r.continuums(), &DynToken::from_u32(15)), Some(1));
31//! assert_eq!(dispatch(r.continuums(), &DynToken::from_u32(35)), Some(0));
32//! ```
33
34use std::cmp::Ordering;
35
36use crate::cluster::datacenter::Continuum;
37use crate::hashkit::DynToken;
38
39/// Run the reference engine's `vnode_dispatch` over `continuums`.
40///
41/// Returns the peer index for the continuum point that owns
42/// `token`, or `None` when the slice is empty.
43///
44/// # Examples
45///
46/// ```
47/// use dynomite::cluster::vnode::dispatch;
48/// use dynomite::cluster::datacenter::Continuum;
49/// use dynomite::hashkit::DynToken;
50/// let cs: [Continuum; 0] = [];
51/// assert_eq!(dispatch(&cs, &DynToken::from_u32(0)), None);
52/// ```
53#[must_use]
54pub fn dispatch(continuums: &[Continuum], token: &DynToken) -> Option<u32> {
55    let n = continuums.len();
56    if n == 0 {
57        return None;
58    }
59    let first = &continuums[0];
60    let last = &continuums[n - 1];
61
62    // Wraparound: token greater than the largest continuum token, or
63    // less than or equal to the first one. Reference returns
64    // `left->index` in either case.
65    if last.token.cmp(token) == Ordering::Less {
66        return Some(first.peer_idx);
67    }
68    if first.token.cmp(token) != Ordering::Less {
69        return Some(first.peer_idx);
70    }
71
72    // Binary search for the smallest continuum entry with token >=
73    // search token. Mirrors the reference engine's `vnode_dispatch`.
74    let mut left = 0usize;
75    let mut right = n - 1;
76    while left < right {
77        let middle = left + (right - left) / 2;
78        match continuums[middle].token.cmp(token) {
79            Ordering::Equal => return Some(continuums[middle].peer_idx),
80            Ordering::Less => left = middle + 1,
81            Ordering::Greater => right = middle,
82        }
83    }
84    Some(continuums[right].peer_idx)
85}
86
87/// Per-peer token-list shape consumed by the rebuild pass.
88///
89/// `peer_idx` is the index into the pool's peer array; `tokens`
90/// is the token list for that peer. Mirrors the data shape
91/// `vnode_update` walks but decoupled from the live pool so the
92/// rebuild can be unit-tested.
93#[derive(Clone, Debug)]
94pub struct PeerTokens<'a> {
95    /// Peer index in the pool's peer array.
96    pub peer_idx: u32,
97    /// Datacenter name.
98    pub dc: &'a str,
99    /// Rack name.
100    pub rack: &'a str,
101    /// Peer's token list.
102    pub tokens: &'a [DynToken],
103}
104
105/// Walk a list of [`PeerTokens`] and append continuum entries to
106/// the matching rack inside `dcs`.
107///
108/// Caller is responsible for invoking
109/// [`crate::cluster::datacenter::Rack::sort_continuums`] on each
110/// touched rack once the rebuild is complete (this matches the
111/// reference engine's `vnode_rack_verify_continuum`).
112///
113/// Returns the count of peers actually applied (a peer whose
114/// `(dc, rack)` is missing from `dcs` is skipped, which mirrors
115/// the reference engine's behaviour of populating dc / rack tables
116/// before calling `vnode_update`).
117///
118/// # Examples
119///
120/// ```
121/// use dynomite::cluster::datacenter::Datacenter;
122/// use dynomite::cluster::vnode::{rebuild_continuums, PeerTokens};
123/// use dynomite::hashkit::DynToken;
124///
125/// let mut dc = Datacenter::new("d".into());
126/// dc.upsert_rack("r".into());
127/// let toks = [DynToken::from_u32(7)];
128/// let count = rebuild_continuums(
129///     &mut [dc],
130///     &[PeerTokens { peer_idx: 0, dc: "d", rack: "r", tokens: &toks }],
131/// );
132/// assert_eq!(count, 1);
133/// ```
134pub fn rebuild_continuums(
135    dcs: &mut [crate::cluster::datacenter::Datacenter],
136    peers: &[PeerTokens<'_>],
137) -> usize {
138    // First, clear every rack's continuum so the walk produces a
139    // deterministic result on each call.
140    for dc in dcs.iter_mut() {
141        for rack in dc.racks_mut().iter_mut() {
142            rack.clear_continuums();
143        }
144    }
145    let mut applied = 0usize;
146    let mut touched: Vec<(usize, usize)> = Vec::new();
147    for peer in peers {
148        let Some(dc_idx) = dcs.iter().position(|d| d.name() == peer.dc) else {
149            continue;
150        };
151        let Some(rack_idx) = dcs[dc_idx].rack_idx(peer.rack) else {
152            continue;
153        };
154        let dc = &mut dcs[dc_idx];
155        let rack = &mut dc.racks_mut()[rack_idx];
156        rack.add_peer_tokens(peer.peer_idx, peer.tokens);
157        applied += 1;
158        if !touched.contains(&(dc_idx, rack_idx)) {
159            touched.push((dc_idx, rack_idx));
160        }
161    }
162    for (di, ri) in touched {
163        dcs[di].racks_mut()[ri].sort_continuums();
164    }
165    applied
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::datacenter::Datacenter;

    /// Build a continuum list from `(peer_idx, token)` pairs, keeping
    /// the order in which the pairs are given.
    fn ring(pairs: &[(u32, u32)]) -> Vec<Continuum> {
        let mut points = Vec::with_capacity(pairs.len());
        for &(peer, tok) in pairs {
            points.push(Continuum::new(DynToken::from_u32(tok), peer));
        }
        points
    }

    /// Dispatch a raw `u32` token against `points`.
    fn owner(points: &[Continuum], tok: u32) -> Option<u32> {
        dispatch(points, &DynToken::from_u32(tok))
    }

    /// A cluster made of one datacenter `"d"` holding one rack `"r"`.
    fn one_rack_cluster() -> Vec<Datacenter> {
        let mut dc = Datacenter::new("d".into());
        dc.upsert_rack("r".into());
        vec![dc]
    }

    #[test]
    fn empty_ring_returns_none() {
        let cs = ring(&[]);
        assert_eq!(owner(&cs, 0), None);
    }

    #[test]
    fn single_token_always_resolves() {
        let cs = ring(&[(7, 100)]);
        // Below, at, and above the only point on the ring.
        for tok in [0, 100, 101] {
            assert_eq!(owner(&cs, tok), Some(7));
        }
    }

    #[test]
    fn dispatch_wraps_on_overflow() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        // Past the last point and before the first point both wrap to
        // the first continuum entry.
        for tok in [35, 0] {
            assert_eq!(owner(&cs, tok), Some(0));
        }
    }

    #[test]
    fn dispatch_finds_upper_bound() {
        let cs = ring(&[(0, 10), (1, 20), (2, 30)]);
        // (search token, expected peer)
        for (tok, want) in [(11, 1), (20, 1), (21, 2), (30, 2)] {
            assert_eq!(owner(&cs, tok), Some(want));
        }
    }

    #[test]
    fn rebuild_skips_unknown_dc() {
        let mut dcs = one_rack_cluster();
        let toks = [DynToken::from_u32(1)];
        let peers = [
            PeerTokens {
                peer_idx: 0,
                dc: "d",
                rack: "r",
                tokens: &toks,
            },
            PeerTokens {
                peer_idx: 1,
                dc: "ghost",
                rack: "r",
                tokens: &toks,
            },
        ];
        assert_eq!(rebuild_continuums(&mut dcs, &peers), 1);
    }

    #[test]
    fn rebuild_clears_before_repopulating() {
        let mut dcs = one_rack_cluster();
        let toks = [DynToken::from_u32(1)];
        let peers = [PeerTokens {
            peer_idx: 0,
            dc: "d",
            rack: "r",
            tokens: &toks,
        }];
        // Two identical rebuilds must not double the rack's entries.
        for _ in 0..2 {
            rebuild_continuums(&mut dcs, &peers);
        }
        assert_eq!(dcs[0].racks()[0].ncontinuum(), 1);
    }
}
247}