use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use beamr::atom::{Atom, AtomTable};
use beamr::distribution::connection::{AcceptHandle, ConnectionManager};
use beamr::scheduler::Scheduler;
use crate::ServerError;
use crate::cluster::discovery::{self, ClusterResolver};
use crate::cluster::sync::ClusterSync;
use crate::config::types::ClusterConfig;
/// Pause between two consecutive membership polls in the background poll loop.
const POLL_INTERVAL: Duration = Duration::from_millis(250);
/// Difference between the previously tracked peer set and a fresh snapshot.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MembershipDelta {
/// Peers present in the fresh snapshot but absent from the tracked set.
pub joined: Vec<Atom>,
/// Peers present in the tracked set but absent from the fresh snapshot.
pub left: Vec<Atom>,
}
impl MembershipDelta {
    /// Returns `true` when the delta records neither a join nor a leave.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let nobody_joined = self.joined.is_empty();
        let nobody_left = self.left.is_empty();
        nobody_joined && nobody_left
    }
}
/// Handle to the tracked peer set; clones share the same underlying state
/// because the state lives behind an `Arc`.
#[derive(Clone)]
pub struct Membership {
// Shared state; every clone of this handle points at the same allocation.
inner: Arc<MembershipInner>,
}
/// State shared by every clone of a `Membership` handle.
struct MembershipInner {
/// Queried by `Membership::poll_once` for the currently connected nodes.
connections: ConnectionManager,
/// Used to resolve peer atoms back to their textual names.
atoms: Arc<AtomTable>,
/// Peer set recorded by the most recent poll.
peers: Mutex<BTreeSet<Atom>>,
}
impl std::fmt::Debug for Membership {
    /// Formats the handle as `Membership { peer_count: N }`, where `N` is the
    /// number of peers currently tracked.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the length is needed, so read it straight from the locked set
        // instead of copying every peer into a temporary `Vec`. The guard is
        // released at the end of this statement, before any formatting runs.
        let peer_count = self.lock_peers().len();
        formatter
            .debug_struct("Membership")
            .field("peer_count", &peer_count)
            .finish()
    }
}
impl Membership {
    /// Creates a tracker that starts with an empty peer set.
    ///
    /// `connections` is the source of truth queried on every poll; `atoms` is
    /// used to turn peer atoms back into names.
    #[must_use]
    pub fn new(connections: ConnectionManager, atoms: Arc<AtomTable>) -> Self {
        Self {
            inner: Arc::new(MembershipInner {
                connections,
                atoms,
                peers: Mutex::new(BTreeSet::new()),
            }),
        }
    }

    /// Returns a copy of the tracked peers, in the set's sorted order.
    #[must_use]
    pub fn peers(&self) -> Vec<Atom> {
        self.lock_peers().iter().copied().collect()
    }

    /// Returns the names of the tracked peers.
    ///
    /// Peers whose atom cannot be resolved through the atom table are omitted.
    #[must_use]
    pub fn peer_names(&self) -> Vec<String> {
        self.peers()
            .into_iter()
            .filter_map(|peer| self.inner.atoms.resolve(peer).map(str::to_owned))
            .collect()
    }

    /// Replaces the tracked peer set with the nodes the connection manager
    /// currently reports and returns what changed.
    ///
    /// `joined` holds nodes that are connected now but were not tracked;
    /// `left` holds nodes that were tracked but are no longer connected.
    #[must_use]
    pub fn poll_once(&self) -> MembershipDelta {
        let current: BTreeSet<Atom> = self
            .inner
            .connections
            .connected_nodes()
            .into_iter()
            .collect();
        let mut tracked = self.lock_peers();
        let joined: Vec<Atom> = current.difference(&tracked).copied().collect();
        let left: Vec<Atom> = tracked.difference(&current).copied().collect();
        *tracked = current;
        // Release the lock before building the return value.
        drop(tracked);
        MembershipDelta { joined, left }
    }

    /// Returns the peer's name, or a `<atom …>` placeholder built from the
    /// atom's `Debug` form when the atom table cannot resolve it.
    fn name(&self, peer: Atom) -> String {
        self.inner
            .atoms
            .resolve(peer)
            .map_or_else(|| format!("<atom {peer:?}>"), str::to_owned)
    }

    /// Locks the peer set, recovering the guard if the mutex was poisoned by
    /// a panic in another thread.
    fn lock_peers(&self) -> std::sync::MutexGuard<'_, BTreeSet<Atom>> {
        self.inner
            .peers
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
/// Owns the running cluster pieces assembled by `start`: the listener handle,
/// the background poll loop, the peer tracker and the runtime.
pub struct ClusterHandle {
/// Handle for the distribution listener; shut down by `ClusterHandle::shutdown`.
accept: AcceptHandle,
/// Background poll loop; `None` once `shutdown` has taken and stopped it.
poll: Option<PollLoop>,
/// Peer tracker; a clone of it is handed to the poll loop in `start`.
membership: Membership,
/// Never read; held so the runtime lives as long as this handle.
/// NOTE(review): presumably the listener and connections run on it — confirm.
_runtime: Arc<tokio::runtime::Runtime>,
}
impl std::fmt::Debug for ClusterHandle {
    /// Shows the bound listen address and the membership summary; the
    /// remaining fields are elided with `..`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let listen_addr = self.accept.local_addr();
        let mut builder = formatter.debug_struct("ClusterHandle");
        builder.field("listen_addr", &listen_addr);
        builder.field("membership", &self.membership);
        builder.finish_non_exhaustive()
    }
}
impl ClusterHandle {
    /// Address reported by the accept handle's `local_addr`.
    #[must_use]
    pub fn listen_addr(&self) -> SocketAddr {
        self.accept.local_addr()
    }

    /// Borrows the peer tracker owned by this handle.
    #[must_use]
    pub const fn membership(&self) -> &Membership {
        &self.membership
    }

    /// Stops the background poll loop, then shuts down the accept handle.
    ///
    /// The poll loop is taken out of the handle, so it is stopped at most
    /// once. `accept.shutdown()` runs on every call, including the one made
    /// from `Drop` after an explicit shutdown.
    pub fn shutdown(&mut self) {
        let poller = self.poll.take();
        if let Some(poller) = poller {
            poller.stop();
        }
        self.accept.shutdown();
    }
}
impl Drop for ClusterHandle {
    /// Runs the same teardown as an explicit `shutdown` call when the handle
    /// goes out of scope.
    fn drop(&mut self) {
        Self::shutdown(self);
    }
}
/// Background membership-polling thread together with the flag used to stop it.
struct PollLoop {
/// Set to `true` by `PollLoop::stop` to ask the thread to exit.
stop: Arc<AtomicBool>,
/// Join handle of the worker thread; `None` when the thread could not be spawned.
handle: Option<JoinHandle<()>>,
}
impl PollLoop {
fn start(membership: Membership, sync: ClusterSync) -> Self {
let stop = Arc::new(AtomicBool::new(false));
let stop_for_thread = Arc::clone(&stop);
let handle = std::thread::Builder::new()
.name("liminal-cluster-membership".to_owned())
.spawn(move || {
run_poll_loop(&membership, &sync, &stop_for_thread);
})
.ok();
Self { stop, handle }
}
fn stop(mut self) {
self.stop.store(true, Ordering::SeqCst);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
/// Body of the background poll thread.
///
/// Until `stop` reads `true`, each round polls the membership, applies the
/// resulting delta through `apply_delta`, and sleeps for `POLL_INTERVAL`. The
/// flag is checked only at the start of a round.
fn run_poll_loop(membership: &Membership, sync: &ClusterSync, stop: &AtomicBool) {
    loop {
        if stop.load(Ordering::SeqCst) {
            return;
        }
        let delta = membership.poll_once();
        apply_delta(membership, sync, delta);
        std::thread::sleep(POLL_INTERVAL);
    }
}
fn apply_delta(membership: &Membership, sync: &ClusterSync, delta: MembershipDelta) {
for peer in delta.joined {
let name = membership.name(peer);
tracing::info!(peer = %name, peers = ?membership.peer_names(), "cluster peer joined");
sync.on_peer_join(peer);
}
for peer in delta.left {
let name = membership.name(peer);
tracing::warn!(peer = %name, peers = ?membership.peer_names(), "cluster peer left");
sync.on_peer_leave(peer);
}
}
/// Brings up cluster distribution for this node and returns the handle that
/// keeps it running.
///
/// Steps, in order:
/// 1. intern the local node name and register the configured seed labels;
/// 2. build a two-worker tokio runtime and hand its handle to the
///    connection manager;
/// 3. start the distribution listener on `config.listen_address`;
/// 4. connect to the seed nodes;
/// 5. build the membership tracker and `ClusterSync`, pass a clone of the
///    sync to `install_observer`, and apply the first poll;
/// 6. start the background poll loop.
///
/// # Errors
///
/// Returns `ServerError::ClusterJoin` when the runtime cannot be built, the
/// listener cannot be started, or the seed-connection outcome is not
/// satisfied.
pub fn start(
    scheduler: &Arc<Scheduler>,
    resolver: Arc<ClusterResolver>,
    config: &ClusterConfig,
    install_observer: impl FnOnce(ClusterSync),
) -> Result<ClusterHandle, ServerError> {
    // Every failure path below reports the same error variant.
    fn join_failure(message: String) -> ServerError {
        ServerError::ClusterJoin { message }
    }

    let connections = scheduler.distribution_connections();
    let atoms = Arc::clone(scheduler.atom_table());
    let pg = scheduler.pg_registry();
    let local_node = atoms.intern(&config.node_name);
    let labels = discovery::register_seed_labels(&resolver, &config.seed_nodes);

    let built_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .map_err(|error| join_failure(format!("failed to build cluster runtime: {error}")))?;
    let runtime = Arc::new(built_runtime);
    connections.set_runtime_handle(runtime.handle().clone());

    let accept = runtime
        .block_on(scheduler.start_distribution_listener(config.listen_address))
        .map_err(|error| {
            join_failure(format!(
                "failed to bind cluster distribution listener on {}: {error}",
                config.listen_address
            ))
        })?;

    let seed_outcome = runtime.block_on(discovery::connect_seeds(
        &connections,
        &resolver,
        &atoms,
        &labels,
    ));
    if !seed_outcome.is_satisfied() {
        return Err(join_failure(format!(
            "no configured seed node was reachable ({} attempted)",
            seed_outcome.attempted
        )));
    }

    let membership = Membership::new(connections.clone(), Arc::clone(&atoms));
    let sync = ClusterSync::new(pg, Arc::clone(&atoms), connections, local_node, resolver);
    install_observer(sync.clone());

    // Apply the initial snapshot on this thread before the poll loop starts.
    let initial_delta = membership.poll_once();
    apply_delta(&membership, &sync, initial_delta);
    tracing::info!(
        node_name = %config.node_name,
        peers = ?membership.peer_names(),
        "cluster membership established"
    );

    let poll_loop = PollLoop::start(membership.clone(), sync);
    Ok(ClusterHandle {
        accept,
        poll: Some(poll_loop),
        membership,
        _runtime: runtime,
    })
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{Membership, MembershipDelta};
    use beamr::atom::AtomTable;
    use beamr::distribution::connection::ConnectionManager;
    use beamr::distribution::resolver::StaticResolver;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a connection manager backed by a static resolver with no entries.
    fn empty_manager(atoms: &Arc<AtomTable>) -> ConnectionManager {
        ConnectionManager::new(
            Arc::clone(atoms),
            Arc::new(StaticResolver::new(HashMap::new())),
            "test-cookie",
            "local@127.0.0.1",
            1,
        )
    }

    /// Builds a membership tracker over a fresh atom table and an empty manager.
    fn isolated_membership() -> Membership {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        Membership::new(empty_manager(&atoms), Arc::clone(&atoms))
    }

    #[test]
    fn delta_is_empty_by_default() {
        let delta = MembershipDelta::default();
        assert!(delta.is_empty());
    }

    #[test]
    fn first_poll_of_empty_table_yields_no_peers() {
        let membership = isolated_membership();
        let delta = membership.poll_once();
        assert!(delta.is_empty());
        assert!(membership.peers().is_empty());
    }

    #[test]
    fn peer_names_resolve_through_the_atom_table() {
        let membership = isolated_membership();
        assert!(membership.peer_names().is_empty());
    }
}