use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
use crate::cluster::pool::ServerPool;
use crate::hashkit::DynToken;
use crate::net::auto_eject::AutoEject;
/// Owned, point-in-time copy of one peer's identity and state.
///
/// Produced by [`ClusterAdmin::list_peers`]; holds no reference back into the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerSnapshot {
    /// Index the peer reports for itself.
    pub idx: u32,
    /// Datacenter name of the peer.
    pub dc: String,
    /// Rack name of the peer.
    pub rack: String,
    /// Host part of the peer endpoint.
    pub host: String,
    /// Port part of the peer endpoint.
    pub port: u16,
    /// Tokens owned by the peer, as plain integers.
    pub tokens: Vec<u32>,
    /// State of the peer at the time the snapshot was taken.
    pub state: PeerState,
    /// `true` when the peer is the local node.
    pub is_local: bool,
}
/// Everything needed to construct a new peer when an `Add` change is committed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerSpec {
    /// Host part of the endpoint the new peer listens on.
    pub host: String,
    /// Port part of the endpoint the new peer listens on.
    pub port: u16,
    /// Datacenter the new peer is placed in.
    pub dc: String,
    /// Rack the new peer is placed in.
    pub rack: String,
    /// Tokens assigned to the new peer, as plain integers.
    pub tokens: Vec<u32>,
    /// Security flag forwarded unchanged to the peer constructor.
    pub is_secure: bool,
}
/// Direction of a staged membership change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterChangeKind {
    /// A peer is being added to the cluster.
    Add,
    /// A peer is being removed from the cluster.
    Remove,
}
/// One staged membership change awaiting commit.
///
/// Which optional field is populated depends on `kind`: the pool-backed admin in this file
/// fills `peer` for `Add` changes and `peer_idx` for `Remove` changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterChange {
    /// Whether this change adds or removes a peer.
    pub kind: ClusterChangeKind,
    /// Index of the peer to remove; `None` for additions.
    pub peer_idx: Option<u32>,
    /// Description of the peer to add; `None` for removals.
    pub peer: Option<PeerSpec>,
}
/// Result handed back to the caller after a join or leave request has been staged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinPlan {
    /// The change that was staged by the request.
    pub change: ClusterChange,
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClusterError {
#[error("peer not found: idx={idx}")]
PeerNotFound {
idx: u32,
},
#[error("cannot remove the local peer")]
CannotRemoveLocal,
#[error("peer with endpoint {addr} already exists")]
PeerAlreadyExists {
addr: String,
},
#[error("invalid request: {0}")]
Invalid(String),
}
/// Administrative interface for inspecting and changing cluster membership.
///
/// Membership changes are two-phase: `cluster_join` / `cluster_leave` return a [`JoinPlan`]
/// describing the requested change, `cluster_plan_pending` lists changes that have not been
/// committed yet, and `cluster_commit` applies them.
pub trait ClusterAdmin: std::fmt::Debug + Send + Sync {
    /// Returns a snapshot of every known peer.
    fn list_peers(&self) -> Vec<PeerSnapshot>;

    /// Requests that the node at `target` be added to the cluster.
    ///
    /// # Errors
    ///
    /// Returns a [`ClusterError`] when the request is rejected.
    fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError>;

    /// Requests that the peer with index `peer_idx` be removed from the cluster.
    ///
    /// # Errors
    ///
    /// Returns a [`ClusterError`] when the request is rejected.
    fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError>;

    /// Returns the changes that are staged but not yet committed.
    fn cluster_plan_pending(&self) -> Vec<ClusterChange>;

    /// Applies the staged changes.
    ///
    /// # Errors
    ///
    /// Returns a [`ClusterError`] when the staged changes cannot be applied.
    fn cluster_commit(&self) -> Result<(), ClusterError>;
}
/// Stateless [`ClusterAdmin`] placeholder: it reports no peers and rejects join/leave requests.
#[derive(Default, Debug)]
pub struct NoopClusterAdmin;
impl ClusterAdmin for NoopClusterAdmin {
    /// Always reports an empty peer list.
    fn list_peers(&self) -> Vec<PeerSnapshot> {
        vec![]
    }

    /// Always fails with [`ClusterError::Invalid`]: there is nothing to join through.
    fn cluster_join(&self, _target: SocketAddr) -> Result<JoinPlan, ClusterError> {
        let reason = String::from("cluster admin RPC not configured on this node");
        Err(ClusterError::Invalid(reason))
    }

    /// Always fails with [`ClusterError::Invalid`]: there is nothing to leave.
    fn cluster_leave(&self, _peer_idx: u32) -> Result<JoinPlan, ClusterError> {
        let reason = String::from("cluster admin RPC not configured on this node");
        Err(ClusterError::Invalid(reason))
    }

    /// Nothing can ever be staged, so the pending list is always empty.
    fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
        vec![]
    }

    /// Committing an (always empty) plan trivially succeeds.
    fn cluster_commit(&self) -> Result<(), ClusterError> {
        Ok(())
    }
}
/// [`ClusterAdmin`] that stages membership changes and applies them to a shared [`ServerPool`].
#[derive(Debug)]
pub struct PoolClusterAdmin {
    /// Pool whose peer list is inspected and modified.
    pool: Arc<ServerPool>,
    /// Changes accepted by join/leave requests that have not been committed yet,
    /// in the order they were requested.
    staged: Mutex<Vec<ClusterChange>>,
}
impl PoolClusterAdmin {
    /// Creates an admin over `pool` with nothing staged.
    #[must_use]
    pub fn new(pool: Arc<ServerPool>) -> Self {
        let staged = Mutex::new(vec![]);
        Self { pool, staged }
    }

    /// Borrows the shared handle to the pool this admin operates on.
    #[must_use]
    pub fn pool(&self) -> &Arc<ServerPool> {
        let Self { pool, .. } = self;
        pool
    }
}
impl ClusterAdmin for PoolClusterAdmin {
fn list_peers(&self) -> Vec<PeerSnapshot> {
let peers = self.pool.peers().read();
peers
.iter()
.map(|p| PeerSnapshot {
idx: p.idx(),
dc: p.dc().to_string(),
rack: p.rack().to_string(),
host: p.endpoint().host().to_string(),
port: p.endpoint().port(),
tokens: p.tokens().iter().map(DynToken::get_int).collect(),
state: p.state(),
is_local: p.is_local(),
})
.collect()
}
fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError> {
let host = target.ip().to_string();
let port = target.port();
let peers = self.pool.peers().read();
if peers
.iter()
.any(|p| p.endpoint().host() == host && p.endpoint().port() == port)
{
return Err(ClusterError::PeerAlreadyExists {
addr: target.to_string(),
});
}
let staged = self.staged.lock();
if staged
.iter()
.any(|c| matches!(&c.peer, Some(s) if s.host == host && s.port == port))
{
return Err(ClusterError::PeerAlreadyExists {
addr: target.to_string(),
});
}
let (dc, rack) = peers.iter().find(|p| p.is_local()).map_or_else(
|| {
(
self.pool.config().dc.clone(),
self.pool.config().rack.clone(),
)
},
|p| (p.dc().to_string(), p.rack().to_string()),
);
drop(staged);
drop(peers);
let token_val = derive_token(&host, port);
let spec = PeerSpec {
host,
port,
dc,
rack,
tokens: vec![token_val],
is_secure: false,
};
let change = ClusterChange {
kind: ClusterChangeKind::Add,
peer_idx: None,
peer: Some(spec),
};
let plan = JoinPlan {
change: change.clone(),
};
self.staged.lock().push(change);
Ok(plan)
}
fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError> {
let peers = self.pool.peers().read();
let target = peers
.iter()
.find(|p| p.idx() == peer_idx)
.ok_or(ClusterError::PeerNotFound { idx: peer_idx })?;
if target.is_local() {
return Err(ClusterError::CannotRemoveLocal);
}
drop(peers);
let change = ClusterChange {
kind: ClusterChangeKind::Remove,
peer_idx: Some(peer_idx),
peer: None,
};
let plan = JoinPlan {
change: change.clone(),
};
self.staged.lock().push(change);
Ok(plan)
}
fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
self.staged.lock().clone()
}
fn cluster_commit(&self) -> Result<(), ClusterError> {
let mut staged = self.staged.lock();
if staged.is_empty() {
return Ok(());
}
let mut peers = self.pool.peers().write();
let mut auto_ejects = self.pool.auto_eject().write();
let local_dc = self.pool.config().dc.clone();
for change in staged.iter() {
match change.kind {
ClusterChangeKind::Add => {
let spec = change
.peer
.as_ref()
.ok_or_else(|| ClusterError::Invalid("Add change missing peer".into()))?;
if peers.iter().any(|p| {
p.endpoint().host() == spec.host && p.endpoint().port() == spec.port
}) {
return Err(ClusterError::PeerAlreadyExists {
addr: format!("{}:{}", spec.host, spec.port),
});
}
let new_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
let is_same_dc = spec.dc == local_dc;
let new_peer = Peer::new(
new_idx,
PeerEndpoint::tcp(spec.host.clone(), spec.port),
spec.rack.clone(),
spec.dc.clone(),
spec.tokens
.iter()
.copied()
.map(DynToken::from_u32)
.collect(),
false,
is_same_dc,
spec.is_secure,
);
peers.push(new_peer);
let template = AutoEject::new(
self.pool.config().auto_eject_hosts,
self.pool.config().server_failure_limit,
Duration::from_millis(self.pool.config().server_retry_timeout_ms),
);
auto_ejects.push(template);
}
ClusterChangeKind::Remove => {
let idx = change
.peer_idx
.ok_or_else(|| ClusterError::Invalid("Remove change missing idx".into()))?;
let pos = peers
.iter()
.position(|p| p.idx() == idx)
.ok_or(ClusterError::PeerNotFound { idx })?;
if peers[pos].is_local() {
return Err(ClusterError::CannotRemoveLocal);
}
peers.remove(pos);
if pos < auto_ejects.len() {
auto_ejects.remove(pos);
}
}
}
}
staged.clear();
drop(peers);
drop(auto_ejects);
self.pool.rebuild_ring();
Ok(())
}
}
/// Derives a deterministic 32-bit token from a peer endpoint.
///
/// The bytes of `host` followed by the two big-endian bytes of `port` are hashed with
/// 64-bit FNV-1a; the token is the low 32 bits of the digest.
fn derive_token(host: &str, port: u16) -> u32 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    const LOW_32_BITS: u64 = 0xffff_ffff;

    let digest = host
        .bytes()
        .chain(port.to_be_bytes())
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // The mask guarantees the value fits; the fallback only satisfies the checked conversion.
    u32::try_from(digest & LOW_32_BITS).unwrap_or(0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::PeerEndpoint;
    use crate::cluster::pool::PoolConfig;

    /// Builds a pool in `dc1`/`r1` with two peers on 127.0.0.1: the local peer (idx 0,
    /// port 8101) and one remote peer (idx 1, port 8102).
    fn small_pool() -> Arc<ServerPool> {
        let cfg = PoolConfig {
            dc: "dc1".into(),
            rack: "r1".into(),
            ..PoolConfig::default()
        };
        let local = Peer::new(
            0,
            PeerEndpoint::tcp("127.0.0.1".into(), 8101),
            "r1".into(),
            "dc1".into(),
            vec![DynToken::from_u32(0)],
            true,
            true,
            false,
        );
        let remote = Peer::new(
            1,
            PeerEndpoint::tcp("127.0.0.1".into(), 8102),
            "r1".into(),
            "dc1".into(),
            vec![DynToken::from_u32(2_147_483_648)],
            false,
            true,
            false,
        );
        Arc::new(ServerPool::new(cfg, vec![local, remote]))
    }

    /// Admin over a fresh `small_pool`.
    fn fresh_admin() -> PoolClusterAdmin {
        PoolClusterAdmin::new(small_pool())
    }

    /// Parses a literal socket address used by a test.
    fn addr(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    #[test]
    fn list_peers_reports_every_peer() {
        let snapshots = fresh_admin().list_peers();
        assert_eq!(snapshots.len(), 2);

        let local = snapshots
            .iter()
            .find(|s| s.is_local)
            .expect("local snapshot");
        assert_eq!(local.idx, 0);
        assert_eq!(local.port, 8101);

        let remote = snapshots
            .iter()
            .find(|s| !s.is_local)
            .expect("remote snapshot");
        assert_eq!(remote.idx, 1);
        assert_eq!(remote.tokens, vec![2_147_483_648]);
    }

    #[test]
    fn join_stages_and_commit_appends_peer() {
        let admin = fresh_admin();

        // Staging must not touch the live peer list.
        let plan = admin.cluster_join(addr("127.0.0.1:8103")).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Add);
        assert_eq!(admin.cluster_plan_pending().len(), 1);
        assert_eq!(admin.list_peers().len(), 2);

        // Committing drains the plan and makes the peer visible.
        admin.cluster_commit().expect("commit");
        assert_eq!(admin.cluster_plan_pending().len(), 0);
        let after = admin.list_peers();
        assert_eq!(after.len(), 3);
        assert!(after.iter().any(|s| s.port == 8103));
    }

    #[test]
    fn join_rejects_duplicate_endpoint() {
        let outcome = fresh_admin().cluster_join(addr("127.0.0.1:8102"));
        let err = outcome.expect_err("duplicate");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    #[test]
    fn join_rejects_duplicate_in_staging() {
        let admin = fresh_admin();
        let endpoint = addr("127.0.0.1:8200");
        admin.cluster_join(endpoint).expect("first");
        let err = admin.cluster_join(endpoint).expect_err("staged dup");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    #[test]
    fn leave_stages_and_commit_removes_peer() {
        let admin = fresh_admin();

        // Staging must not touch the live peer list.
        let plan = admin.cluster_leave(1).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Remove);
        assert_eq!(plan.change.peer_idx, Some(1));
        assert_eq!(admin.list_peers().len(), 2);

        admin.cluster_commit().expect("commit");
        let after = admin.list_peers();
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].idx, 0);
    }

    #[test]
    fn leave_rejects_unknown_idx() {
        let err = fresh_admin().cluster_leave(99).expect_err("unknown");
        assert!(matches!(err, ClusterError::PeerNotFound { idx: 99 }));
    }

    #[test]
    fn leave_rejects_local_peer() {
        let err = fresh_admin().cluster_leave(0).expect_err("local");
        assert!(matches!(err, ClusterError::CannotRemoveLocal));
    }

    #[test]
    fn commit_with_empty_staging_is_ok() {
        let admin = fresh_admin();
        admin.cluster_commit().expect("noop commit");
        assert_eq!(admin.list_peers().len(), 2);
    }

    #[test]
    fn commit_applies_mixed_batch_in_order() {
        let admin = fresh_admin();
        admin.cluster_leave(1).expect("stage leave");
        admin
            .cluster_join(addr("10.0.0.1:8101"))
            .expect("stage join");
        assert_eq!(admin.cluster_plan_pending().len(), 2);

        admin.cluster_commit().expect("commit");

        // One peer removed and one added: the count is unchanged, the members are not.
        let after = admin.list_peers();
        assert_eq!(after.len(), 2);
        let added = after.iter().find(|s| s.host == "10.0.0.1").expect("added");
        assert_eq!(added.port, 8101);
        let removed_still_present = after
            .iter()
            .any(|s| s.host == "127.0.0.1" && s.port == 8102);
        assert!(!removed_still_present);
    }

    #[test]
    fn noop_admin_returns_empty_and_errors_on_mutations() {
        let admin = NoopClusterAdmin;
        assert!(admin.list_peers().is_empty());
        assert!(admin.cluster_plan_pending().is_empty());
        admin.cluster_commit().expect("noop commit");

        let join_outcome = admin.cluster_join(addr("127.0.0.1:1"));
        assert!(matches!(join_outcome, Err(ClusterError::Invalid(_))));
        let leave_outcome = admin.cluster_leave(0);
        assert!(matches!(leave_outcome, Err(ClusterError::Invalid(_))));
    }

    #[test]
    fn derive_token_is_stable_per_endpoint() {
        let first = derive_token("10.0.0.1", 8101);
        let repeat = derive_token("10.0.0.1", 8101);
        let other_host = derive_token("10.0.0.2", 8101);
        assert_eq!(first, repeat);
        assert_ne!(first, other_host);
    }

    #[test]
    fn join_rebuilds_ring() {
        let admin = fresh_admin();
        admin.cluster_join(addr("10.0.0.5:8101")).expect("plan");
        admin.cluster_commit().expect("commit");

        let topology = admin.pool().datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").expect("dc1");
        let r1 = dc1.racks().iter().find(|r| r.name() == "r1").expect("r1");

        // After the commit the rack's continuum must reference three distinct peers.
        let mut peer_ids: Vec<u32> = r1.continuums().iter().map(|e| e.peer_idx).collect();
        peer_ids.sort_unstable();
        peer_ids.dedup();
        assert_eq!(peer_ids.len(), 3);
    }
}