Skip to main content

dynomite/cluster/
admin_rpc.rs

1//! Cluster admin RPC surface.
2//!
3//! This module defines the trait [`ClusterAdmin`] that the
4//! `dyniak` PBC server invokes when an operator drives one of
5//! the `cluster-list`, `cluster-join`, `cluster-leave`,
6//! `cluster-plan`, or `cluster-commit` admin commands. The
7//! reference engine pulls these mutations off a dedicated
8//! gossip-state DNODE message family; the Rust port surfaces
9//! them through this trait so the same staging-then-commit
10//! semantics can be exercised from tests and from the CLI
11//! without forcing a real DNODE round-trip.
12//!
13//! The default in-process implementation is
14//! [`PoolClusterAdmin`]: it owns an `Arc<ServerPool>` and a
15//! lock-protected list of staged [`ClusterChange`] entries.
16//! `cluster_join` and `cluster_leave` push a change onto the
17//! staging list and return the [`JoinPlan`] so the caller can
18//! print it; `cluster_commit` applies every staged change in
19//! order under the pool's existing peer / auto-eject
20//! `RwLock`s and rebuilds the token continuum on the way out.
21//!
22//! # Examples
23//!
24//! ```
25//! use std::net::SocketAddr;
26//! use std::sync::Arc;
27//!
28//! use dynomite::cluster::admin_rpc::{ClusterAdmin, PoolClusterAdmin};
29//! use dynomite::cluster::peer::{Peer, PeerEndpoint};
30//! use dynomite::cluster::pool::{PoolConfig, ServerPool};
31//! use dynomite::hashkit::DynToken;
32//!
33//! let cfg = PoolConfig {
34//!     dc: "dc1".into(),
35//!     rack: "r1".into(),
36//!     ..PoolConfig::default()
37//! };
38//! let local = Peer::new(
39//!     0, PeerEndpoint::tcp("127.0.0.1".into(), 8101), "r1".into(), "dc1".into(),
40//!     vec![DynToken::from_u32(0)], true, true, false,
41//! );
42//! let pool = Arc::new(ServerPool::new(cfg, vec![local]));
43//! let admin = PoolClusterAdmin::new(pool);
44//! assert_eq!(admin.list_peers().len(), 1);
45//! let target: SocketAddr = "127.0.0.1:8102".parse().unwrap();
46//! let plan = admin.cluster_join(target).unwrap();
47//! assert!(plan.change.peer.is_some());
48//! assert_eq!(admin.cluster_plan_pending().len(), 1);
49//! admin.cluster_commit().unwrap();
50//! assert_eq!(admin.list_peers().len(), 2);
51//! ```
52
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::time::Duration;
56
57use parking_lot::Mutex;
58
59use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
60use crate::cluster::pool::ServerPool;
61use crate::hashkit::DynToken;
62use crate::net::auto_eject::AutoEject;
63
64/// Snapshot of one peer the admin layer reports back over the
65/// PBC. The shape is intentionally small and self-describing so
66/// the wire format can serialise every field with no further
67/// lookups.
68#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct PeerSnapshot {
70    /// Peer index in the pool's peer array. Stable for the
71    /// lifetime of the peer.
72    pub idx: u32,
73    /// Datacenter name.
74    pub dc: String,
75    /// Rack name.
76    pub rack: String,
77    /// Hostname or numeric IP.
78    pub host: String,
79    /// TCP port.
80    pub port: u16,
81    /// Token list rendered as the same `u32` integers the engine
82    /// uses for ring lookups.
83    pub tokens: Vec<u32>,
84    /// Lifecycle state.
85    pub state: PeerState,
86    /// True for the local peer (peer index 0 in conforming
87    /// configurations).
88    pub is_local: bool,
89}
90
/// Blueprint for a peer that should be added to the cluster.
///
/// Where a [`PeerSnapshot`] reports on a peer that is already
/// part of the pool, a `PeerSpec` carries what the admin
/// caller wants a not-yet-joined peer to look like.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerSpec {
    /// Endpoint host: a hostname or a numeric IP address.
    pub host: String,
    /// Endpoint TCP port.
    pub port: u16,
    /// Name of the datacenter the peer will be placed in.
    pub dc: String,
    /// Name of the rack the peer will be placed in.
    pub rack: String,
    /// Tokens the peer will own, as raw `u32` values.
    pub tokens: Vec<u32>,
    /// Set when the peer expects an encrypted dnode link.
    pub is_secure: bool,
}
111
/// Which way a staged [`ClusterChange`] moves membership.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterChangeKind {
    /// Bring in the peer described by [`ClusterChange::peer`].
    Add,
    /// Take out the peer named by [`ClusterChange::peer_idx`].
    Remove,
}
120
121/// One pending cluster-membership mutation.
122///
123/// A staged mutation is a description; it is not applied until
124/// [`ClusterAdmin::cluster_commit`] is called.
125#[derive(Clone, Debug, Eq, PartialEq)]
126pub struct ClusterChange {
127    /// Add or Remove.
128    pub kind: ClusterChangeKind,
129    /// Peer index for Remove; ignored for Add.
130    pub peer_idx: Option<u32>,
131    /// Peer description for Add; ignored for Remove.
132    pub peer: Option<PeerSpec>,
133}
134
135/// Plan returned by [`ClusterAdmin::cluster_join`] and
136/// [`ClusterAdmin::cluster_leave`].
137#[derive(Clone, Debug, Eq, PartialEq)]
138pub struct JoinPlan {
139    /// The change that was staged.
140    pub change: ClusterChange,
141}
142
143/// Errors produced by the cluster admin layer.
144#[derive(Debug, thiserror::Error)]
145#[non_exhaustive]
146pub enum ClusterError {
147    /// No peer in the pool has the requested index.
148    #[error("peer not found: idx={idx}")]
149    PeerNotFound {
150        /// Index that was not found.
151        idx: u32,
152    },
153    /// The local peer cannot leave the cluster through the admin
154    /// surface; the operator should stop the node instead.
155    #[error("cannot remove the local peer")]
156    CannotRemoveLocal,
157    /// The supplied join target already exists in the peer list.
158    #[error("peer with endpoint {addr} already exists")]
159    PeerAlreadyExists {
160        /// `host:port` of the duplicate target.
161        addr: String,
162    },
163    /// Generic precondition failure (an Add change with no
164    /// `peer` field, a Remove change with no `peer_idx`, ...).
165    #[error("invalid request: {0}")]
166    Invalid(String),
167}
168
169/// Cluster-membership admin surface.
170///
171/// The PBC server consults a `&dyn ClusterAdmin` for the
172/// `DynRpb*` admin RPCs. The default implementation is
173/// [`PoolClusterAdmin`]; a [`NoopClusterAdmin`] is provided for
174/// servers that do not expose admin operations.
175pub trait ClusterAdmin: Send + Sync + std::fmt::Debug {
176    /// Snapshot of every peer the gossip layer has seen.
177    fn list_peers(&self) -> Vec<PeerSnapshot>;
178    /// Stage a Add change for `target` and return the plan.
179    ///
180    /// # Errors
181    ///
182    /// Returns [`ClusterError::PeerAlreadyExists`] when an
183    /// existing peer already advertises the same host:port.
184    fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError>;
185    /// Stage a Remove change for `peer_idx` and return the plan.
186    ///
187    /// # Errors
188    ///
189    /// Returns [`ClusterError::PeerNotFound`] when no peer has
190    /// the requested index, or
191    /// [`ClusterError::CannotRemoveLocal`] when the index
192    /// points at the local node.
193    fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError>;
194    /// Snapshot of every staged-but-uncommitted change.
195    fn cluster_plan_pending(&self) -> Vec<ClusterChange>;
196    /// Apply every staged change and clear the staging list.
197    ///
198    /// # Errors
199    ///
200    /// Returns the first staging error encountered. Already-
201    /// applied changes remain applied.
202    fn cluster_commit(&self) -> Result<(), ClusterError>;
203}
204
/// Placeholder `ClusterAdmin` for callers that have not wired
/// up the real admin surface: it reports an empty cluster and
/// refuses join / leave requests.
#[derive(Default, Debug)]
pub struct NoopClusterAdmin;
210
211impl ClusterAdmin for NoopClusterAdmin {
212    fn list_peers(&self) -> Vec<PeerSnapshot> {
213        Vec::new()
214    }
215    fn cluster_join(&self, _target: SocketAddr) -> Result<JoinPlan, ClusterError> {
216        Err(ClusterError::Invalid(
217            "cluster admin RPC not configured on this node".into(),
218        ))
219    }
220    fn cluster_leave(&self, _peer_idx: u32) -> Result<JoinPlan, ClusterError> {
221        Err(ClusterError::Invalid(
222            "cluster admin RPC not configured on this node".into(),
223        ))
224    }
225    fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
226        Vec::new()
227    }
228    fn cluster_commit(&self) -> Result<(), ClusterError> {
229        Ok(())
230    }
231}
232
233/// `ClusterAdmin` backed by an [`Arc<ServerPool>`].
234///
235/// Mutations stage into an internal [`Mutex<Vec<ClusterChange>>`];
236/// `cluster_commit` applies every staged entry under the pool's
237/// existing `RwLock`s and rebuilds the token continuum once on
238/// the way out so dispatch sees the new ring.
239#[derive(Debug)]
240pub struct PoolClusterAdmin {
241    pool: Arc<ServerPool>,
242    staged: Mutex<Vec<ClusterChange>>,
243}
244
245impl PoolClusterAdmin {
246    /// Build a fresh admin handle wrapping `pool`.
247    #[must_use]
248    pub fn new(pool: Arc<ServerPool>) -> Self {
249        Self {
250            pool,
251            staged: Mutex::new(Vec::new()),
252        }
253    }
254
255    /// Borrow the pool the handle was built over. Useful when a
256    /// caller needs to consult the topology directly (e.g. the
257    /// dispatcher) without going through the admin RPC.
258    #[must_use]
259    pub fn pool(&self) -> &Arc<ServerPool> {
260        &self.pool
261    }
262}
263
264impl ClusterAdmin for PoolClusterAdmin {
265    fn list_peers(&self) -> Vec<PeerSnapshot> {
266        let peers = self.pool.peers().read();
267        peers
268            .iter()
269            .map(|p| PeerSnapshot {
270                idx: p.idx(),
271                dc: p.dc().to_string(),
272                rack: p.rack().to_string(),
273                host: p.endpoint().host().to_string(),
274                port: p.endpoint().port(),
275                tokens: p.tokens().iter().map(DynToken::get_int).collect(),
276                state: p.state(),
277                is_local: p.is_local(),
278            })
279            .collect()
280    }
281
282    fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError> {
283        let host = target.ip().to_string();
284        let port = target.port();
285        let peers = self.pool.peers().read();
286        if peers
287            .iter()
288            .any(|p| p.endpoint().host() == host && p.endpoint().port() == port)
289        {
290            return Err(ClusterError::PeerAlreadyExists {
291                addr: target.to_string(),
292            });
293        }
294        let staged = self.staged.lock();
295        if staged
296            .iter()
297            .any(|c| matches!(&c.peer, Some(s) if s.host == host && s.port == port))
298        {
299            return Err(ClusterError::PeerAlreadyExists {
300                addr: target.to_string(),
301            });
302        }
303        // Inherit dc / rack from the local peer (or pool config).
304        let (dc, rack) = peers.iter().find(|p| p.is_local()).map_or_else(
305            || {
306                (
307                    self.pool.config().dc.clone(),
308                    self.pool.config().rack.clone(),
309                )
310            },
311            |p| (p.dc().to_string(), p.rack().to_string()),
312        );
313        drop(staged);
314        drop(peers);
315        let token_val = derive_token(&host, port);
316        let spec = PeerSpec {
317            host,
318            port,
319            dc,
320            rack,
321            tokens: vec![token_val],
322            is_secure: false,
323        };
324        let change = ClusterChange {
325            kind: ClusterChangeKind::Add,
326            peer_idx: None,
327            peer: Some(spec),
328        };
329        let plan = JoinPlan {
330            change: change.clone(),
331        };
332        self.staged.lock().push(change);
333        Ok(plan)
334    }
335
336    fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError> {
337        let peers = self.pool.peers().read();
338        let target = peers
339            .iter()
340            .find(|p| p.idx() == peer_idx)
341            .ok_or(ClusterError::PeerNotFound { idx: peer_idx })?;
342        if target.is_local() {
343            return Err(ClusterError::CannotRemoveLocal);
344        }
345        drop(peers);
346        let change = ClusterChange {
347            kind: ClusterChangeKind::Remove,
348            peer_idx: Some(peer_idx),
349            peer: None,
350        };
351        let plan = JoinPlan {
352            change: change.clone(),
353        };
354        self.staged.lock().push(change);
355        Ok(plan)
356    }
357
358    fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
359        self.staged.lock().clone()
360    }
361
362    fn cluster_commit(&self) -> Result<(), ClusterError> {
363        let mut staged = self.staged.lock();
364        if staged.is_empty() {
365            return Ok(());
366        }
367        let mut peers = self.pool.peers().write();
368        let mut auto_ejects = self.pool.auto_eject().write();
369        let local_dc = self.pool.config().dc.clone();
370        for change in staged.iter() {
371            match change.kind {
372                ClusterChangeKind::Add => {
373                    let spec = change
374                        .peer
375                        .as_ref()
376                        .ok_or_else(|| ClusterError::Invalid("Add change missing peer".into()))?;
377                    if peers.iter().any(|p| {
378                        p.endpoint().host() == spec.host && p.endpoint().port() == spec.port
379                    }) {
380                        return Err(ClusterError::PeerAlreadyExists {
381                            addr: format!("{}:{}", spec.host, spec.port),
382                        });
383                    }
384                    let new_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
385                    let is_same_dc = spec.dc == local_dc;
386                    let new_peer = Peer::new(
387                        new_idx,
388                        PeerEndpoint::tcp(spec.host.clone(), spec.port),
389                        spec.rack.clone(),
390                        spec.dc.clone(),
391                        spec.tokens
392                            .iter()
393                            .copied()
394                            .map(DynToken::from_u32)
395                            .collect(),
396                        false,
397                        is_same_dc,
398                        spec.is_secure,
399                    );
400                    peers.push(new_peer);
401                    let template = AutoEject::new(
402                        self.pool.config().auto_eject_hosts,
403                        self.pool.config().server_failure_limit,
404                        Duration::from_millis(self.pool.config().server_retry_timeout_ms),
405                    );
406                    auto_ejects.push(template);
407                }
408                ClusterChangeKind::Remove => {
409                    let idx = change
410                        .peer_idx
411                        .ok_or_else(|| ClusterError::Invalid("Remove change missing idx".into()))?;
412                    let pos = peers
413                        .iter()
414                        .position(|p| p.idx() == idx)
415                        .ok_or(ClusterError::PeerNotFound { idx })?;
416                    if peers[pos].is_local() {
417                        return Err(ClusterError::CannotRemoveLocal);
418                    }
419                    peers.remove(pos);
420                    if pos < auto_ejects.len() {
421                        auto_ejects.remove(pos);
422                    }
423                }
424            }
425        }
426        staged.clear();
427        drop(peers);
428        drop(auto_ejects);
429        // The continuum has to be re-derived once after the
430        // batch so the dispatcher routes against the new ring.
431        self.pool.rebuild_ring();
432        Ok(())
433    }
434}
435
/// Deterministic token for a `(host, port)` pair, used by
/// [`PoolClusterAdmin::cluster_join`] when the caller supplies
/// no explicit token list.
///
/// The host bytes followed by the big-endian port bytes are run
/// through 64-bit FNV-1a, and the low 32 bits of the digest are
/// returned. Nothing here avoids collisions with tokens already
/// in the ring, because the v0 admin surface does not yet drive
/// a rebalance pass.
fn derive_token(host: &str, port: u16) -> u32 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = host
        .bytes()
        .chain(port.to_be_bytes())
        .fold(FNV_OFFSET_BASIS, |acc, octet| {
            (acc ^ u64::from(octet)).wrapping_mul(FNV_PRIME)
        });
    // The mask guarantees the value fits in a `u32`, so the
    // fallback is never taken.
    u32::try_from(digest & 0xffff_ffff).unwrap_or(0)
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::PeerEndpoint;
    use crate::cluster::pool::PoolConfig;

    /// Two-peer fixture in dc1 / r1, both on 127.0.0.1: the
    /// local peer (idx 0, port 8101, token 0) and one remote
    /// peer (idx 1, port 8102, token 2^31).
    fn small_pool() -> Arc<ServerPool> {
        let loopback_peer = |idx: u32, port: u16, token: u32, is_local: bool| {
            Peer::new(
                idx,
                PeerEndpoint::tcp("127.0.0.1".into(), port),
                "r1".into(),
                "dc1".into(),
                vec![DynToken::from_u32(token)],
                is_local,
                true,
                false,
            )
        };
        let members = vec![
            loopback_peer(0, 8101, 0, true),
            loopback_peer(1, 8102, 2_147_483_648, false),
        ];
        let cfg = PoolConfig {
            dc: "dc1".into(),
            rack: "r1".into(),
            ..PoolConfig::default()
        };
        Arc::new(ServerPool::new(cfg, members))
    }

    /// Parse a `host:port` literal, panicking on bad input.
    fn addr(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    #[test]
    fn list_peers_reports_every_peer() {
        let snapshots = PoolClusterAdmin::new(small_pool()).list_peers();
        assert_eq!(snapshots.len(), 2);

        let local = snapshots
            .iter()
            .find(|s| s.is_local)
            .expect("local snapshot");
        assert_eq!(local.idx, 0);
        assert_eq!(local.port, 8101);

        let remote = snapshots
            .iter()
            .find(|s| !s.is_local)
            .expect("remote snapshot");
        assert_eq!(remote.idx, 1);
        assert_eq!(remote.tokens, vec![2_147_483_648]);
    }

    #[test]
    fn join_stages_and_commit_appends_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let plan = admin.cluster_join(addr("127.0.0.1:8103")).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Add);
        assert_eq!(admin.cluster_plan_pending().len(), 1);
        // Staging alone must not touch the pool.
        assert_eq!(admin.list_peers().len(), 2);

        admin.cluster_commit().expect("commit");
        assert!(admin.cluster_plan_pending().is_empty());
        let after = admin.list_peers();
        assert_eq!(after.len(), 3);
        assert!(after.iter().any(|s| s.port == 8103));
    }

    #[test]
    fn join_rejects_duplicate_endpoint() {
        let admin = PoolClusterAdmin::new(small_pool());
        // 8102 is the remote peer already in the fixture.
        let outcome = admin.cluster_join(addr("127.0.0.1:8102"));
        let err = outcome.expect_err("duplicate");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    #[test]
    fn join_rejects_duplicate_in_staging() {
        let admin = PoolClusterAdmin::new(small_pool());
        let target = addr("127.0.0.1:8200");
        admin.cluster_join(target).expect("first");
        let err = admin.cluster_join(target).expect_err("staged dup");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    #[test]
    fn leave_stages_and_commit_removes_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let plan = admin.cluster_leave(1).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Remove);
        assert_eq!(plan.change.peer_idx, Some(1));
        // Staging alone must not touch the pool.
        assert_eq!(admin.list_peers().len(), 2);

        admin.cluster_commit().expect("commit");
        let after = admin.list_peers();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].idx, 0);
    }

    #[test]
    fn leave_rejects_unknown_idx() {
        let admin = PoolClusterAdmin::new(small_pool());
        let err = admin.cluster_leave(99).expect_err("unknown");
        assert!(matches!(err, ClusterError::PeerNotFound { idx: 99 }));
    }

    #[test]
    fn leave_rejects_local_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let err = admin.cluster_leave(0).expect_err("local");
        assert!(matches!(err, ClusterError::CannotRemoveLocal));
    }

    #[test]
    fn commit_with_empty_staging_is_ok() {
        let admin = PoolClusterAdmin::new(small_pool());
        admin.cluster_commit().expect("noop commit");
        assert_eq!(admin.list_peers().len(), 2);
    }

    #[test]
    fn commit_applies_mixed_batch_in_order() {
        let admin = PoolClusterAdmin::new(small_pool());
        admin.cluster_leave(1).expect("stage leave");
        admin
            .cluster_join(addr("10.0.0.1:8101"))
            .expect("stage join");
        assert_eq!(admin.cluster_plan_pending().len(), 2);

        admin.cluster_commit().expect("commit");
        let after = admin.list_peers();
        assert_eq!(after.len(), 2);
        // The new peer's idx is assigned at commit time, after
        // the removal has been applied; this test only checks
        // which endpoints are present afterwards.
        let added = after
            .iter()
            .find(|s| s.host == "10.0.0.1")
            .expect("added");
        assert_eq!(added.port, 8101);
        // The previous remote peer must be gone.
        let old_remote_present = after
            .iter()
            .any(|s| s.host == "127.0.0.1" && s.port == 8102);
        assert!(!old_remote_present);
    }

    #[test]
    fn noop_admin_returns_empty_and_errors_on_mutations() {
        let admin = NoopClusterAdmin;
        assert!(admin.list_peers().is_empty());
        assert!(admin.cluster_plan_pending().is_empty());
        admin.cluster_commit().expect("noop commit");

        let join = admin.cluster_join(addr("127.0.0.1:1"));
        assert!(matches!(join, Err(ClusterError::Invalid(_))));
        let leave = admin.cluster_leave(0);
        assert!(matches!(leave, Err(ClusterError::Invalid(_))));
    }

    #[test]
    fn derive_token_is_stable_per_endpoint() {
        let first = derive_token("10.0.0.1", 8101);
        let again = derive_token("10.0.0.1", 8101);
        let other_host = derive_token("10.0.0.2", 8101);
        assert_eq!(first, again);
        assert_ne!(first, other_host);
    }

    #[test]
    fn join_rebuilds_ring() {
        // Once the join is committed, the per-rack continuum
        // must reference the freshly added peer as well.
        let admin = PoolClusterAdmin::new(small_pool());
        admin.cluster_join(addr("10.0.0.5:8101")).expect("plan");
        admin.cluster_commit().expect("commit");

        let topology = admin.pool().datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").expect("dc1");
        let r1 = dc1.racks().iter().find(|r| r.name() == "r1").expect("r1");
        // The added peer inherits rack r1 from the local peer, so
        // the rack should now reference three distinct peer
        // indices: local, the original remote, and the new one.
        let mut owners: Vec<u32> = r1.continuums().iter().map(|e| e.peer_idx).collect();
        owners.sort_unstable();
        owners.dedup();
        assert_eq!(owners.len(), 3);
    }
}