dynomite/cluster/apl.rs
1//! Active preference list (APL) annotations for the vnode walker.
2//!
3//! Riak's `riak_core_apl:get_apl_ann/3` returns a list of
4//! `{Index, Node, Type}` triples where `Type` is `primary` (the
5//! canonical owner of the partition) or `fallback` (a node that
6//! has stepped in for a downed primary). The annotation is what
7//! lets a get / put coordinator distinguish "I have R primary
8//! responses" (the happy path) from "I have R responses but two
9//! of them came from fallbacks" (which should trigger more
10//! aggressive read-repair / handoff).
11//!
12//! This module ports that idea on top of the existing token ring.
13//! It walks a [`ClusterState`] forward from the key token,
14//! collects the canonical N successors (the *preflist*), then
15//! annotates each slot:
16//!
17//! * if the canonical owner is alive, the slot is a
18//! [`NodeRole::Primary`];
19//! * otherwise the next alive distinct peer further in the walk
20//! takes the slot as a [`NodeRole::Fallback`].
21//!
22//! When the cluster has fewer alive distinct peers than `n`, the
23//! walker returns a partial list rather than blocking; the
24//! coordinator decides what to do with an under-provisioned APL.
25//!
26//! The module is deliberately data-only: it operates on a
27//! [`ClusterState`] view that production wiring constructs from
28//! the live [`crate::cluster::pool::ServerPool`] and the phi-accrual
29//! [`crate::cluster::failure_detector`] state. Tests can build a
30//! `ClusterState` directly without spinning up a pool.
31//!
32//! # Examples
33//!
34//! ```
35//! use dynomite::cluster::apl::{get_apl_ann, ClusterState, NodeRole, RingPoint};
36//!
37//! // 4-peer ring at tokens 100/200/300/400.
38//! let ring = vec![
39//! RingPoint::new(100, 0),
40//! RingPoint::new(200, 1),
41//! RingPoint::new(300, 2),
42//! RingPoint::new(400, 3),
43//! ];
44//! let cluster = ClusterState::new(ring, [0u32, 1, 2, 3].into_iter().collect());
45//! let apl = get_apl_ann(&cluster, 50, 3);
46//! assert_eq!(apl.len(), 3);
47//! assert!(apl.iter().all(|p| p.role == NodeRole::Primary));
48//! ```
49
50use std::collections::HashSet;
51
52use crate::embed::events::PeerId;
53
/// Identifier for a vnode slot on the ring.
///
/// In this engine the ring is a flat continuum of `(token, peer)`
/// points; a vnode is identified by its index into that
/// continuum. The annotated walker reports the index of the entry
/// from which a peer was selected so the coordinator can correlate
/// fallbacks back to the primary slot they cover.
///
/// The walkers in this module derive the id from a `usize` ring
/// index and saturate at `u32::MAX` when the index does not fit.
pub type VnodeId = u32;
62
/// Whether an annotated peer is the canonical owner of its slot
/// (`Primary`) or a stand-in selected because the canonical owner
/// is currently down (`Fallback`).
///
/// The role is assigned by [`get_apl_ann`] based on the liveness
/// reported by [`ClusterState::is_alive`].
///
/// # Examples
///
/// ```
/// use dynomite::cluster::apl::NodeRole;
/// assert_ne!(NodeRole::Primary, NodeRole::Fallback);
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// Canonical ring owner of the slot.
    Primary,
    /// Stand-in chosen because the canonical owner is down.
    Fallback,
}
80
/// One annotated entry in the active preference list.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AnnotatedPeer {
    /// Peer that will receive the operation.
    pub peer_id: PeerId,
    /// Vnode (continuum index) the peer covers in this slot.
    /// For a `Primary` this is the canonical owner's continuum
    /// position. For a `Fallback` it is the position of the
    /// fallback peer's first ring entry encountered during the
    /// walk. In walk order that entry always lies beyond the
    /// canonical slots; because the walk wraps around the ring,
    /// its numeric index may still be smaller than the index of
    /// the slot it stands in for.
    pub vnode: VnodeId,
    /// Whether this is a canonical owner (`Primary`) or a
    /// stand-in (`Fallback`).
    pub role: NodeRole,
}
97
/// One token-ring point: a `(token, peer)` mapping.
///
/// The walker uses a `u64` token to keep the ring math
/// transport-agnostic; bridge code converts the engine's
/// [`crate::hashkit::DynToken`] continuum to this shape.
///
/// A peer may own several ring points; [`ClusterState::new`]
/// orders points by token and breaks ties by peer id.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RingPoint {
    /// Token at this ring position.
    pub token: u64,
    /// Peer that owns the token.
    pub peer_id: PeerId,
}
110
111impl RingPoint {
112 /// Construct a ring point.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// use dynomite::cluster::apl::RingPoint;
118 /// let p = RingPoint::new(100, 7);
119 /// assert_eq!(p.token, 100);
120 /// assert_eq!(p.peer_id, 7);
121 /// ```
122 #[must_use]
123 pub fn new(token: u64, peer_id: PeerId) -> Self {
124 Self { token, peer_id }
125 }
126}
127
/// Decoupled view of cluster topology + liveness.
///
/// Carries enough state for the APL walker to compute an
/// annotated preference list without consulting the live pool:
///
/// * a sorted-by-token continuum of `(token, peer_id)` ring
///   points (one entry per vnode, may have several entries per
///   peer);
/// * the set of peers currently alive according to the failure
///   detector.
///
/// Callers may pass the ring in any order: the constructor sorts
/// it by token (ties broken by peer id) and the sorted form is
/// what is stored and exposed through [`ClusterState::ring`].
#[derive(Clone, Debug)]
pub struct ClusterState {
    // Ring points, sorted by `(token, peer_id)` by the constructor.
    ring: Vec<RingPoint>,
    // Peers considered alive; membership is what `is_alive` reports.
    alive: HashSet<PeerId>,
}
145
146impl ClusterState {
147 /// Build a [`ClusterState`] from a ring and a liveness set.
148 ///
149 /// The ring is sorted by token (ties broken by peer id) so
150 /// callers can pass any ordering without thinking about it.
151 /// Empty rings are accepted; `get_apl_ann` will return an
152 /// empty vector for them.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use dynomite::cluster::apl::{ClusterState, RingPoint};
158 /// let cs = ClusterState::new(
159 /// vec![RingPoint::new(2, 1), RingPoint::new(1, 0)],
160 /// [0u32, 1].into_iter().collect(),
161 /// );
162 /// assert_eq!(cs.ring()[0].token, 1);
163 /// ```
164 #[must_use]
165 pub fn new(mut ring: Vec<RingPoint>, alive: HashSet<PeerId>) -> Self {
166 ring.sort_by(|a, b| a.token.cmp(&b.token).then(a.peer_id.cmp(&b.peer_id)));
167 Self { ring, alive }
168 }
169
170 /// Read-only view of the sorted ring.
171 #[must_use]
172 pub fn ring(&self) -> &[RingPoint] {
173 &self.ring
174 }
175
176 /// True when `peer_id` is in the alive set.
177 ///
178 /// # Examples
179 ///
180 /// ```
181 /// use dynomite::cluster::apl::{ClusterState, RingPoint};
182 /// let cs = ClusterState::new(
183 /// vec![RingPoint::new(1, 0)],
184 /// [0u32].into_iter().collect(),
185 /// );
186 /// assert!(cs.is_alive(0));
187 /// assert!(!cs.is_alive(99));
188 /// ```
189 #[must_use]
190 pub fn is_alive(&self, peer_id: PeerId) -> bool {
191 self.alive.contains(&peer_id)
192 }
193}
194
195/// Walk N successors of `key_token` on the ring, deduplicating
196/// by peer id.
197///
198/// Returns up to `n` `(vnode_index, peer_id)` pairs in walk order.
199/// This is the canonical preflist: every entry is the first ring
200/// occurrence of a distinct peer encountered while walking
201/// forward from `key_token`. Liveness is *not* consulted; this is
202/// the input the APL annotator works on.
203///
204/// Returns an empty vector when the ring is empty or `n == 0`.
205///
206/// # Examples
207///
208/// ```
209/// use dynomite::cluster::apl::{walk_n_successors, ClusterState, RingPoint};
210/// let cs = ClusterState::new(
211/// vec![RingPoint::new(10, 0), RingPoint::new(20, 1), RingPoint::new(30, 2)],
212/// [0u32, 1, 2].into_iter().collect(),
213/// );
214/// let pre = walk_n_successors(&cs, 15, 2);
215/// assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![1, 2]);
216/// ```
217#[must_use]
218pub fn walk_n_successors(
219 cluster: &ClusterState,
220 key_token: u64,
221 n: usize,
222) -> Vec<(VnodeId, PeerId)> {
223 let ring = cluster.ring();
224 if ring.is_empty() || n == 0 {
225 return Vec::new();
226 }
227 let start = primary_index(ring, key_token);
228 let mut out: Vec<(VnodeId, PeerId)> = Vec::with_capacity(n);
229 let len = ring.len();
230 for step in 0..len {
231 if out.len() >= n {
232 break;
233 }
234 let idx = (start + step) % len;
235 let pt = &ring[idx];
236 if out.iter().any(|(_, pid)| *pid == pt.peer_id) {
237 continue;
238 }
239 let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
240 out.push((vnode, pt.peer_id));
241 }
242 out
243}
244
245/// Compute the annotated active preference list for `key_token`.
246///
247/// Returns up to `n` [`AnnotatedPeer`] entries in slot order:
248///
249/// * For each canonical primary in the first-`n` walk (see
250/// [`walk_n_successors`]): if the peer is alive, the slot is
251/// filled by that peer with [`NodeRole::Primary`].
252/// * If the canonical primary is down, the walker continues
253/// beyond the canonical slice and picks the next alive peer
254/// that is not already in the result; the slot is filled with
255/// [`NodeRole::Fallback`].
256/// * If the walker runs out of alive distinct peers, the result
257/// is shorter than `n`; the caller decides whether that is
258/// acceptable for the requested consistency level.
259///
260/// # Properties
261///
262/// * `primaries(&apl).len() + fallbacks(&apl).len() == apl.len()`.
263/// * `primaries(&apl)` is a subset (by peer id) of the canonical
264/// `walk_n_successors(cluster, key_token, n)` preflist.
265/// * `apl.len() <= n` and `apl.len() <= alive_distinct_peers`.
266/// * Every peer id in `apl` is unique.
267///
268/// # Examples
269///
270/// ```
271/// use dynomite::cluster::apl::{get_apl_ann, ClusterState, NodeRole, RingPoint};
272/// let cs = ClusterState::new(
273/// vec![
274/// RingPoint::new(100, 0),
275/// RingPoint::new(200, 1),
276/// RingPoint::new(300, 2),
277/// RingPoint::new(400, 3),
278/// ],
279/// // Peer 1 (the canonical second) is down.
280/// [0u32, 2, 3].into_iter().collect(),
281/// );
282/// let apl = get_apl_ann(&cs, 50, 3);
283/// assert_eq!(apl.len(), 3);
284/// assert_eq!(apl[0].peer_id, 0);
285/// assert_eq!(apl[0].role, NodeRole::Primary);
286/// assert_eq!(apl[1].peer_id, 3);
287/// assert_eq!(apl[1].role, NodeRole::Fallback);
288/// assert_eq!(apl[2].peer_id, 2);
289/// assert_eq!(apl[2].role, NodeRole::Primary);
290/// ```
291#[must_use]
292pub fn get_apl_ann(cluster: &ClusterState, key_token: u64, n: usize) -> Vec<AnnotatedPeer> {
293 let ring = cluster.ring();
294 if ring.is_empty() || n == 0 {
295 return Vec::new();
296 }
297
298 // Build the full distinct-peer walk starting at key_token.
299 let start = primary_index(ring, key_token);
300 let len = ring.len();
301 let mut walk: Vec<(VnodeId, PeerId)> = Vec::new();
302 for step in 0..len {
303 let idx = (start + step) % len;
304 let pid = ring[idx].peer_id;
305 if walk.iter().any(|(_, p)| *p == pid) {
306 continue;
307 }
308 let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
309 walk.push((vnode, pid));
310 }
311
312 let canonical_len = walk.len().min(n);
313 let mut result: Vec<AnnotatedPeer> = Vec::with_capacity(canonical_len);
314 let mut next_fallback = canonical_len;
315
316 for slot in 0..canonical_len {
317 let (vnode, peer_id) = walk[slot];
318 if cluster.is_alive(peer_id) {
319 result.push(AnnotatedPeer {
320 peer_id,
321 vnode,
322 role: NodeRole::Primary,
323 });
324 continue;
325 }
326 // Canonical peer is down: pull the next alive peer from
327 // the tail of the walk. Each fallback is consumed at most
328 // once; if the tail runs dry the slot is dropped, but we
329 // keep iterating so a later canonical that *is* alive
330 // can still take its slot as Primary.
331 while next_fallback < walk.len() {
332 let (fb_vnode, fb_peer) = walk[next_fallback];
333 next_fallback += 1;
334 if !cluster.is_alive(fb_peer) {
335 continue;
336 }
337 // The walk is already deduplicated, so `fb_peer` is
338 // not in `result` as long as we have not previously
339 // promoted the same canonical id. The dedup invariant
340 // makes the explicit not-in-result check redundant,
341 // but we keep it as a defensive guard for future
342 // refactors.
343 if result.iter().any(|p| p.peer_id == fb_peer) {
344 continue;
345 }
346 result.push(AnnotatedPeer {
347 peer_id: fb_peer,
348 vnode: fb_vnode,
349 role: NodeRole::Fallback,
350 });
351 break;
352 }
353 }
354
355 result
356}
357
358/// Filter `annotated` to just the primary slots.
359///
360/// # Examples
361///
362/// ```
363/// use dynomite::cluster::apl::{primaries, AnnotatedPeer, NodeRole};
364/// let apl = vec![
365/// AnnotatedPeer { peer_id: 0, vnode: 0, role: NodeRole::Primary },
366/// AnnotatedPeer { peer_id: 1, vnode: 1, role: NodeRole::Fallback },
367/// ];
368/// assert_eq!(primaries(&apl).len(), 1);
369/// ```
370#[must_use]
371pub fn primaries(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
372 annotated
373 .iter()
374 .filter(|p| p.role == NodeRole::Primary)
375 .collect()
376}
377
378/// Filter `annotated` to just the fallback slots.
379///
380/// # Examples
381///
382/// ```
383/// use dynomite::cluster::apl::{fallbacks, AnnotatedPeer, NodeRole};
384/// let apl = vec![
385/// AnnotatedPeer { peer_id: 0, vnode: 0, role: NodeRole::Primary },
386/// AnnotatedPeer { peer_id: 1, vnode: 1, role: NodeRole::Fallback },
387/// ];
388/// assert_eq!(fallbacks(&apl).len(), 1);
389/// ```
390#[must_use]
391pub fn fallbacks(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
392 annotated
393 .iter()
394 .filter(|p| p.role == NodeRole::Fallback)
395 .collect()
396}
397
398/// Find the index into `ring` that owns `key_token`: the smallest
399/// entry whose token is greater than or equal to `key_token`,
400/// wrapping to entry 0 when the key is greater than every token.
401///
402/// Mirrors the upper-bound wraparound semantics of
403/// [`crate::cluster::vnode::dispatch`] but operates on a `u64`
404/// token slice for transport-agnostic ring math.
405fn primary_index(ring: &[RingPoint], key_token: u64) -> usize {
406 debug_assert!(!ring.is_empty(), "primary_index requires a non-empty ring");
407 match ring.binary_search_by_key(&key_token, |p| p.token) {
408 Ok(i) => i,
409 Err(i) => {
410 if i >= ring.len() {
411 0
412 } else {
413 i
414 }
415 }
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an (unsorted) ring from `(token, peer)` pairs;
    /// `ClusterState::new` takes care of ordering.
    fn ring(pairs: &[(u64, PeerId)]) -> Vec<RingPoint> {
        pairs.iter().map(|&(t, p)| RingPoint::new(t, p)).collect()
    }

    #[test]
    fn empty_ring_returns_empty_apl() {
        let cs = ClusterState::new(Vec::new(), HashSet::new());
        assert!(get_apl_ann(&cs, 100, 3).is_empty());
        assert!(walk_n_successors(&cs, 100, 3).is_empty());
    }

    #[test]
    fn n_zero_returns_empty_apl() {
        let cs = ClusterState::new(ring(&[(10, 0)]), [0u32].into_iter().collect());
        assert!(get_apl_ann(&cs, 10, 0).is_empty());
        assert!(walk_n_successors(&cs, 10, 0).is_empty());
    }

    #[test]
    fn walk_wraps_on_overflow() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2)]),
            [0u32, 1, 2].into_iter().collect(),
        );
        // Key past the last token wraps to peer 0.
        let pre = walk_n_successors(&cs, 50, 3);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1, 2]);
    }

    #[test]
    fn walk_dedups_peers_with_multiple_vnodes() {
        // Peer 0 has two ring entries; peer 1 has one. With n=2
        // we should still get two distinct peers.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 0), (30, 1)]),
            [0u32, 1].into_iter().collect(),
        );
        let pre = walk_n_successors(&cs, 0, 2);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1]);
    }

    #[test]
    fn primaries_subset_of_canonical_walk() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 1, 2, 3].into_iter().collect(),
        );
        let canonical: Vec<PeerId> = walk_n_successors(&cs, 0, 3)
            .into_iter()
            .map(|p| p.1)
            .collect();
        let apl = get_apl_ann(&cs, 0, 3);
        for entry in primaries(&apl) {
            assert!(canonical.contains(&entry.peer_id));
        }
    }

    #[test]
    fn down_primary_slot_is_filled_by_fallback() {
        // Canonical preflist for key 50 is peers 0, 1, 2; peer 1 is
        // down, so peer 3 (next in the walk) stands in for slot 1.
        let cs = ClusterState::new(
            ring(&[(100, 0), (200, 1), (300, 2), (400, 3)]),
            [0u32, 2, 3].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 50, 3);
        let got: Vec<(PeerId, VnodeId, NodeRole)> =
            apl.iter().map(|p| (p.peer_id, p.vnode, p.role)).collect();
        assert_eq!(
            got,
            vec![
                (0, 0, NodeRole::Primary),
                (3, 3, NodeRole::Fallback),
                (2, 2, NodeRole::Primary),
            ]
        );
        assert_eq!(primaries(&apl).len(), 2);
        assert_eq!(fallbacks(&apl).len(), 1);
    }

    #[test]
    fn apl_is_partial_when_fallbacks_run_out() {
        // Every peer is canonical (n == peer count), so there is no
        // fallback pool: slots of down peers are dropped while the
        // one alive canonical peer keeps its Primary slot.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2)]),
            [2u32].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 3);
        assert_eq!(
            apl,
            vec![AnnotatedPeer {
                peer_id: 2,
                vnode: 2,
                role: NodeRole::Primary,
            }]
        );
    }
}