// reddb_server/cluster/membership.rs

//! Cluster member identity, the authorized-member catalog, and the resilient
//! three-data-member baseline (issue #988, PRD #987, ADR 0030).
//!
//! This is the first vertical slice of multi-writer cluster membership. It
//! defines *who is a cluster member* as control-plane state that is distinct
//! from *which ranges a member owns or replicates* (the per-range roles in
//! [`clustering`](../../../.red/context/clustering.md) and ADR 0045). A node
//! has exactly one stable [cluster member identity]; range ownership is a
//! separate, per-range role assigned later by the rebalancer.
//!
//! ## What lives here
//!
//! * [`ClusterId`] — the cluster's own stable identity. A candidate must
//!   present the right cluster id to join; a peer that targets a different
//!   cluster is rejected ([`super::join`]).
//! * [`MemberKind`] — whether a member holds user data ([`MemberKind::Data`])
//!   or is a vote-only witness ([`MemberKind::Witness`]). The resilient
//!   multi-writer baseline counts **data** members; witnesses are not the
//!   recommended baseline (glossary: *Voting member*).
//! * [`MemberState`] — a member's lifecycle state: [`MemberState::Active`]
//!   while serving, [`MemberState::Draining`] once marked for planned removal
//!   (a draining member receives no new range placements).
//! * [`ClusterMember`] — one authorized member: its [`NodeIdentity`], its
//!   kind, and how many user ranges it currently holds. A freshly joined data
//!   member holds **zero** ranges — joining never moves user ranges.
//! * [`MembershipCatalog`] — the authorized-member set for one cluster. This
//!   is the *only* set autodetect of health and topology is allowed to range
//!   over: an arbitrary network peer that has not joined is not a member and
//!   is not an autodetect candidate.
//!
//! The join handshake itself — authenticate against a seed, verify cluster
//! identity, reject unknown/unauthorized peers, then admit and hand back the
//! control-plane snapshot — lives in [`super::join`].
//!
//! Everything here is a pure data model with no I/O, so the whole membership
//! and join story is exercised deterministically.

use std::collections::BTreeMap;
use std::fmt;

use super::identity::NodeIdentity;

/// The resilient baseline for a multi-writer cluster, counted in **data**
/// members.
///
/// The glossary fixes this: *"A resilient multi-writer cluster starts with
/// three data members; witness members are not the recommended baseline for
/// multi-writer clustering."* Three data members give a quorum of two that
/// survives the loss of any single member without a witness.
pub const RESILIENT_DATA_MEMBER_BASELINE: usize = 3;

/// The cluster's own stable identity.
///
/// Every authorized member agrees on this value, and a join candidate must
/// present it to be admitted (see [`super::join`]). It is what makes a
/// "wrong-cluster" join detectable: a peer that authenticates fine but targets
/// a *different* cluster is rejected, not merged in.
///
/// The wrapped string is never empty and carries no leading or trailing
/// whitespace; [`ClusterId::new`] is the only constructor and enforces both.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ClusterId(String);

/// The error returned by [`ClusterId::new`] when the supplied value is empty
/// once surrounding whitespace has been trimmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterIdError;

impl fmt::Display for ClusterIdError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("cluster id is empty")
    }
}

impl std::error::Error for ClusterIdError {}

impl ClusterId {
    /// Build a cluster id from an operator-provisioned value.
    ///
    /// Surrounding whitespace is trimmed before the value is stored, so
    /// `" cluster-x "` and `"cluster-x"` produce equal ids.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterIdError`] when the value is empty or whitespace-only;
    /// a blank cluster id would let any peer "match" by presenting nothing.
    pub fn new(value: impl AsRef<str>) -> Result<Self, ClusterIdError> {
        let value = value.as_ref().trim();
        if value.is_empty() {
            return Err(ClusterIdError);
        }
        Ok(Self(value.to_owned()))
    }

    /// The cluster id as a string slice (already trimmed, never empty).
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ClusterId {
    /// Writes the id verbatim, exactly as [`ClusterId::as_str`] returns it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

/// Whether a member holds user data or is a vote-only witness.
///
/// This mirrors the election-side `MemberKind` (a witness votes but never owns
/// a range), but it is the *cluster-membership* view: it decides whether a
/// member counts toward the resilient **data-member** baseline. A witness is a
/// member, but it is not a data member, so it does not move the cluster toward
/// [`RESILIENT_DATA_MEMBER_BASELINE`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberKind {
    /// Holds user data; can be a range owner for some ranges and a range
    /// replica for others.
    Data,
    /// Control-plane only; stores no user data and is never a range owner.
    Witness,
}

impl MemberKind {
    /// Does this member kind store user data (and therefore count toward the
    /// resilient multi-writer baseline)?
    ///
    /// Returns `true` for [`MemberKind::Data`] and `false` for
    /// [`MemberKind::Witness`].
    pub fn holds_data(self) -> bool {
        // Exhaustive on purpose: adding a new kind must force an explicit
        // decision here instead of silently defaulting to "no data".
        match self {
            MemberKind::Data => true,
            MemberKind::Witness => false,
        }
    }
}

/// A member's lifecycle state in the cluster (issue #1000, PRD #987).
///
/// A member is [`Active`](Self::Active) for its whole serving life; planned
/// removal first marks it [`Draining`](Self::Draining) via
/// [`MembershipCatalog::begin_drain`]. The distinction drives two rules of the
/// cluster drain flow ([`super::drain`]): a draining member stops receiving new
/// range placements, and its ranges are scheduled off it through ordinary
/// ownership transitions before membership is finally removed. The state is
/// *cluster-membership* lifecycle, separate from per-range health
/// ([`HealthClass`](super::supervisor::HealthClass)): a draining member can be
/// perfectly healthy, and an unhealthy member is not automatically draining.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberState {
    /// Fully serving: may own and replicate ranges and receive new placements.
    Active,
    /// Marked for planned removal: holds its current ranges until they are moved
    /// off, but receives **no** new range placements. The terminal state before
    /// the member is removed from the catalog.
    Draining,
}

impl MemberState {
    /// Whether a member in this state may receive *new* range placements. Only an
    /// [`Active`](Self::Active) member may; a draining member is excluded so drain
    /// never has to chase ranges it just handed back. This is the "a draining
    /// member stops receiving new range placements" rule.
    pub fn accepts_new_placements(self) -> bool {
        // Exhaustive on purpose: a future lifecycle state must opt in to
        // placements explicitly.
        match self {
            MemberState::Active => true,
            MemberState::Draining => false,
        }
    }
}

145/// One authorized cluster member.
146///
147/// The [`NodeIdentity`] is the member's stable cluster identity — the same
148/// validated X.509 subject it authenticates and votes under. `owned_range_count`
149/// is the *per-range* role count, kept deliberately separate: a member's
150/// cluster identity does not change when ranges move on or off it, and a
151/// freshly joined data member starts at zero.
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct ClusterMember {
154    identity: NodeIdentity,
155    kind: MemberKind,
156    state: MemberState,
157    owned_range_count: usize,
158}
159
160impl ClusterMember {
161    /// A member as it exists immediately after a successful join: authorized,
162    /// [`Active`](MemberState::Active), of the granted kind, and holding **no**
163    /// user ranges. Ranges are only assigned later by rebalancing or ownership
164    /// transitions.
165    pub fn joined_empty(identity: NodeIdentity, kind: MemberKind) -> Self {
166        Self {
167            identity,
168            kind,
169            state: MemberState::Active,
170            owned_range_count: 0,
171        }
172    }
173
174    pub fn identity(&self) -> &NodeIdentity {
175        &self.identity
176    }
177
178    pub fn kind(&self) -> MemberKind {
179        self.kind
180    }
181
182    /// This member's lifecycle state ([`Active`](MemberState::Active) or
183    /// [`Draining`](MemberState::Draining)).
184    pub fn state(&self) -> MemberState {
185        self.state
186    }
187
188    /// Is this member draining (marked for planned removal)?
189    pub fn is_draining(&self) -> bool {
190        self.state == MemberState::Draining
191    }
192
193    /// Mark this member draining. Idempotent: re-marking a draining member is a
194    /// no-op. Returns whether the state changed (false if it was already
195    /// draining), so a caller can tell a fresh drain from a repeated request.
196    pub fn begin_drain(&mut self) -> bool {
197        let changed = self.state == MemberState::Active;
198        self.state = MemberState::Draining;
199        changed
200    }
201
202    /// Whether this member may receive *new* range placements: only an active
203    /// data member can. A witness never holds user data, and a draining member is
204    /// being emptied, so neither is a placement target.
205    pub fn is_placement_eligible(&self) -> bool {
206        self.kind.holds_data() && self.state.accepts_new_placements()
207    }
208
209    /// How many user ranges this member currently owns. Distinct from cluster
210    /// membership: a member with zero ranges is still a full member.
211    pub fn owned_range_count(&self) -> usize {
212        self.owned_range_count
213    }
214
215    /// Does this member currently hold any user ranges? A just-joined member
216    /// answers `false` until the rebalancer assigns ownership.
217    pub fn holds_user_ranges(&self) -> bool {
218        self.owned_range_count > 0
219    }
220
221    /// Record that the rebalancer/ownership transitions have assigned this many
222    /// user ranges to the member. This is the *only* path that gives a member
223    /// ranges — join never does.
224    pub fn assign_ranges(&mut self, count: usize) {
225        self.owned_range_count = count;
226    }
227}
228
/// How a candidate compared against the authorized-member set on join.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionOutcome {
    /// The candidate was not previously a member and was admitted now.
    Admitted,
    /// The candidate was already an authorized member; the catalog is
    /// unchanged (join is idempotent on reconnect).
    AlreadyMember,
}

239/// The authorized-member set for one cluster — the control-plane membership
240/// catalog.
241///
242/// Membership is explicit: a node appears here only after a successful join
243/// ([`super::join`]). Autodetect of health and topology ranges over
244/// [`autodetect_candidates`](Self::autodetect_candidates) — i.e. *these
245/// members only* — never over arbitrary peers that happen to be reachable on
246/// the network.
247#[derive(Debug, Clone)]
248pub struct MembershipCatalog {
249    cluster_id: ClusterId,
250    members: BTreeMap<NodeIdentity, ClusterMember>,
251}
252
253impl MembershipCatalog {
254    /// A catalog for `cluster_id` seeded with `founders`. The founding data
255    /// members are the bootstrap set that later candidates authenticate
256    /// against; each starts empty.
257    pub fn new(cluster_id: ClusterId, founders: impl IntoIterator<Item = ClusterMember>) -> Self {
258        let members = founders
259            .into_iter()
260            .map(|m| (m.identity().clone(), m))
261            .collect();
262        Self {
263            cluster_id,
264            members,
265        }
266    }
267
268    pub fn cluster_id(&self) -> &ClusterId {
269        &self.cluster_id
270    }
271
272    /// Is `identity` an authorized member of this cluster? This is the gate
273    /// every control-plane path consults — only an authorized member's health
274    /// and topology are autodetected, and only a member may vote or own ranges.
275    pub fn is_authorized(&self, identity: &NodeIdentity) -> bool {
276        self.members.contains_key(identity)
277    }
278
279    pub fn member(&self, identity: &NodeIdentity) -> Option<&ClusterMember> {
280        self.members.get(identity)
281    }
282
283    pub fn member_mut(&mut self, identity: &NodeIdentity) -> Option<&mut ClusterMember> {
284        self.members.get_mut(identity)
285    }
286
287    /// Admit `member` as authorized. Idempotent: re-admitting an existing
288    /// member leaves the catalog (and the member's range count) untouched, so
289    /// a reconnecting member never has its ranges reset to zero.
290    pub fn admit(&mut self, member: ClusterMember) -> AdmissionOutcome {
291        if self.members.contains_key(member.identity()) {
292            return AdmissionOutcome::AlreadyMember;
293        }
294        self.members.insert(member.identity().clone(), member);
295        AdmissionOutcome::Admitted
296    }
297
298    /// Mark an authorized member draining (planned-removal flow, issue #1000).
299    /// Returns `None` if `identity` is not a member, otherwise whether the state
300    /// changed (false if it was already draining). A draining member keeps its
301    /// ranges until drain moves them off, but is no longer a placement target.
302    pub fn begin_drain(&mut self, identity: &NodeIdentity) -> Option<bool> {
303        self.members
304            .get_mut(identity)
305            .map(ClusterMember::begin_drain)
306    }
307
308    /// Remove a member from the authorized set, returning the removed
309    /// [`ClusterMember`] (or `None` if it was not a member). This is the final
310    /// step of both the planned drain and the force-remove flows; callers gate it
311    /// on the range-dependency checks in [`super::drain`] — the catalog itself
312    /// does not re-check, so a force remove of a dead member can drop it even
313    /// while ranges still nominally list it.
314    pub fn remove(&mut self, identity: &NodeIdentity) -> Option<ClusterMember> {
315        self.members.remove(identity)
316    }
317
318    /// Every authorized member, in stable identity order.
319    pub fn members(&self) -> impl Iterator<Item = &ClusterMember> {
320        self.members.values()
321    }
322
323    /// The members eligible to receive *new* range placements — active data
324    /// members only, in stable identity order. Draining members and witnesses are
325    /// excluded, so a rebalancer or a drain's replica-evacuation never targets a
326    /// member that is itself on the way out.
327    pub fn placement_eligible_members(&self) -> impl Iterator<Item = &ClusterMember> {
328        self.members().filter(|m| m.is_placement_eligible())
329    }
330
331    /// The members autodetect of health/topology is allowed to range over —
332    /// exactly the authorized members. An arbitrary network peer that has not
333    /// joined is absent here, so autodetect can never silently adopt it.
334    pub fn autodetect_candidates(&self) -> impl Iterator<Item = &ClusterMember> {
335        self.members()
336    }
337
338    /// Whether autodetect may consider `identity`. True only for authorized
339    /// members — the rule that "autodetect applies only to authorized members
340    /// after join, not arbitrary network peers".
341    pub fn is_autodetect_eligible(&self, identity: &NodeIdentity) -> bool {
342        self.is_authorized(identity)
343    }
344
345    pub fn len(&self) -> usize {
346        self.members.len()
347    }
348
349    pub fn is_empty(&self) -> bool {
350        self.members.is_empty()
351    }
352
353    /// How many **data** members the cluster currently has (witnesses
354    /// excluded). This is the number the resilient baseline is measured in.
355    pub fn data_member_count(&self) -> usize {
356        self.members().filter(|m| m.kind().holds_data()).count()
357    }
358
359    /// Assess the cluster against the resilient multi-writer baseline of
360    /// [`RESILIENT_DATA_MEMBER_BASELINE`] data members.
361    pub fn assess_baseline(&self) -> BaselineAssessment {
362        BaselineAssessment::evaluate(self.data_member_count())
363    }
364}
365
366/// How the cluster's data-member count compares to the resilient baseline.
367#[derive(Debug, Clone, Copy, PartialEq, Eq)]
368pub struct BaselineAssessment {
369    /// The configured resilient baseline ([`RESILIENT_DATA_MEMBER_BASELINE`]).
370    pub recommended_data_members: usize,
371    /// The cluster's current data-member count.
372    pub data_members: usize,
373}
374
375impl BaselineAssessment {
376    fn evaluate(data_members: usize) -> Self {
377        Self {
378            recommended_data_members: RESILIENT_DATA_MEMBER_BASELINE,
379            data_members,
380        }
381    }
382
383    /// Does the cluster meet (or exceed) the resilient multi-writer baseline?
384    pub fn meets_baseline(&self) -> bool {
385        self.data_members >= self.recommended_data_members
386    }
387
388    /// How many more data members are needed to reach the baseline (zero once
389    /// met).
390    pub fn shortfall(&self) -> usize {
391        self.recommended_data_members
392            .saturating_sub(self.data_members)
393    }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a node identity from a certificate subject string.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A freshly joined, empty data member with the given subject.
    fn data_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
    }

    /// A freshly joined witness member with the given subject.
    fn witness_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Witness)
    }

    #[test]
    fn cluster_id_rejects_empty() {
        assert!(ClusterId::new("   ").is_err());
        assert_eq!(ClusterId::new(" cluster-x ").unwrap().as_str(), "cluster-x");
    }

    #[test]
    fn member_identity_is_distinct_from_range_ownership() {
        // A member's cluster identity is stable; assigning/removing ranges is a
        // separate per-range role and does not change membership.
        let mut m = data_member("CN=node-a");
        assert!(!m.holds_user_ranges());
        assert_eq!(m.owned_range_count(), 0);

        m.assign_ranges(4);
        assert!(m.holds_user_ranges());
        assert_eq!(m.identity(), &ident("CN=node-a")); // identity unchanged
    }

    #[test]
    fn data_member_count_excludes_witnesses() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let catalog = MembershipCatalog::new(
            cid,
            [
                data_member("CN=node-a"),
                data_member("CN=node-b"),
                witness_member("CN=witness"),
            ],
        );
        assert_eq!(catalog.len(), 3);
        assert_eq!(catalog.data_member_count(), 2);
    }

    #[test]
    fn three_data_members_meet_resilient_baseline() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let catalog = MembershipCatalog::new(
            cid,
            [
                data_member("CN=node-a"),
                data_member("CN=node-b"),
                data_member("CN=node-c"),
            ],
        );
        let baseline = catalog.assess_baseline();
        assert_eq!(baseline.recommended_data_members, 3);
        assert!(baseline.meets_baseline());
        assert_eq!(baseline.shortfall(), 0);
    }

    #[test]
    fn two_data_plus_witness_does_not_meet_baseline() {
        // A witness is not the recommended baseline: 2 data + 1 witness is
        // below the three-data-member baseline.
        let cid = ClusterId::new("cluster-x").unwrap();
        let catalog = MembershipCatalog::new(
            cid,
            [
                data_member("CN=node-a"),
                data_member("CN=node-b"),
                witness_member("CN=witness"),
            ],
        );
        let baseline = catalog.assess_baseline();
        assert!(!baseline.meets_baseline());
        assert_eq!(baseline.shortfall(), 1);
    }

    #[test]
    fn admit_is_idempotent_and_preserves_ranges() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let mut catalog = MembershipCatalog::new(cid, [data_member("CN=node-a")]);
        catalog
            .member_mut(&ident("CN=node-a"))
            .unwrap()
            .assign_ranges(3);

        // Re-admitting must not reset an existing member's range count.
        let outcome = catalog.admit(data_member("CN=node-a"));
        assert_eq!(outcome, AdmissionOutcome::AlreadyMember);
        assert_eq!(
            catalog
                .member(&ident("CN=node-a"))
                .unwrap()
                .owned_range_count(),
            3
        );

        let outcome = catalog.admit(data_member("CN=node-b"));
        assert_eq!(outcome, AdmissionOutcome::Admitted);
        assert_eq!(catalog.len(), 2);
    }

    #[test]
    fn autodetect_is_limited_to_authorized_members() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let catalog = MembershipCatalog::new(cid, [data_member("CN=node-a")]);

        // An authorized member is an autodetect candidate.
        assert!(catalog.is_autodetect_eligible(&ident("CN=node-a")));
        // An arbitrary reachable network peer that never joined is not.
        assert!(!catalog.is_autodetect_eligible(&ident("CN=random-peer")));
        assert_eq!(catalog.autodetect_candidates().count(), 1);
    }

    #[test]
    fn begin_drain_is_idempotent_and_stops_new_placements() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let mut catalog =
            MembershipCatalog::new(cid, [data_member("CN=node-a"), data_member("CN=node-b")]);
        assert_eq!(catalog.placement_eligible_members().count(), 2);

        // First request changes state, a repeat does not, a stranger is None.
        assert_eq!(catalog.begin_drain(&ident("CN=node-a")), Some(true));
        assert_eq!(catalog.begin_drain(&ident("CN=node-a")), Some(false));
        assert_eq!(catalog.begin_drain(&ident("CN=random-peer")), None);

        let draining = catalog.member(&ident("CN=node-a")).unwrap();
        assert!(draining.is_draining());
        assert_eq!(draining.state(), MemberState::Draining);
        assert!(!draining.is_placement_eligible());
        assert_eq!(catalog.placement_eligible_members().count(), 1);

        // Draining does not revoke membership or change the data-member count.
        assert!(catalog.is_authorized(&ident("CN=node-a")));
        assert_eq!(catalog.data_member_count(), 2);
    }

    #[test]
    fn witness_is_never_placement_eligible() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let catalog = MembershipCatalog::new(
            cid,
            [data_member("CN=node-a"), witness_member("CN=witness")],
        );

        let witness = catalog.member(&ident("CN=witness")).unwrap();
        assert_eq!(witness.state(), MemberState::Active);
        assert!(!witness.is_placement_eligible());
        assert_eq!(catalog.placement_eligible_members().count(), 1);
    }

    #[test]
    fn remove_drops_member_from_authorized_set() {
        let cid = ClusterId::new("cluster-x").unwrap();
        let mut catalog = MembershipCatalog::new(cid, [data_member("CN=node-a")]);

        let removed = catalog.remove(&ident("CN=node-a")).unwrap();
        assert_eq!(removed.identity(), &ident("CN=node-a"));
        assert!(!catalog.is_authorized(&ident("CN=node-a")));
        assert!(catalog.is_empty());

        // Removing a non-member is a no-op that reports `None`.
        assert!(catalog.remove(&ident("CN=node-a")).is_none());
    }
}
513}