// liminal_server/cluster/membership.rs
//! SRV-005 R2/R3/R4: cluster membership by POLLING beamr's connection table,
//! plus the [`start`] entry point and the [`ClusterHandle`] that owns the
//! cluster's background resources.
//!
//! ## Why polling, not the connection-down hook
//!
//! Beamr's connection manager has a SINGLE connection-down callback slot, and
//! the scheduler already owns it: on node down it calls
//! `PgRegistry::purge_remote_node`, which is exactly the R6 remote-subscription
//! cleanup this cluster needs, for free. Registering our own callback would
//! REPLACE that one and break R6. So membership never touches the hook; it
//! derives joins and departures by diffing successive snapshots of
//! [`ConnectionManager::connected_nodes`]:
//!
//! * R2/R3 join: a peer appears in `connected_nodes()` after a successful
//!   outbound connect or an accepted inbound handshake; the first poll that
//!   sees it logs a join and notifies sync (which backfills its local
//!   subscriptions to the newcomer).
//! * R3 graceful leave / R4 failure: when a peer's TCP link drops, beamr removes
//!   it from the table (and, via the scheduler's hook, purges its remote pg
//!   members — R6). The next poll sees it gone, logs a departure, and notifies
//!   sync. The two observers (our poll, beamr's hook) read the same table
//!   independently with no contention.
//!
//! Because membership is snapshot-based, a peer that drops and reconnects
//! between two polls is present in both snapshots and produces no join or
//! leave event.
23
24use std::collections::BTreeSet;
25use std::net::SocketAddr;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::thread::JoinHandle;
29use std::time::Duration;
30
31use beamr::atom::{Atom, AtomTable};
32use beamr::distribution::connection::{AcceptHandle, ConnectionManager};
33use beamr::scheduler::Scheduler;
34
35use crate::ServerError;
36use crate::cluster::discovery::{self, ClusterResolver};
37use crate::cluster::sync::ClusterSync;
38use crate::config::types::ClusterConfig;
39
/// Cadence of the background membership poll loop.
///
/// R6 node-down cleanup is independent of this value: beamr's own
/// connection-down hook drives the pg purge synchronously when the link drops.
/// The poll only feeds membership logging and the R5 peer-join backfill, so a
/// quarter-second period keeps the cluster view fresh without busy-spinning.
const POLL_INTERVAL: Duration = Duration::from_millis(250);
45
46/// A membership transition computed by diffing two connection snapshots.
47#[derive(Clone, Debug, Default, PartialEq, Eq)]
48pub struct MembershipDelta {
49    /// Peers that appeared since the previous snapshot.
50    pub joined: Vec<Atom>,
51    /// Peers that disappeared since the previous snapshot.
52    pub left: Vec<Atom>,
53}
54
55impl MembershipDelta {
56    /// True when no peer joined or left.
57    #[must_use]
58    pub fn is_empty(&self) -> bool {
59        self.joined.is_empty() && self.left.is_empty()
60    }
61}
62
63/// Tracks cluster peers by polling the distribution connection table.
64#[derive(Clone)]
65pub struct Membership {
66    inner: Arc<MembershipInner>,
67}
68
69struct MembershipInner {
70    connections: ConnectionManager,
71    atoms: Arc<AtomTable>,
72    peers: Mutex<BTreeSet<Atom>>,
73}
74
75impl std::fmt::Debug for Membership {
76    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        formatter
78            .debug_struct("Membership")
79            .field("peer_count", &self.peers().len())
80            .finish()
81    }
82}
83
84impl Membership {
85    /// Creates a membership tracker over `connections` with an empty peer set.
86    #[must_use]
87    pub fn new(connections: ConnectionManager, atoms: Arc<AtomTable>) -> Self {
88        Self {
89            inner: Arc::new(MembershipInner {
90                connections,
91                atoms,
92                peers: Mutex::new(BTreeSet::new()),
93            }),
94        }
95    }
96
97    /// The currently-tracked peers, sorted by atom index.
98    #[must_use]
99    pub fn peers(&self) -> Vec<Atom> {
100        self.lock_peers().iter().copied().collect()
101    }
102
103    /// The currently-tracked peers as resolved node-name strings.
104    #[must_use]
105    pub fn peer_names(&self) -> Vec<String> {
106        self.peers()
107            .into_iter()
108            .filter_map(|peer| self.inner.atoms.resolve(peer).map(str::to_owned))
109            .collect()
110    }
111
112    /// Polls the connection table once, updates the tracked peer set, and returns
113    /// the join/leave delta since the previous poll.
114    #[must_use]
115    pub fn poll_once(&self) -> MembershipDelta {
116        let current: BTreeSet<Atom> = self
117            .inner
118            .connections
119            .connected_nodes()
120            .into_iter()
121            .collect();
122        let mut tracked = self.lock_peers();
123        let joined: Vec<Atom> = current.difference(&tracked).copied().collect();
124        let left: Vec<Atom> = tracked.difference(&current).copied().collect();
125        *tracked = current;
126        drop(tracked);
127        MembershipDelta { joined, left }
128    }
129
130    fn name(&self, peer: Atom) -> String {
131        self.inner
132            .atoms
133            .resolve(peer)
134            .map_or_else(|| format!("<atom {peer:?}>"), str::to_owned)
135    }
136
137    fn lock_peers(&self) -> std::sync::MutexGuard<'_, BTreeSet<Atom>> {
138        self.inner
139            .peers
140            .lock()
141            .unwrap_or_else(std::sync::PoisonError::into_inner)
142    }
143}
144
145/// Owns the cluster's live background resources. Dropping it stops the membership
146/// poll loop and tears down the inbound distribution listener.
147pub struct ClusterHandle {
148    accept: AcceptHandle,
149    poll: Option<PollLoop>,
150    membership: Membership,
151    /// The runtime that drove cluster bring-up and that the inbound accept loop
152    /// keeps running on. It MUST outlive the listener: the accept and per-link
153    /// read tasks are spawned onto this runtime's handle, so dropping it would
154    /// abort them and silently stop accepting peers. Kept here so it lives for
155    /// the cluster's whole lifetime. Dropped last (fields drop in declaration
156    /// order) so the listener and poll loop wind down before the runtime does.
157    _runtime: Arc<tokio::runtime::Runtime>,
158}
159
160impl std::fmt::Debug for ClusterHandle {
161    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        formatter
163            .debug_struct("ClusterHandle")
164            .field("listen_addr", &self.accept.local_addr())
165            .field("membership", &self.membership)
166            .finish_non_exhaustive()
167    }
168}
169
170impl ClusterHandle {
171    /// The address the distribution listener bound for inbound peer links.
172    #[must_use]
173    pub fn listen_addr(&self) -> SocketAddr {
174        self.accept.local_addr()
175    }
176
177    /// The membership tracker, for inspection and tests.
178    #[must_use]
179    pub const fn membership(&self) -> &Membership {
180        &self.membership
181    }
182
183    /// Stops the poll loop and the inbound listener. Idempotent.
184    pub fn shutdown(&mut self) {
185        if let Some(poll) = self.poll.take() {
186            poll.stop();
187        }
188        self.accept.shutdown();
189    }
190}
191
192impl Drop for ClusterHandle {
193    fn drop(&mut self) {
194        self.shutdown();
195    }
196}
197
198/// The background membership poll thread and its stop flag.
199struct PollLoop {
200    stop: Arc<AtomicBool>,
201    handle: Option<JoinHandle<()>>,
202}
203
204impl PollLoop {
205    fn start(membership: Membership, sync: ClusterSync) -> Self {
206        let stop = Arc::new(AtomicBool::new(false));
207        let stop_for_thread = Arc::clone(&stop);
208        let handle = std::thread::Builder::new()
209            .name("liminal-cluster-membership".to_owned())
210            .spawn(move || {
211                run_poll_loop(&membership, &sync, &stop_for_thread);
212            })
213            .ok();
214        Self { stop, handle }
215    }
216
217    fn stop(mut self) {
218        self.stop.store(true, Ordering::SeqCst);
219        if let Some(handle) = self.handle.take() {
220            let _ = handle.join();
221        }
222    }
223}
224
225fn run_poll_loop(membership: &Membership, sync: &ClusterSync, stop: &AtomicBool) {
226    while !stop.load(Ordering::SeqCst) {
227        apply_delta(membership, sync, membership.poll_once());
228        std::thread::sleep(POLL_INTERVAL);
229    }
230}
231
232/// Logs and dispatches a single membership delta (R3/R4/R5).
233fn apply_delta(membership: &Membership, sync: &ClusterSync, delta: MembershipDelta) {
234    for peer in delta.joined {
235        let name = membership.name(peer);
236        tracing::info!(peer = %name, peers = ?membership.peer_names(), "cluster peer joined");
237        // R5: re-advertise our local subscriptions to the newcomer — a fresh
238        // pg.join only broadcasts on the insert edge, so a node that joins after
239        // our subscribers already registered would otherwise never learn them.
240        sync.on_peer_join(peer);
241    }
242    for peer in delta.left {
243        let name = membership.name(peer);
244        // R4: a lost peer is a warning; R6 cleanup of its remote pg members has
245        // already happened via beamr's connection-down hook (purge_remote_node).
246        tracing::warn!(peer = %name, peers = ?membership.peer_names(), "cluster peer left");
247        sync.on_peer_leave(peer);
248    }
249}
250
251/// Starts clustering on the channel-supervisor `scheduler` (SRV-005).
252///
253/// Steps, in order:
254/// 1. Bind the inbound distribution listener (so peers can dial us) BEFORE we
255///    dial seeds, mirroring beamr's own bring-up order.
256/// 2. Dial each configured seed (R1); an unreachable seed is non-fatal, but if
257///    seeds were configured and none was reachable we return
258///    [`ServerError::ClusterJoin`].
259/// 3. Build the membership tracker and the subscription sync, install sync as
260///    the channel-supervisor's observer, and spawn the membership poll loop.
261///
262/// `resolver` MUST be the same [`ClusterResolver`] handed to the scheduler's
263/// `DistributionConfig` (so handshake-learned names resolve everywhere).
264///
265/// # Errors
266/// Returns [`ServerError::ClusterJoin`] when the listener cannot bind or when no
267/// configured seed was reachable.
268pub fn start(
269    scheduler: &Arc<Scheduler>,
270    resolver: Arc<ClusterResolver>,
271    config: &ClusterConfig,
272    install_observer: impl FnOnce(ClusterSync),
273) -> Result<ClusterHandle, ServerError> {
274    let connections = scheduler.distribution_connections();
275    let atoms = Arc::clone(scheduler.atom_table());
276    let pg = scheduler.pg_registry();
277    let local_node = atoms.intern(&config.node_name);
278
279    // Register a synthetic dial label per seed onto the SHARED resolver the
280    // scheduler already uses, so seed dialing resolves on that same instance.
281    let labels = discovery::register_seed_labels(&resolver, &config.seed_nodes);
282
283    // A multi-thread runtime that drives cluster bring-up AND stays alive for the
284    // cluster's lifetime: the inbound accept loop and the per-link read tasks are
285    // spawned onto this runtime, so it must outlive the listener. A current-thread
286    // runtime would also deadlock the bring-up handshake (the outbound connect and
287    // the inbound accept must interleave reads/writes concurrently).
288    let runtime = Arc::new(
289        tokio::runtime::Builder::new_multi_thread()
290            .worker_threads(2)
291            .enable_all()
292            .build()
293            .map_err(|error| ServerError::ClusterJoin {
294                message: format!("failed to build cluster runtime: {error}"),
295            })?,
296    );
297    // Bind this runtime to the distribution connection manager so the accept and
298    // read lifecycle tasks run on it (and survive for the cluster's lifetime),
299    // rather than on any transient ambient runtime.
300    connections.set_runtime_handle(runtime.handle().clone());
301
302    let accept = runtime
303        .block_on(scheduler.start_distribution_listener(config.listen_address))
304        .map_err(|error| ServerError::ClusterJoin {
305            message: format!(
306                "failed to bind cluster distribution listener on {}: {error}",
307                config.listen_address
308            ),
309        })?;
310
311    let outcome = runtime.block_on(discovery::connect_seeds(
312        &connections,
313        &resolver,
314        &atoms,
315        &labels,
316    ));
317    if !outcome.is_satisfied() {
318        return Err(ServerError::ClusterJoin {
319            message: format!(
320                "no configured seed node was reachable ({} attempted)",
321                outcome.attempted
322            ),
323        });
324    }
325
326    let membership = Membership::new(connections.clone(), Arc::clone(&atoms));
327    let sync = ClusterSync::new(pg, Arc::clone(&atoms), connections, local_node, resolver);
328    install_observer(sync.clone());
329
330    // Seed the tracked set from the connections established during discovery and
331    // log the initial membership (R2), backfilling our state to each peer.
332    apply_delta(&membership, &sync, membership.poll_once());
333    tracing::info!(
334        node_name = %config.node_name,
335        peers = ?membership.peer_names(),
336        "cluster membership established"
337    );
338
339    let poll = PollLoop::start(membership.clone(), sync);
340    Ok(ClusterHandle {
341        accept,
342        poll: Some(poll),
343        membership,
344        _runtime: runtime,
345    })
346}
347
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{Membership, MembershipDelta};
    use beamr::atom::AtomTable;
    use beamr::distribution::connection::ConnectionManager;
    use beamr::distribution::resolver::StaticResolver;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// A connection manager with an empty resolver. Nothing in these tests ever
    /// connects it, so its connection table stays empty.
    fn empty_manager(atoms: &Arc<AtomTable>) -> ConnectionManager {
        ConnectionManager::new(
            Arc::clone(atoms),
            Arc::new(StaticResolver::new(HashMap::new())),
            "test-cookie",
            "local@127.0.0.1",
            1,
        )
    }

    /// A fresh atom table plus a membership tracker over an unconnected manager.
    fn empty_membership() -> (Arc<AtomTable>, Membership) {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let membership = Membership::new(empty_manager(&atoms), Arc::clone(&atoms));
        (atoms, membership)
    }

    #[test]
    fn delta_is_empty_by_default() {
        assert!(MembershipDelta::default().is_empty());
    }

    #[test]
    fn delta_with_a_joined_or_left_peer_is_not_empty() {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let peer = atoms.intern("peer@127.0.0.1");
        let joined = MembershipDelta {
            joined: vec![peer],
            left: Vec::new(),
        };
        let left = MembershipDelta {
            joined: Vec::new(),
            left: vec![peer],
        };
        assert!(!joined.is_empty());
        assert!(!left.is_empty());
    }

    #[test]
    fn first_poll_of_empty_table_yields_no_peers() {
        let (_atoms, membership) = empty_membership();
        let delta = membership.poll_once();
        assert!(delta.is_empty());
        assert!(membership.peers().is_empty());
    }

    #[test]
    fn repeated_polls_of_empty_table_stay_empty() {
        let (_atoms, membership) = empty_membership();
        for _ in 0..3 {
            assert!(membership.poll_once().is_empty());
        }
        assert!(membership.peers().is_empty());
    }

    #[test]
    fn peer_names_resolve_through_the_atom_table() {
        let (_atoms, membership) = empty_membership();
        // No connections, so no names — but the accessor must not panic.
        assert!(membership.peer_names().is_empty());
    }

    #[test]
    fn name_resolves_interned_atoms() {
        let (atoms, membership) = empty_membership();
        let peer = atoms.intern("peer@127.0.0.1");
        assert_eq!(membership.name(peer), "peer@127.0.0.1");
    }

    #[test]
    fn debug_reports_the_peer_count() {
        let (_atoms, membership) = empty_membership();
        assert!(format!("{membership:?}").contains("peer_count: 0"));
    }
}