1use std::collections::BTreeSet;
25use std::net::SocketAddr;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::thread::JoinHandle;
29use std::time::Duration;
30
31use beamr::atom::{Atom, AtomTable};
32use beamr::distribution::connection::{AcceptHandle, ConnectionManager};
33use beamr::scheduler::Scheduler;
34
35use crate::ServerError;
36use crate::cluster::discovery::{self, ClusterResolver};
37use crate::cluster::sync::ClusterSync;
38use crate::config::types::ClusterConfig;
39
/// Delay between consecutive membership polls in the background poll loop.
const POLL_INTERVAL: Duration = Duration::from_millis(250);
45
46#[derive(Clone, Debug, Default, PartialEq, Eq)]
48pub struct MembershipDelta {
49 pub joined: Vec<Atom>,
51 pub left: Vec<Atom>,
53}
54
55impl MembershipDelta {
56 #[must_use]
58 pub fn is_empty(&self) -> bool {
59 self.joined.is_empty() && self.left.is_empty()
60 }
61}
62
63#[derive(Clone)]
65pub struct Membership {
66 inner: Arc<MembershipInner>,
67}
68
69struct MembershipInner {
70 connections: ConnectionManager,
71 atoms: Arc<AtomTable>,
72 peers: Mutex<BTreeSet<Atom>>,
73}
74
75impl std::fmt::Debug for Membership {
76 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 formatter
78 .debug_struct("Membership")
79 .field("peer_count", &self.peers().len())
80 .finish()
81 }
82}
83
84impl Membership {
85 #[must_use]
87 pub fn new(connections: ConnectionManager, atoms: Arc<AtomTable>) -> Self {
88 Self {
89 inner: Arc::new(MembershipInner {
90 connections,
91 atoms,
92 peers: Mutex::new(BTreeSet::new()),
93 }),
94 }
95 }
96
97 #[must_use]
99 pub fn peers(&self) -> Vec<Atom> {
100 self.lock_peers().iter().copied().collect()
101 }
102
103 #[must_use]
105 pub fn peer_names(&self) -> Vec<String> {
106 self.peers()
107 .into_iter()
108 .filter_map(|peer| self.inner.atoms.resolve(peer).map(str::to_owned))
109 .collect()
110 }
111
112 #[must_use]
115 pub fn poll_once(&self) -> MembershipDelta {
116 let current: BTreeSet<Atom> = self
117 .inner
118 .connections
119 .connected_nodes()
120 .into_iter()
121 .collect();
122 let mut tracked = self.lock_peers();
123 let joined: Vec<Atom> = current.difference(&tracked).copied().collect();
124 let left: Vec<Atom> = tracked.difference(¤t).copied().collect();
125 *tracked = current;
126 drop(tracked);
127 MembershipDelta { joined, left }
128 }
129
130 fn name(&self, peer: Atom) -> String {
131 self.inner
132 .atoms
133 .resolve(peer)
134 .map_or_else(|| format!("<atom {peer:?}>"), str::to_owned)
135 }
136
137 fn lock_peers(&self) -> std::sync::MutexGuard<'_, BTreeSet<Atom>> {
138 self.inner
139 .peers
140 .lock()
141 .unwrap_or_else(std::sync::PoisonError::into_inner)
142 }
143}
144
145pub struct ClusterHandle {
148 accept: AcceptHandle,
149 poll: Option<PollLoop>,
150 membership: Membership,
151 _runtime: Arc<tokio::runtime::Runtime>,
158}
159
160impl std::fmt::Debug for ClusterHandle {
161 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 formatter
163 .debug_struct("ClusterHandle")
164 .field("listen_addr", &self.accept.local_addr())
165 .field("membership", &self.membership)
166 .finish_non_exhaustive()
167 }
168}
169
170impl ClusterHandle {
171 #[must_use]
173 pub fn listen_addr(&self) -> SocketAddr {
174 self.accept.local_addr()
175 }
176
177 #[must_use]
179 pub const fn membership(&self) -> &Membership {
180 &self.membership
181 }
182
183 pub fn shutdown(&mut self) {
185 if let Some(poll) = self.poll.take() {
186 poll.stop();
187 }
188 self.accept.shutdown();
189 }
190}
191
192impl Drop for ClusterHandle {
193 fn drop(&mut self) {
194 self.shutdown();
195 }
196}
197
198struct PollLoop {
200 stop: Arc<AtomicBool>,
201 handle: Option<JoinHandle<()>>,
202}
203
204impl PollLoop {
205 fn start(membership: Membership, sync: ClusterSync) -> Self {
206 let stop = Arc::new(AtomicBool::new(false));
207 let stop_for_thread = Arc::clone(&stop);
208 let handle = std::thread::Builder::new()
209 .name("liminal-cluster-membership".to_owned())
210 .spawn(move || {
211 run_poll_loop(&membership, &sync, &stop_for_thread);
212 })
213 .ok();
214 Self { stop, handle }
215 }
216
217 fn stop(mut self) {
218 self.stop.store(true, Ordering::SeqCst);
219 if let Some(handle) = self.handle.take() {
220 let _ = handle.join();
221 }
222 }
223}
224
225fn run_poll_loop(membership: &Membership, sync: &ClusterSync, stop: &AtomicBool) {
226 while !stop.load(Ordering::SeqCst) {
227 apply_delta(membership, sync, membership.poll_once());
228 std::thread::sleep(POLL_INTERVAL);
229 }
230}
231
232fn apply_delta(membership: &Membership, sync: &ClusterSync, delta: MembershipDelta) {
234 for peer in delta.joined {
235 let name = membership.name(peer);
236 tracing::info!(peer = %name, peers = ?membership.peer_names(), "cluster peer joined");
237 sync.on_peer_join(peer);
241 }
242 for peer in delta.left {
243 let name = membership.name(peer);
244 tracing::warn!(peer = %name, peers = ?membership.peer_names(), "cluster peer left");
247 sync.on_peer_leave(peer);
248 }
249}
250
251pub fn start(
269 scheduler: &Arc<Scheduler>,
270 resolver: Arc<ClusterResolver>,
271 config: &ClusterConfig,
272 install_observer: impl FnOnce(ClusterSync),
273) -> Result<ClusterHandle, ServerError> {
274 let connections = scheduler.distribution_connections();
275 let atoms = Arc::clone(scheduler.atom_table());
276 let pg = scheduler.pg_registry();
277 let local_node = atoms.intern(&config.node_name);
278
279 let labels = discovery::register_seed_labels(&resolver, &config.seed_nodes);
282
283 let runtime = Arc::new(
289 tokio::runtime::Builder::new_multi_thread()
290 .worker_threads(2)
291 .enable_all()
292 .build()
293 .map_err(|error| ServerError::ClusterJoin {
294 message: format!("failed to build cluster runtime: {error}"),
295 })?,
296 );
297 connections.set_runtime_handle(runtime.handle().clone());
301
302 let accept = runtime
303 .block_on(scheduler.start_distribution_listener(config.listen_address))
304 .map_err(|error| ServerError::ClusterJoin {
305 message: format!(
306 "failed to bind cluster distribution listener on {}: {error}",
307 config.listen_address
308 ),
309 })?;
310
311 let outcome = runtime.block_on(discovery::connect_seeds(
312 &connections,
313 &resolver,
314 &atoms,
315 &labels,
316 ));
317 if !outcome.is_satisfied() {
318 return Err(ServerError::ClusterJoin {
319 message: format!(
320 "no configured seed node was reachable ({} attempted)",
321 outcome.attempted
322 ),
323 });
324 }
325
326 let membership = Membership::new(connections.clone(), Arc::clone(&atoms));
327 let sync = ClusterSync::new(pg, Arc::clone(&atoms), connections, local_node, resolver);
328 install_observer(sync.clone());
329
330 apply_delta(&membership, &sync, membership.poll_once());
333 tracing::info!(
334 node_name = %config.node_name,
335 peers = ?membership.peer_names(),
336 "cluster membership established"
337 );
338
339 let poll = PollLoop::start(membership.clone(), sync);
340 Ok(ClusterHandle {
341 accept,
342 poll: Some(poll),
343 membership,
344 _runtime: runtime,
345 })
346}
347
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{Membership, MembershipDelta};
    use beamr::atom::AtomTable;
    use beamr::distribution::connection::ConnectionManager;
    use beamr::distribution::resolver::StaticResolver;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a connection manager backed by an empty static resolver.
    fn empty_manager(atoms: &Arc<AtomTable>) -> ConnectionManager {
        ConnectionManager::new(
            Arc::clone(atoms),
            Arc::new(StaticResolver::new(HashMap::new())),
            "test-cookie",
            "local@127.0.0.1",
            1,
        )
    }

    /// Builds a fresh atom table and a `Membership` over an empty manager.
    fn empty_membership() -> (Arc<AtomTable>, Membership) {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let membership = Membership::new(empty_manager(&atoms), Arc::clone(&atoms));
        (atoms, membership)
    }

    #[test]
    fn delta_is_empty_by_default() {
        assert!(MembershipDelta::default().is_empty());
    }

    #[test]
    fn delta_with_a_join_or_a_leave_is_not_empty() {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let peer = atoms.intern("peer@127.0.0.1");
        let joined = MembershipDelta {
            joined: vec![peer],
            left: Vec::new(),
        };
        assert!(!joined.is_empty());
        let left = MembershipDelta {
            joined: Vec::new(),
            left: vec![peer],
        };
        assert!(!left.is_empty());
    }

    #[test]
    fn first_poll_of_empty_table_yields_no_peers() {
        let (_atoms, membership) = empty_membership();
        let delta = membership.poll_once();
        assert!(delta.is_empty());
        assert!(membership.peers().is_empty());
    }

    #[test]
    fn repeated_polls_of_empty_table_stay_empty() {
        let (_atoms, membership) = empty_membership();
        assert!(membership.poll_once().is_empty());
        assert!(membership.poll_once().is_empty());
        assert!(membership.peers().is_empty());
    }

    #[test]
    fn peer_names_resolve_through_the_atom_table() {
        let (atoms, membership) = empty_membership();
        // No connections yet, so there are no peer names to report.
        assert!(membership.peer_names().is_empty());
        // Resolution of an individual peer goes through the shared atom table.
        let peer = atoms.intern("peer@127.0.0.1");
        assert_eq!(membership.name(peer), "peer@127.0.0.1");
    }

    #[test]
    fn debug_reports_peer_count() {
        let (_atoms, membership) = empty_membership();
        assert_eq!(format!("{membership:?}"), "Membership { peer_count: 0 }");
    }
}