1use std::net::SocketAddr;
54use std::sync::Arc;
55use std::time::Duration;
56
57use parking_lot::Mutex;
58
59use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
60use crate::cluster::pool::ServerPool;
61use crate::hashkit::DynToken;
62use crate::net::auto_eject::AutoEject;
63
64#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct PeerSnapshot {
70 pub idx: u32,
73 pub dc: String,
75 pub rack: String,
77 pub host: String,
79 pub port: u16,
81 pub tokens: Vec<u32>,
84 pub state: PeerState,
86 pub is_local: bool,
89}
90
/// Everything needed to construct a new peer when a staged `Add` is committed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerSpec {
    /// Host the peer listens on.
    pub host: String,
    /// Port the peer listens on.
    pub port: u16,
    /// Datacenter the peer will be placed in.
    pub dc: String,
    /// Rack the peer will be placed in.
    pub rack: String,
    /// Ring tokens (integer form) the peer will own.
    pub tokens: Vec<u32>,
    /// Whether the peer connection is marked secure.
    pub is_secure: bool,
}
111
/// Direction of a staged membership change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterChangeKind {
    /// A new peer joins the pool.
    Add,
    /// An existing peer leaves the pool.
    Remove,
}
120
121#[derive(Clone, Debug, Eq, PartialEq)]
126pub struct ClusterChange {
127 pub kind: ClusterChangeKind,
129 pub peer_idx: Option<u32>,
131 pub peer: Option<PeerSpec>,
133}
134
135#[derive(Clone, Debug, Eq, PartialEq)]
138pub struct JoinPlan {
139 pub change: ClusterChange,
141}
142
143#[derive(Debug, thiserror::Error)]
145#[non_exhaustive]
146pub enum ClusterError {
147 #[error("peer not found: idx={idx}")]
149 PeerNotFound {
150 idx: u32,
152 },
153 #[error("cannot remove the local peer")]
156 CannotRemoveLocal,
157 #[error("peer with endpoint {addr} already exists")]
159 PeerAlreadyExists {
160 addr: String,
162 },
163 #[error("invalid request: {0}")]
166 Invalid(String),
167}
168
169pub trait ClusterAdmin: Send + Sync + std::fmt::Debug {
176 fn list_peers(&self) -> Vec<PeerSnapshot>;
178 fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError>;
185 fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError>;
194 fn cluster_plan_pending(&self) -> Vec<ClusterChange>;
196 fn cluster_commit(&self) -> Result<(), ClusterError>;
203}
204
/// [`ClusterAdmin`] for nodes without cluster administration: it reports no
/// peers and refuses every membership change.
#[derive(Default, Debug)]
pub struct NoopClusterAdmin;
210
211impl ClusterAdmin for NoopClusterAdmin {
212 fn list_peers(&self) -> Vec<PeerSnapshot> {
213 Vec::new()
214 }
215 fn cluster_join(&self, _target: SocketAddr) -> Result<JoinPlan, ClusterError> {
216 Err(ClusterError::Invalid(
217 "cluster admin RPC not configured on this node".into(),
218 ))
219 }
220 fn cluster_leave(&self, _peer_idx: u32) -> Result<JoinPlan, ClusterError> {
221 Err(ClusterError::Invalid(
222 "cluster admin RPC not configured on this node".into(),
223 ))
224 }
225 fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
226 Vec::new()
227 }
228 fn cluster_commit(&self) -> Result<(), ClusterError> {
229 Ok(())
230 }
231}
232
233#[derive(Debug)]
240pub struct PoolClusterAdmin {
241 pool: Arc<ServerPool>,
242 staged: Mutex<Vec<ClusterChange>>,
243}
244
245impl PoolClusterAdmin {
246 #[must_use]
248 pub fn new(pool: Arc<ServerPool>) -> Self {
249 Self {
250 pool,
251 staged: Mutex::new(Vec::new()),
252 }
253 }
254
255 #[must_use]
259 pub fn pool(&self) -> &Arc<ServerPool> {
260 &self.pool
261 }
262}
263
264impl ClusterAdmin for PoolClusterAdmin {
265 fn list_peers(&self) -> Vec<PeerSnapshot> {
266 let peers = self.pool.peers().read();
267 peers
268 .iter()
269 .map(|p| PeerSnapshot {
270 idx: p.idx(),
271 dc: p.dc().to_string(),
272 rack: p.rack().to_string(),
273 host: p.endpoint().host().to_string(),
274 port: p.endpoint().port(),
275 tokens: p.tokens().iter().map(DynToken::get_int).collect(),
276 state: p.state(),
277 is_local: p.is_local(),
278 })
279 .collect()
280 }
281
282 fn cluster_join(&self, target: SocketAddr) -> Result<JoinPlan, ClusterError> {
283 let host = target.ip().to_string();
284 let port = target.port();
285 let peers = self.pool.peers().read();
286 if peers
287 .iter()
288 .any(|p| p.endpoint().host() == host && p.endpoint().port() == port)
289 {
290 return Err(ClusterError::PeerAlreadyExists {
291 addr: target.to_string(),
292 });
293 }
294 let staged = self.staged.lock();
295 if staged
296 .iter()
297 .any(|c| matches!(&c.peer, Some(s) if s.host == host && s.port == port))
298 {
299 return Err(ClusterError::PeerAlreadyExists {
300 addr: target.to_string(),
301 });
302 }
303 let (dc, rack) = peers.iter().find(|p| p.is_local()).map_or_else(
305 || {
306 (
307 self.pool.config().dc.clone(),
308 self.pool.config().rack.clone(),
309 )
310 },
311 |p| (p.dc().to_string(), p.rack().to_string()),
312 );
313 drop(staged);
314 drop(peers);
315 let token_val = derive_token(&host, port);
316 let spec = PeerSpec {
317 host,
318 port,
319 dc,
320 rack,
321 tokens: vec![token_val],
322 is_secure: false,
323 };
324 let change = ClusterChange {
325 kind: ClusterChangeKind::Add,
326 peer_idx: None,
327 peer: Some(spec),
328 };
329 let plan = JoinPlan {
330 change: change.clone(),
331 };
332 self.staged.lock().push(change);
333 Ok(plan)
334 }
335
336 fn cluster_leave(&self, peer_idx: u32) -> Result<JoinPlan, ClusterError> {
337 let peers = self.pool.peers().read();
338 let target = peers
339 .iter()
340 .find(|p| p.idx() == peer_idx)
341 .ok_or(ClusterError::PeerNotFound { idx: peer_idx })?;
342 if target.is_local() {
343 return Err(ClusterError::CannotRemoveLocal);
344 }
345 drop(peers);
346 let change = ClusterChange {
347 kind: ClusterChangeKind::Remove,
348 peer_idx: Some(peer_idx),
349 peer: None,
350 };
351 let plan = JoinPlan {
352 change: change.clone(),
353 };
354 self.staged.lock().push(change);
355 Ok(plan)
356 }
357
358 fn cluster_plan_pending(&self) -> Vec<ClusterChange> {
359 self.staged.lock().clone()
360 }
361
362 fn cluster_commit(&self) -> Result<(), ClusterError> {
363 let mut staged = self.staged.lock();
364 if staged.is_empty() {
365 return Ok(());
366 }
367 let mut peers = self.pool.peers().write();
368 let mut auto_ejects = self.pool.auto_eject().write();
369 let local_dc = self.pool.config().dc.clone();
370 for change in staged.iter() {
371 match change.kind {
372 ClusterChangeKind::Add => {
373 let spec = change
374 .peer
375 .as_ref()
376 .ok_or_else(|| ClusterError::Invalid("Add change missing peer".into()))?;
377 if peers.iter().any(|p| {
378 p.endpoint().host() == spec.host && p.endpoint().port() == spec.port
379 }) {
380 return Err(ClusterError::PeerAlreadyExists {
381 addr: format!("{}:{}", spec.host, spec.port),
382 });
383 }
384 let new_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
385 let is_same_dc = spec.dc == local_dc;
386 let new_peer = Peer::new(
387 new_idx,
388 PeerEndpoint::tcp(spec.host.clone(), spec.port),
389 spec.rack.clone(),
390 spec.dc.clone(),
391 spec.tokens
392 .iter()
393 .copied()
394 .map(DynToken::from_u32)
395 .collect(),
396 false,
397 is_same_dc,
398 spec.is_secure,
399 );
400 peers.push(new_peer);
401 let template = AutoEject::new(
402 self.pool.config().auto_eject_hosts,
403 self.pool.config().server_failure_limit,
404 Duration::from_millis(self.pool.config().server_retry_timeout_ms),
405 );
406 auto_ejects.push(template);
407 }
408 ClusterChangeKind::Remove => {
409 let idx = change
410 .peer_idx
411 .ok_or_else(|| ClusterError::Invalid("Remove change missing idx".into()))?;
412 let pos = peers
413 .iter()
414 .position(|p| p.idx() == idx)
415 .ok_or(ClusterError::PeerNotFound { idx })?;
416 if peers[pos].is_local() {
417 return Err(ClusterError::CannotRemoveLocal);
418 }
419 peers.remove(pos);
420 if pos < auto_ejects.len() {
421 auto_ejects.remove(pos);
422 }
423 }
424 }
425 }
426 staged.clear();
427 drop(peers);
428 drop(auto_ejects);
429 self.pool.rebuild_ring();
432 Ok(())
433 }
434}
435
/// Derives a deterministic ring token for a joining endpoint.
///
/// Runs 64-bit FNV-1a over the host's bytes followed by the big-endian port
/// bytes, and keeps the low 32 bits of the digest.
fn derive_token(host: &str, port: u16) -> u32 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = host
        .bytes()
        .chain(port.to_be_bytes())
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    // The mask guarantees the value fits, so the fallback is never taken.
    u32::try_from(digest & 0xffff_ffff).unwrap_or(0)
}
453
// Unit tests for the pool-backed and no-op cluster admins and for `derive_token`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::PeerEndpoint;
    use crate::cluster::pool::PoolConfig;

    /// Builds a two-peer pool in dc1/r1: the local peer (idx 0, 127.0.0.1:8101,
    /// token 0) and one remote peer (idx 1, 127.0.0.1:8102, token 2^31).
    fn small_pool() -> Arc<ServerPool> {
        let cfg = PoolConfig {
            dc: "dc1".into(),
            rack: "r1".into(),
            ..PoolConfig::default()
        };
        // Local peer.
        let local = Peer::new(
            0,
            PeerEndpoint::tcp("127.0.0.1".into(), 8101),
            "r1".into(),
            "dc1".into(),
            vec![DynToken::from_u32(0)],
            true,
            true,
            false,
        );
        // Remote peer; its token sits halfway around the u32 ring.
        let remote = Peer::new(
            1,
            PeerEndpoint::tcp("127.0.0.1".into(), 8102),
            "r1".into(),
            "dc1".into(),
            vec![DynToken::from_u32(2_147_483_648)],
            false,
            true,
            false,
        );
        Arc::new(ServerPool::new(cfg, vec![local, remote]))
    }

    // Both peers appear, with idx, port and tokens carried through.
    #[test]
    fn list_peers_reports_every_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let snaps = admin.list_peers();
        assert_eq!(snaps.len(), 2);
        let local = snaps.iter().find(|s| s.is_local).expect("local snapshot");
        assert_eq!(local.idx, 0);
        assert_eq!(local.port, 8101);
        let remote = snaps.iter().find(|s| !s.is_local).expect("remote snapshot");
        assert_eq!(remote.idx, 1);
        assert_eq!(remote.tokens, vec![2_147_483_648]);
    }

    // A join is only staged; the peer list changes on commit.
    #[test]
    fn join_stages_and_commit_appends_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let target: SocketAddr = "127.0.0.1:8103".parse().unwrap();
        let plan = admin.cluster_join(target).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Add);
        assert_eq!(admin.cluster_plan_pending().len(), 1);
        // Staging alone must not touch the pool.
        assert_eq!(admin.list_peers().len(), 2);
        admin.cluster_commit().expect("commit");
        assert_eq!(admin.cluster_plan_pending().len(), 0);
        let snaps = admin.list_peers();
        assert_eq!(snaps.len(), 3);
        assert!(snaps.iter().any(|s| s.port == 8103));
    }

    // Joining an endpoint that is already a pool member is rejected.
    #[test]
    fn join_rejects_duplicate_endpoint() {
        let admin = PoolClusterAdmin::new(small_pool());
        let target: SocketAddr = "127.0.0.1:8102".parse().unwrap();
        let err = admin.cluster_join(target).expect_err("duplicate");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    // Joining an endpoint that is already staged is rejected too.
    #[test]
    fn join_rejects_duplicate_in_staging() {
        let admin = PoolClusterAdmin::new(small_pool());
        let target: SocketAddr = "127.0.0.1:8200".parse().unwrap();
        admin.cluster_join(target).expect("first");
        let err = admin.cluster_join(target).expect_err("staged dup");
        assert!(matches!(err, ClusterError::PeerAlreadyExists { .. }));
    }

    // A leave is only staged; the peer disappears on commit.
    #[test]
    fn leave_stages_and_commit_removes_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let plan = admin.cluster_leave(1).expect("plan");
        assert_eq!(plan.change.kind, ClusterChangeKind::Remove);
        assert_eq!(plan.change.peer_idx, Some(1));
        // Staging alone must not touch the pool.
        assert_eq!(admin.list_peers().len(), 2);
        admin.cluster_commit().expect("commit");
        let snaps = admin.list_peers();
        assert_eq!(snaps.len(), 1);
        assert_eq!(snaps[0].idx, 0);
    }

    // An index no peer has is reported as PeerNotFound.
    #[test]
    fn leave_rejects_unknown_idx() {
        let admin = PoolClusterAdmin::new(small_pool());
        let err = admin.cluster_leave(99).expect_err("unknown");
        assert!(matches!(err, ClusterError::PeerNotFound { idx: 99 }));
    }

    // The local peer (idx 0) cannot be staged for removal.
    #[test]
    fn leave_rejects_local_peer() {
        let admin = PoolClusterAdmin::new(small_pool());
        let err = admin.cluster_leave(0).expect_err("local");
        assert!(matches!(err, ClusterError::CannotRemoveLocal));
    }

    // Committing with nothing staged succeeds and leaves the pool alone.
    #[test]
    fn commit_with_empty_staging_is_ok() {
        let admin = PoolClusterAdmin::new(small_pool());
        admin.cluster_commit().expect("noop commit");
        assert_eq!(admin.list_peers().len(), 2);
    }

    // Remove then Add in one batch: both applied, in staging order.
    #[test]
    fn commit_applies_mixed_batch_in_order() {
        let admin = PoolClusterAdmin::new(small_pool());
        admin.cluster_leave(1).expect("stage leave");
        let target: SocketAddr = "10.0.0.1:8101".parse().unwrap();
        admin.cluster_join(target).expect("stage join");
        assert_eq!(admin.cluster_plan_pending().len(), 2);
        admin.cluster_commit().expect("commit");
        let snaps = admin.list_peers();
        assert_eq!(snaps.len(), 2);
        let new = snaps.iter().find(|s| s.host == "10.0.0.1").expect("added");
        assert_eq!(new.port, 8101);
        // The removed remote peer must be gone.
        assert!(!snaps
            .iter()
            .any(|s| s.host == "127.0.0.1" && s.port == 8102));
    }

    // The no-op admin reports nothing and refuses join/leave.
    #[test]
    fn noop_admin_returns_empty_and_errors_on_mutations() {
        let admin = NoopClusterAdmin;
        assert!(admin.list_peers().is_empty());
        assert!(admin.cluster_plan_pending().is_empty());
        admin.cluster_commit().expect("noop commit");
        let target: SocketAddr = "127.0.0.1:1".parse().unwrap();
        assert!(matches!(
            admin.cluster_join(target),
            Err(ClusterError::Invalid(_))
        ));
        assert!(matches!(
            admin.cluster_leave(0),
            Err(ClusterError::Invalid(_))
        ));
    }

    // Same endpoint -> same token; different host -> different token here.
    #[test]
    fn derive_token_is_stable_per_endpoint() {
        let a = derive_token("10.0.0.1", 8101);
        let b = derive_token("10.0.0.1", 8101);
        let c = derive_token("10.0.0.2", 8101);
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    // After a committed join, the dc1/r1 continuum references three distinct peers.
    #[test]
    fn join_rebuilds_ring() {
        let admin = PoolClusterAdmin::new(small_pool());
        let target: SocketAddr = "10.0.0.5:8101".parse().unwrap();
        admin.cluster_join(target).expect("plan");
        admin.cluster_commit().expect("commit");
        let pool = admin.pool();
        let topology = pool.datacenters().read();
        let dc1 = topology.iter().find(|d| d.name() == "dc1").expect("dc1");
        let r1 = dc1.racks().iter().find(|r| r.name() == "r1").expect("r1");
        let entries = r1.continuums();
        // Count distinct peer indices referenced by the continuum entries.
        let mut idxs: Vec<u32> = entries.iter().map(|e| e.peer_idx).collect();
        idxs.sort_unstable();
        idxs.dedup();
        assert_eq!(idxs.len(), 3);
    }
}