Skip to main content

dynomite/cluster/
apl.rs

1//! Active preference list (APL) annotations for the vnode walker.
2//!
3//! Riak's `riak_core_apl:get_apl_ann/3` returns a list of
4//! `{Index, Node, Type}` triples where `Type` is `primary` (the
5//! canonical owner of the partition) or `fallback` (a node that
6//! has stepped in for a downed primary). The annotation is what
7//! lets a get / put coordinator distinguish "I have R primary
8//! responses" (the happy path) from "I have R responses but two
9//! of them came from fallbacks" (which should trigger more
10//! aggressive read-repair / handoff).
11//!
12//! This module ports that idea on top of the existing token ring.
13//! It walks a [`ClusterState`] forward from the key token,
14//! collects the canonical N successors (the *preflist*), then
15//! annotates each slot:
16//!
17//! * if the canonical owner is alive, the slot is a
18//!   [`NodeRole::Primary`];
19//! * otherwise the next alive distinct peer further in the walk
20//!   takes the slot as a [`NodeRole::Fallback`].
21//!
22//! When the cluster has fewer alive distinct peers than `n`, the
23//! walker returns a partial list rather than blocking; the
24//! coordinator decides what to do with an under-provisioned APL.
25//!
26//! The module is deliberately data-only: it operates on a
27//! [`ClusterState`] view that production wiring constructs from
28//! the live [`crate::cluster::pool::ServerPool`] and the phi-accrual
29//! [`crate::cluster::failure_detector`] state. Tests can build a
30//! `ClusterState` directly without spinning up a pool.
31//!
32//! # Examples
33//!
34//! ```
35//! use dynomite::cluster::apl::{get_apl_ann, ClusterState, NodeRole, RingPoint};
36//!
37//! // 4-peer ring at tokens 100/200/300/400.
38//! let ring = vec![
39//!     RingPoint::new(100, 0),
40//!     RingPoint::new(200, 1),
41//!     RingPoint::new(300, 2),
42//!     RingPoint::new(400, 3),
43//! ];
44//! let cluster = ClusterState::new(ring, [0u32, 1, 2, 3].into_iter().collect());
45//! let apl = get_apl_ann(&cluster, 50, 3);
46//! assert_eq!(apl.len(), 3);
47//! assert!(apl.iter().all(|p| p.role == NodeRole::Primary));
48//! ```
49
50use std::collections::HashSet;
51
52use crate::embed::events::PeerId;
53
/// Index of a vnode slot on the ring.
///
/// The ring in this engine is a flat continuum of `(token, peer)`
/// points, and a vnode is named by its position in that continuum.
/// The annotated walker reports the position of the entry a peer
/// was selected from, so the coordinator can correlate a fallback
/// with the primary slot it covers.
pub type VnodeId = u32;
62
/// Role a peer plays in one slot of the annotated preference list.
///
/// `Primary` marks the canonical owner of the slot; `Fallback` marks a
/// stand-in that was selected because the canonical owner is currently
/// down.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::apl::NodeRole;
/// assert_ne!(NodeRole::Primary, NodeRole::Fallback);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// The slot's canonical ring owner.
    Primary,
    /// A substitute serving the slot while its canonical owner is down.
    Fallback,
}
80
81/// One annotated entry in the active preference list.
82#[derive(Clone, Debug, PartialEq, Eq)]
83pub struct AnnotatedPeer {
84    /// Peer that will receive the operation.
85    pub peer_id: PeerId,
86    /// Vnode (continuum index) the peer covers in this slot.
87    /// For a `Primary` this is the canonical owner's continuum
88    /// position. For a `Fallback` it is the position of the
89    /// fallback peer's first ring entry encountered during the
90    /// walk (which is always at or beyond the slot the fallback
91    /// is covering).
92    pub vnode: VnodeId,
93    /// Whether this is a canonical owner (`Primary`) or a
94    /// stand-in (`Fallback`).
95    pub role: NodeRole,
96}
97
98/// One token-ring point: a `(token, peer)` mapping.
99///
100/// The walker uses a `u64` token to keep the ring math
101/// transport-agnostic; bridge code converts the engine's
102/// [`crate::hashkit::DynToken`] continuum to this shape.
103#[derive(Clone, Copy, Debug, PartialEq, Eq)]
104pub struct RingPoint {
105    /// Token at this ring position.
106    pub token: u64,
107    /// Peer that owns the token.
108    pub peer_id: PeerId,
109}
110
111impl RingPoint {
112    /// Construct a ring point.
113    ///
114    /// # Examples
115    ///
116    /// ```
117    /// use dynomite::cluster::apl::RingPoint;
118    /// let p = RingPoint::new(100, 7);
119    /// assert_eq!(p.token, 100);
120    /// assert_eq!(p.peer_id, 7);
121    /// ```
122    #[must_use]
123    pub fn new(token: u64, peer_id: PeerId) -> Self {
124        Self { token, peer_id }
125    }
126}
127
128/// Decoupled view of cluster topology + liveness.
129///
130/// Carries enough state for the APL walker to compute an
131/// annotated preference list without consulting the live pool:
132///
133/// * a sorted-by-token continuum of `(token, peer_id)` ring
134///   points (one entry per vnode, may have several entries per
135///   peer);
136/// * the set of peers currently alive according to the failure
137///   detector.
138///
139/// The ring is held as-passed; the constructor sorts it by token.
140#[derive(Clone, Debug)]
141pub struct ClusterState {
142    ring: Vec<RingPoint>,
143    alive: HashSet<PeerId>,
144}
145
146impl ClusterState {
147    /// Build a [`ClusterState`] from a ring and a liveness set.
148    ///
149    /// The ring is sorted by token (ties broken by peer id) so
150    /// callers can pass any ordering without thinking about it.
151    /// Empty rings are accepted; `get_apl_ann` will return an
152    /// empty vector for them.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use dynomite::cluster::apl::{ClusterState, RingPoint};
158    /// let cs = ClusterState::new(
159    ///     vec![RingPoint::new(2, 1), RingPoint::new(1, 0)],
160    ///     [0u32, 1].into_iter().collect(),
161    /// );
162    /// assert_eq!(cs.ring()[0].token, 1);
163    /// ```
164    #[must_use]
165    pub fn new(mut ring: Vec<RingPoint>, alive: HashSet<PeerId>) -> Self {
166        ring.sort_by(|a, b| a.token.cmp(&b.token).then(a.peer_id.cmp(&b.peer_id)));
167        Self { ring, alive }
168    }
169
170    /// Read-only view of the sorted ring.
171    #[must_use]
172    pub fn ring(&self) -> &[RingPoint] {
173        &self.ring
174    }
175
176    /// True when `peer_id` is in the alive set.
177    ///
178    /// # Examples
179    ///
180    /// ```
181    /// use dynomite::cluster::apl::{ClusterState, RingPoint};
182    /// let cs = ClusterState::new(
183    ///     vec![RingPoint::new(1, 0)],
184    ///     [0u32].into_iter().collect(),
185    /// );
186    /// assert!(cs.is_alive(0));
187    /// assert!(!cs.is_alive(99));
188    /// ```
189    #[must_use]
190    pub fn is_alive(&self, peer_id: PeerId) -> bool {
191        self.alive.contains(&peer_id)
192    }
193}
194
195/// Walk N successors of `key_token` on the ring, deduplicating
196/// by peer id.
197///
198/// Returns up to `n` `(vnode_index, peer_id)` pairs in walk order.
199/// This is the canonical preflist: every entry is the first ring
200/// occurrence of a distinct peer encountered while walking
201/// forward from `key_token`. Liveness is *not* consulted; this is
202/// the input the APL annotator works on.
203///
204/// Returns an empty vector when the ring is empty or `n == 0`.
205///
206/// # Examples
207///
208/// ```
209/// use dynomite::cluster::apl::{walk_n_successors, ClusterState, RingPoint};
210/// let cs = ClusterState::new(
211///     vec![RingPoint::new(10, 0), RingPoint::new(20, 1), RingPoint::new(30, 2)],
212///     [0u32, 1, 2].into_iter().collect(),
213/// );
214/// let pre = walk_n_successors(&cs, 15, 2);
215/// assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![1, 2]);
216/// ```
217#[must_use]
218pub fn walk_n_successors(
219    cluster: &ClusterState,
220    key_token: u64,
221    n: usize,
222) -> Vec<(VnodeId, PeerId)> {
223    let ring = cluster.ring();
224    if ring.is_empty() || n == 0 {
225        return Vec::new();
226    }
227    let start = primary_index(ring, key_token);
228    let mut out: Vec<(VnodeId, PeerId)> = Vec::with_capacity(n);
229    let len = ring.len();
230    for step in 0..len {
231        if out.len() >= n {
232            break;
233        }
234        let idx = (start + step) % len;
235        let pt = &ring[idx];
236        if out.iter().any(|(_, pid)| *pid == pt.peer_id) {
237            continue;
238        }
239        let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
240        out.push((vnode, pt.peer_id));
241    }
242    out
243}
244
245/// Compute the annotated active preference list for `key_token`.
246///
247/// Returns up to `n` [`AnnotatedPeer`] entries in slot order:
248///
249/// * For each canonical primary in the first-`n` walk (see
250///   [`walk_n_successors`]): if the peer is alive, the slot is
251///   filled by that peer with [`NodeRole::Primary`].
252/// * If the canonical primary is down, the walker continues
253///   beyond the canonical slice and picks the next alive peer
254///   that is not already in the result; the slot is filled with
255///   [`NodeRole::Fallback`].
256/// * If the walker runs out of alive distinct peers, the result
257///   is shorter than `n`; the caller decides whether that is
258///   acceptable for the requested consistency level.
259///
260/// # Properties
261///
262/// * `primaries(&apl).len() + fallbacks(&apl).len() == apl.len()`.
263/// * `primaries(&apl)` is a subset (by peer id) of the canonical
264///   `walk_n_successors(cluster, key_token, n)` preflist.
265/// * `apl.len() <= n` and `apl.len() <= alive_distinct_peers`.
266/// * Every peer id in `apl` is unique.
267///
268/// # Examples
269///
270/// ```
271/// use dynomite::cluster::apl::{get_apl_ann, ClusterState, NodeRole, RingPoint};
272/// let cs = ClusterState::new(
273///     vec![
274///         RingPoint::new(100, 0),
275///         RingPoint::new(200, 1),
276///         RingPoint::new(300, 2),
277///         RingPoint::new(400, 3),
278///     ],
279///     // Peer 1 (the canonical second) is down.
280///     [0u32, 2, 3].into_iter().collect(),
281/// );
282/// let apl = get_apl_ann(&cs, 50, 3);
283/// assert_eq!(apl.len(), 3);
284/// assert_eq!(apl[0].peer_id, 0);
285/// assert_eq!(apl[0].role, NodeRole::Primary);
286/// assert_eq!(apl[1].peer_id, 3);
287/// assert_eq!(apl[1].role, NodeRole::Fallback);
288/// assert_eq!(apl[2].peer_id, 2);
289/// assert_eq!(apl[2].role, NodeRole::Primary);
290/// ```
291#[must_use]
292pub fn get_apl_ann(cluster: &ClusterState, key_token: u64, n: usize) -> Vec<AnnotatedPeer> {
293    let ring = cluster.ring();
294    if ring.is_empty() || n == 0 {
295        return Vec::new();
296    }
297
298    // Build the full distinct-peer walk starting at key_token.
299    let start = primary_index(ring, key_token);
300    let len = ring.len();
301    let mut walk: Vec<(VnodeId, PeerId)> = Vec::new();
302    for step in 0..len {
303        let idx = (start + step) % len;
304        let pid = ring[idx].peer_id;
305        if walk.iter().any(|(_, p)| *p == pid) {
306            continue;
307        }
308        let vnode = u32::try_from(idx).unwrap_or(u32::MAX);
309        walk.push((vnode, pid));
310    }
311
312    let canonical_len = walk.len().min(n);
313    let mut result: Vec<AnnotatedPeer> = Vec::with_capacity(canonical_len);
314    let mut next_fallback = canonical_len;
315
316    for slot in 0..canonical_len {
317        let (vnode, peer_id) = walk[slot];
318        if cluster.is_alive(peer_id) {
319            result.push(AnnotatedPeer {
320                peer_id,
321                vnode,
322                role: NodeRole::Primary,
323            });
324            continue;
325        }
326        // Canonical peer is down: pull the next alive peer from
327        // the tail of the walk. Each fallback is consumed at most
328        // once; if the tail runs dry the slot is dropped, but we
329        // keep iterating so a later canonical that *is* alive
330        // can still take its slot as Primary.
331        while next_fallback < walk.len() {
332            let (fb_vnode, fb_peer) = walk[next_fallback];
333            next_fallback += 1;
334            if !cluster.is_alive(fb_peer) {
335                continue;
336            }
337            // The walk is already deduplicated, so `fb_peer` is
338            // not in `result` as long as we have not previously
339            // promoted the same canonical id. The dedup invariant
340            // makes the explicit not-in-result check redundant,
341            // but we keep it as a defensive guard for future
342            // refactors.
343            if result.iter().any(|p| p.peer_id == fb_peer) {
344                continue;
345            }
346            result.push(AnnotatedPeer {
347                peer_id: fb_peer,
348                vnode: fb_vnode,
349                role: NodeRole::Fallback,
350            });
351            break;
352        }
353    }
354
355    result
356}
357
358/// Filter `annotated` to just the primary slots.
359///
360/// # Examples
361///
362/// ```
363/// use dynomite::cluster::apl::{primaries, AnnotatedPeer, NodeRole};
364/// let apl = vec![
365///     AnnotatedPeer { peer_id: 0, vnode: 0, role: NodeRole::Primary },
366///     AnnotatedPeer { peer_id: 1, vnode: 1, role: NodeRole::Fallback },
367/// ];
368/// assert_eq!(primaries(&apl).len(), 1);
369/// ```
370#[must_use]
371pub fn primaries(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
372    annotated
373        .iter()
374        .filter(|p| p.role == NodeRole::Primary)
375        .collect()
376}
377
378/// Filter `annotated` to just the fallback slots.
379///
380/// # Examples
381///
382/// ```
383/// use dynomite::cluster::apl::{fallbacks, AnnotatedPeer, NodeRole};
384/// let apl = vec![
385///     AnnotatedPeer { peer_id: 0, vnode: 0, role: NodeRole::Primary },
386///     AnnotatedPeer { peer_id: 1, vnode: 1, role: NodeRole::Fallback },
387/// ];
388/// assert_eq!(fallbacks(&apl).len(), 1);
389/// ```
390#[must_use]
391pub fn fallbacks(annotated: &[AnnotatedPeer]) -> Vec<&AnnotatedPeer> {
392    annotated
393        .iter()
394        .filter(|p| p.role == NodeRole::Fallback)
395        .collect()
396}
397
398/// Find the index into `ring` that owns `key_token`: the smallest
399/// entry whose token is greater than or equal to `key_token`,
400/// wrapping to entry 0 when the key is greater than every token.
401///
402/// Mirrors the upper-bound wraparound semantics of
403/// [`crate::cluster::vnode::dispatch`] but operates on a `u64`
404/// token slice for transport-agnostic ring math.
405fn primary_index(ring: &[RingPoint], key_token: u64) -> usize {
406    debug_assert!(!ring.is_empty(), "primary_index requires a non-empty ring");
407    match ring.binary_search_by_key(&key_token, |p| p.token) {
408        Ok(i) => i,
409        Err(i) => {
410            if i >= ring.len() {
411                0
412            } else {
413                i
414            }
415        }
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a ring from `(token, peer)` pairs.
    fn ring(pairs: &[(u64, PeerId)]) -> Vec<RingPoint> {
        pairs.iter().map(|&(t, p)| RingPoint::new(t, p)).collect()
    }

    #[test]
    fn empty_ring_returns_empty_apl() {
        let cs = ClusterState::new(Vec::new(), HashSet::new());
        assert!(get_apl_ann(&cs, 100, 3).is_empty());
        assert!(walk_n_successors(&cs, 100, 3).is_empty());
    }

    #[test]
    fn n_zero_returns_empty_apl() {
        let cs = ClusterState::new(ring(&[(10, 0)]), [0u32].into_iter().collect());
        assert!(get_apl_ann(&cs, 10, 0).is_empty());
        assert!(walk_n_successors(&cs, 10, 0).is_empty());
    }

    #[test]
    fn walk_wraps_on_overflow() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2)]),
            [0u32, 1, 2].into_iter().collect(),
        );
        // Key past the last token wraps to peer 0.
        let pre = walk_n_successors(&cs, 50, 3);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1, 2]);
    }

    #[test]
    fn walk_dedups_peers_with_multiple_vnodes() {
        // Peer 0 has two ring entries; peer 1 has one. With n=2
        // we should still get two distinct peers.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 0), (30, 1)]),
            [0u32, 1].into_iter().collect(),
        );
        let pre = walk_n_successors(&cs, 0, 2);
        assert_eq!(pre.iter().map(|p| p.1).collect::<Vec<_>>(), vec![0, 1]);
    }

    #[test]
    fn primaries_subset_of_canonical_walk() {
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 1, 2, 3].into_iter().collect(),
        );
        let canonical: Vec<PeerId> = walk_n_successors(&cs, 0, 3)
            .into_iter()
            .map(|p| p.1)
            .collect();
        let apl = get_apl_ann(&cs, 0, 3);
        for entry in primaries(&apl) {
            assert!(canonical.contains(&entry.peer_id));
        }
    }

    #[test]
    fn down_primary_is_replaced_by_fallback_in_its_slot() {
        // Peer 1 (second canonical owner) is down; peer 3 is the only
        // peer beyond the canonical slice, so it takes slot 1.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [0u32, 2, 3].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 3);
        let got: Vec<(PeerId, VnodeId, NodeRole)> =
            apl.iter().map(|p| (p.peer_id, p.vnode, p.role)).collect();
        assert_eq!(
            got,
            vec![
                (0, 0, NodeRole::Primary),
                (3, 3, NodeRole::Fallback),
                (2, 2, NodeRole::Primary),
            ]
        );
        assert_eq!(primaries(&apl).len() + fallbacks(&apl).len(), apl.len());
    }

    #[test]
    fn under_provisioned_cluster_yields_partial_apl() {
        // Only the last canonical owner is alive and no fallback is
        // available: the two dead slots are dropped, the alive owner
        // still lands as Primary.
        let cs = ClusterState::new(
            ring(&[(10, 0), (20, 1), (30, 2), (40, 3)]),
            [2u32].into_iter().collect(),
        );
        let apl = get_apl_ann(&cs, 0, 3);
        assert_eq!(apl.len(), 1);
        assert_eq!(apl[0].peer_id, 2);
        assert_eq!(apl[0].role, NodeRole::Primary);
        assert!(fallbacks(&apl).is_empty());
    }
}
479}