Skip to main content

dynomite/cluster/capability/
mod.rs

1//! Cluster-wide capability negotiation.
2//!
3//! Every node advertises a set of typed capabilities at the
4//! gossip handshake. For each capability name the cluster picks
5//! the lowest-common-denominator value: the highest local value
6//! that the peer also supports, falling back to a "floor" value
7//! when there is no overlap. The mechanism mirrors the design of
8//! `riak_core_capability` and is intended to let us land
9//! wire-format changes (e.g. dnode framing v2, AAE tree format
10//! v2) behind feature flags that flip on automatically once every
11//! peer in a mixed-version cluster reports support.
12//!
13//! The public surface is small:
14//!
15//! * [`Capability`] - the trait each capability implements; it
16//!   carries the typed value, the local supported set, and the
17//!   merge rule.
18//! * [`CapabilityRegistry`] - the per-node registry that owns
19//!   capability instances, generates the local advertisement, and
20//!   resolves negotiated values.
21//! * [`CapabilityAd`] - the wire-bound advertisement (a list of
22//!   `(name, supported_values)`).
23//! * [`NegotiatedCapabilities`] - the result of a single
24//!   negotiation, keyed by capability name.
25//!
26//! # Wire encoding
27//!
28//! [`CapabilityAd`] travels on the wire inside the dnode
29//! handshake (see [`crate::proto::dnode::Handshake`]). The
30//! encoding is a simple length-prefixed binary layout that uses
31//! only the standard library; no external codec is dragged in.
32//!
33//! # Examples
34//!
35//! ```
36//! use dynomite::cluster::capability::{Capability, CapabilityRegistry};
37//!
38//! struct Framing;
39//! impl Capability for Framing {
40//!     type Value = u32;
41//!     fn name(&self) -> &'static str { "framing" }
42//!     fn supported_values(&self) -> Vec<u32> { vec![1, 2] }
43//!     fn merge(&self, peer: &[u32]) -> Option<u32> {
44//!         self.supported_values()
45//!             .into_iter()
46//!             .filter(|v| peer.contains(v))
47//!             .max()
48//!     }
49//!     fn encode_value(&self, v: &u32) -> Vec<u8> { v.to_le_bytes().to_vec() }
50//!     fn decode_value(&self, b: &[u8]) -> Option<u32> {
51//!         <[u8; 4]>::try_from(b).ok().map(u32::from_le_bytes)
52//!     }
53//! }
54//!
55//! let mut reg = CapabilityRegistry::new();
56//! reg.register(Framing);
57//! let ad = reg.local_advertise();
58//! assert_eq!(ad.entries().len(), 1);
59//! ```
60
61mod negotiator;
62mod registry;
63
64pub use self::negotiator::NegotiatedCapabilities;
65pub use self::registry::{
66    Capability, CapabilityAd, CapabilityAdEntry, CapabilityCodecError, CapabilityRegistry,
67};
68
69/// Capability name for the dnode framing version, shipped in the v0.0.1 registry.
70///
71/// The first real consumer of this name will be the dnode
72/// framing v2 work; today only v1 is implemented.
73pub const CAP_DNODE_FRAMING_VERSION: &str = "dnode_framing_version";
74
75/// Capability name for the active anti-entropy (AAE) tree
76/// format. Reserved for future format upgrades; today only v1
77/// is implemented.
78pub const CAP_AAE_TREE_FORMAT: &str = "aae_tree_format";
79
80/// Capability name for the on-the-wire CRDT object format used
81/// by the entropy reconciliation path.
82pub const CAP_CRDT_OBJECT_FORMAT: &str = "crdt_object_format";
83
84/// Capability name for whether the peer accepts a dynamic phi
85/// threshold negotiated at runtime.
86pub const CAP_GOSSIP_PHI_NEGOTIABLE: &str = "gossip_phi_threshold_negotiable";
87
88/// Stock capability advertising the supported dnode framing
89/// versions on this build. Today the local set is `[1, 2]`; v1
90/// is the implemented framing and v2 is reserved for the next
91/// stage's wire upgrade.
92pub struct DnodeFramingVersion;
93
94impl Capability for DnodeFramingVersion {
95    type Value = u32;
96    fn name(&self) -> &'static str {
97        CAP_DNODE_FRAMING_VERSION
98    }
99    fn supported_values(&self) -> Vec<u32> {
100        vec![1, 2]
101    }
102    fn merge(&self, peer: &[u32]) -> Option<u32> {
103        self.supported_values()
104            .into_iter()
105            .filter(|v| peer.contains(v))
106            .max()
107    }
108    fn encode_value(&self, v: &u32) -> Vec<u8> {
109        v.to_le_bytes().to_vec()
110    }
111    fn decode_value(&self, b: &[u8]) -> Option<u32> {
112        <[u8; 4]>::try_from(b).ok().map(u32::from_le_bytes)
113    }
114}
115
116/// Stock capability advertising the supported AAE tree formats.
117pub struct AaeTreeFormat;
118
119impl Capability for AaeTreeFormat {
120    type Value = u32;
121    fn name(&self) -> &'static str {
122        CAP_AAE_TREE_FORMAT
123    }
124    fn supported_values(&self) -> Vec<u32> {
125        vec![1]
126    }
127    fn merge(&self, peer: &[u32]) -> Option<u32> {
128        self.supported_values()
129            .into_iter()
130            .filter(|v| peer.contains(v))
131            .max()
132    }
133    fn encode_value(&self, v: &u32) -> Vec<u8> {
134        v.to_le_bytes().to_vec()
135    }
136    fn decode_value(&self, b: &[u8]) -> Option<u32> {
137        <[u8; 4]>::try_from(b).ok().map(u32::from_le_bytes)
138    }
139}
140
141/// Stock capability advertising the supported CRDT object
142/// wire formats.
143pub struct CrdtObjectFormat;
144
145impl Capability for CrdtObjectFormat {
146    type Value = u32;
147    fn name(&self) -> &'static str {
148        CAP_CRDT_OBJECT_FORMAT
149    }
150    fn supported_values(&self) -> Vec<u32> {
151        vec![1]
152    }
153    fn merge(&self, peer: &[u32]) -> Option<u32> {
154        self.supported_values()
155            .into_iter()
156            .filter(|v| peer.contains(v))
157            .max()
158    }
159    fn encode_value(&self, v: &u32) -> Vec<u8> {
160        v.to_le_bytes().to_vec()
161    }
162    fn decode_value(&self, b: &[u8]) -> Option<u32> {
163        <[u8; 4]>::try_from(b).ok().map(u32::from_le_bytes)
164    }
165}
166
167/// Stock capability advertising whether this node will accept a
168/// dynamic phi threshold pushed by the cluster.
169pub struct GossipPhiNegotiable;
170
171impl Capability for GossipPhiNegotiable {
172    type Value = bool;
173    fn name(&self) -> &'static str {
174        CAP_GOSSIP_PHI_NEGOTIABLE
175    }
176    fn supported_values(&self) -> Vec<bool> {
177        // Lowest preference first: a peer that does not accept
178        // dynamic phi at all is the safe floor; `true` is the
179        // forward-looking value we prefer.
180        vec![false, true]
181    }
182    fn merge(&self, peer: &[bool]) -> Option<bool> {
183        // "Highest" common value: prefer `true` when both sides
184        // declare it; otherwise the only common entry is `false`.
185        if self.supported_values().contains(&true) && peer.contains(&true) {
186            Some(true)
187        } else if self.supported_values().contains(&false) && peer.contains(&false) {
188            Some(false)
189        } else {
190            None
191        }
192    }
193    fn encode_value(&self, v: &bool) -> Vec<u8> {
194        vec![u8::from(*v)]
195    }
196    fn decode_value(&self, b: &[u8]) -> Option<bool> {
197        match b {
198            [0] => Some(false),
199            [1] => Some(true),
200            _ => None,
201        }
202    }
203}
204
205/// Construct a registry pre-populated with the v0.0.1 stock
206/// capabilities: dnode framing version, AAE tree format, CRDT
207/// object format, and the dynamic-phi-threshold flag.
208///
209/// # Examples
210///
211/// ```
212/// use dynomite::cluster::capability::{default_registry, CAP_DNODE_FRAMING_VERSION};
213/// let reg = default_registry();
214/// let ad = reg.local_advertise();
215/// assert!(ad.entries().iter().any(|e| e.name() == CAP_DNODE_FRAMING_VERSION));
216/// ```
217#[must_use]
218pub fn default_registry() -> CapabilityRegistry {
219    let mut reg = CapabilityRegistry::new();
220    reg.register(DnodeFramingVersion);
221    reg.register(AaeTreeFormat);
222    reg.register(CrdtObjectFormat);
223    reg.register(GossipPhiNegotiable);
224    reg
225}