Skip to main content

liminal_server/cluster/
sync.rs

//! SRV-005 R5/R6: cross-node subscription propagation and message fan-out.
//!
//! Each channel maps to a beamr process group (the channel name interned into an
//! atom in the default pg scope). A local subscriber joining a channel joins
//! that pg group; beamr broadcasts the membership to connected peers, so every
//! node knows which peers hold subscribers for which channel. Publishing then
//! consults the group's REMOTE members and forwards the published envelope to
//! each one over the existing distribution link.
//!
//! * R5 propagation: [`ClusterSync::on_subscribe`] -> `pg.join`; the broadcast is
//!   beamr's, not ours.
//! * R5 backfill: `pg.join` broadcasts only at the moment a membership is first
//!   inserted, so a peer that joins the cluster AFTER our subscribers registered
//!   would never learn of them. [`ClusterSync::on_peer_join`] re-sends a pg-join
//!   control frame for each of our local members directly to the newcomer.
//! * R5 delivery: [`ClusterSync::on_publish`] sends the envelope (encoded by
//!   [`liminal::channel::encode_envelope`]) as a beamr `SEND` to each remote
//!   member's pid. On that member's home node the frame lands in the subscriber
//!   process's mailbox, and the process decodes it back into its inbox.
//! * R6 cleanup: when a peer drops, beamr's connection-down hook calls
//!   `purge_remote_node`, so its remote members vanish from every group with no
//!   liminal code on the path. [`ClusterSync::on_peer_leave`] only logs.
23
24use std::collections::HashMap;
25use std::sync::{Arc, Mutex};
26
27use beamr::atom::{Atom, AtomTable};
28use beamr::distribution::connection::ConnectionManager;
29use beamr::distribution::control::{encode_pg_update_frame, encode_send_frame};
30use beamr::distribution::pg::{PgRegistry, PgUpdate, RemoteMember};
31use beamr::native::ProcessContext;
32use beamr::term::Term;
33
34use crate::cluster::discovery::ClusterResolver;
35use liminal::channel::{ClusterObserver, encode_envelope};
36use liminal::envelope::Envelope;
37
38/// Propagates channel subscriptions and fans published messages out across the
39/// cluster via beamr process groups (SRV-005 R5/R6).
40#[derive(Clone)]
41pub struct ClusterSync {
42    inner: Arc<SyncInner>,
43}
44
45struct SyncInner {
46    pg: Arc<PgRegistry>,
47    atoms: Arc<AtomTable>,
48    connections: ConnectionManager,
49    /// This node's distribution atom — the node component every locally-joined
50    /// member carries on the wire.
51    local_node: Atom,
52    /// The same resolver the scheduler uses, retained so a future enhancement can
53    /// register learned peer addresses; held to keep the discovery/scheduler
54    /// resolver identity explicit at this seam.
55    _resolver: Arc<ClusterResolver>,
56    /// Local subscriptions, keyed by channel group atom, each holding the set of
57    /// local subscriber pids. The source of truth for R5 peer-join backfill.
58    local: Mutex<HashMap<Atom, Vec<u64>>>,
59}
60
61impl std::fmt::Debug for ClusterSync {
62    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        formatter
64            .debug_struct("ClusterSync")
65            .field("local_node", &self.inner.local_node)
66            .finish_non_exhaustive()
67    }
68}
69
70impl ClusterSync {
71    /// Builds a sync over the scheduler's pg registry and distribution
72    /// connections. `local_node` is this node's distribution atom (interned from
73    /// the configured node name in the SAME atom table).
74    #[must_use]
75    pub fn new(
76        pg: Arc<PgRegistry>,
77        atoms: Arc<AtomTable>,
78        connections: ConnectionManager,
79        local_node: Atom,
80        resolver: Arc<ClusterResolver>,
81    ) -> Self {
82        Self {
83            inner: Arc::new(SyncInner {
84                pg,
85                atoms,
86                connections,
87                local_node,
88                _resolver: resolver,
89                local: Mutex::new(HashMap::new()),
90            }),
91        }
92    }
93
94    /// The default pg scope atom.
95    fn scope(&self) -> Atom {
96        self.inner.pg.default_scope()
97    }
98
99    /// Interns a channel name into its pg group atom.
100    fn group(&self, channel: &str) -> Atom {
101        self.inner.atoms.intern(channel)
102    }
103
104    /// Remote members of `channel`'s group (for publish fan-out and tests).
105    #[must_use]
106    pub fn remote_targets(&self, channel: &str) -> Vec<RemoteMember> {
107        let group = self.group(channel);
108        self.inner.pg.remote_members(self.scope(), group)
109    }
110
111    /// Records a local subscriber pid under `group` for backfill.
112    fn record_local(&self, group: Atom, pid: u64) {
113        let mut local = self.lock_local();
114        let pids = local.entry(group).or_default();
115        if !pids.contains(&pid) {
116            pids.push(pid);
117        }
118        drop(local);
119    }
120
121    /// Forgets a local subscriber pid under `group`.
122    fn forget_local(&self, group: Atom, pid: u64) {
123        let mut local = self.lock_local();
124        if let Some(pids) = local.get_mut(&group) {
125            pids.retain(|candidate| *candidate != pid);
126            if pids.is_empty() {
127                local.remove(&group);
128            }
129        }
130    }
131
132    /// A snapshot of every local `(group, pid)` membership for backfill.
133    fn local_memberships(&self) -> Vec<(Atom, u64)> {
134        let local = self.lock_local();
135        local
136            .iter()
137            .flat_map(|(group, pids)| pids.iter().map(move |pid| (*group, *pid)))
138            .collect()
139    }
140
141    /// Sends an encoded envelope to one remote member's pid as a beamr `SEND`.
142    fn send_to_member(&self, member: RemoteMember, frame_bytes: &[u8]) {
143        // The SEND control targets the member's pid_number; on the member's home
144        // node that maps to the local subscriber process, whose mailbox receives
145        // the binary payload.
146        let Some(to_pid) = Term::try_pid(member.pid_number) else {
147            tracing::warn!(
148                pid_number = member.pid_number,
149                "remote member pid out of immediate range; skipping cross-node delivery"
150            );
151            return;
152        };
153        let mut context = ProcessContext::new();
154        let Ok(payload) = context.alloc_binary(frame_bytes) else {
155            tracing::warn!("failed to allocate cross-node envelope payload");
156            return;
157        };
158        let Ok(frame) = encode_send_frame(
159            Term::atom(beamr::atom::Atom::OK),
160            to_pid,
161            payload,
162            &self.inner.atoms,
163        ) else {
164            tracing::warn!("failed to encode cross-node send frame");
165            return;
166        };
167        self.write_frame(member.node, &frame);
168    }
169
170    /// Writes a pre-encoded distribution frame to `node`'s connection, if live.
171    fn write_frame(&self, node: Atom, frame: &[u8]) {
172        let Some(connection) = self.inner.connections.get_connection(node) else {
173            // No live link: the peer departed between snapshot and send, or its
174            // membership is stale. Nothing to do — R6 cleanup removes it shortly.
175            return;
176        };
177        write_raw_blocking(&connection, frame);
178    }
179
180    /// Re-sends a pg-join control frame for one local member to a single node
181    /// (R5 backfill). Mirrors the frame beamr broadcasts on a fresh `pg.join`,
182    /// but targeted at the newcomer only.
183    fn backfill_member(&self, node: Atom, group: Atom, pid: u64) {
184        let update = PgUpdate::Join {
185            scope: self.scope(),
186            group,
187            pid,
188        };
189        if let Ok(frame) = encode_pg_update_frame(update, self.inner.local_node, &self.inner.atoms)
190        {
191            self.write_frame(node, &frame);
192        } else {
193            tracing::warn!("failed to encode cluster backfill frame");
194        }
195    }
196
197    fn lock_local(&self) -> std::sync::MutexGuard<'_, HashMap<Atom, Vec<u64>>> {
198        self.inner
199            .local
200            .lock()
201            .unwrap_or_else(std::sync::PoisonError::into_inner)
202    }
203}
204
205impl ClusterObserver for ClusterSync {
206    fn on_subscribe(&self, channel: &str, subscriber_pid: u64) {
207        let group = self.group(channel);
208        // pg.join broadcasts the membership to connected peers (R5 propagation).
209        self.inner.pg.join(self.scope(), group, subscriber_pid);
210        self.record_local(group, subscriber_pid);
211        tracing::debug!(
212            channel = %channel,
213            pid = subscriber_pid,
214            "advertised local subscription to cluster"
215        );
216    }
217
218    fn on_unsubscribe(&self, channel: &str, subscriber_pid: u64) {
219        let group = self.group(channel);
220        self.inner.pg.leave(self.scope(), group, subscriber_pid);
221        self.forget_local(group, subscriber_pid);
222        tracing::debug!(
223            channel = %channel,
224            pid = subscriber_pid,
225            "withdrew local subscription from cluster"
226        );
227    }
228
229    fn on_publish(&self, channel: &str, envelope: &Envelope) {
230        let targets = self.remote_targets(channel);
231        if targets.is_empty() {
232            return;
233        }
234        let frame_bytes = encode_envelope(envelope);
235        for member in targets {
236            self.send_to_member(member, &frame_bytes);
237        }
238    }
239}
240
241impl ClusterSync {
242    /// R5 backfill: re-advertise every local subscription to a newly-joined peer.
243    pub fn on_peer_join(&self, node: Atom) {
244        for (group, pid) in self.local_memberships() {
245            self.backfill_member(node, group, pid);
246        }
247    }
248
249    /// R6 is automatic (beamr purges the departed node's remote members via its
250    /// connection-down hook). This logs the cleanup for operators.
251    pub fn on_peer_leave(&self, node: Atom) {
252        let name = self
253            .inner
254            .atoms
255            .resolve(node)
256            .map_or_else(|| format!("<atom {node:?}>"), str::to_owned);
257        tracing::info!(
258            peer = %name,
259            "peer departed; its remote subscriptions were purged by beamr"
260        );
261    }
262}
263
264/// Writes `frame` to `connection`, driving the async write to completion off any
265/// ambient runtime. The membership poll thread that calls this owns no tokio
266/// runtime, so a fresh current-thread runtime drives the single write — the same
267/// shape as beamr's own synchronous distribution-send bridge.
268fn write_raw_blocking(
269    connection: &Arc<beamr::distribution::connection::DistConnection>,
270    frame: &[u8],
271) {
272    let connection = Arc::clone(connection);
273    let frame = frame.to_vec();
274    let write = async move {
275        let _ = connection.write_raw(&frame).await;
276    };
277    if let Ok(handle) = tokio::runtime::Handle::try_current() {
278        if matches!(
279            handle.runtime_flavor(),
280            tokio::runtime::RuntimeFlavor::MultiThread
281        ) {
282            tokio::task::block_in_place(|| handle.block_on(write));
283            return;
284        }
285    }
286    match tokio::runtime::Builder::new_current_thread()
287        .enable_all()
288        .build()
289    {
290        Ok(runtime) => runtime.block_on(write),
291        Err(error) => tracing::warn!(error = %error, "failed to build cluster send runtime"),
292    }
293}
294
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::ClusterSync;
    use crate::cluster::discovery::ClusterResolver;
    use beamr::atom::AtomTable;
    use beamr::distribution::connection::ConnectionManager;
    use beamr::distribution::pg::PgRegistry;
    use beamr::distribution::resolver::StaticResolver;
    use liminal::channel::ClusterObserver;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `ClusterSync` over fresh registries with an empty static
    /// resolver, so no peer links exist. Returns the shared pg registry and
    /// atom table so tests can inspect and seed group membership.
    fn sync_fixture() -> (ClusterSync, Arc<PgRegistry>, Arc<AtomTable>) {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let pg = Arc::new(PgRegistry::new(&atoms));
        let connections = ConnectionManager::new(
            Arc::clone(&atoms),
            Arc::new(StaticResolver::new(HashMap::new())),
            "test-cookie",
            "local@127.0.0.1",
            1,
        );
        let local_node = atoms.intern("local@127.0.0.1");
        let resolver = Arc::new(ClusterResolver::new());
        let sync = ClusterSync::new(
            Arc::clone(&pg),
            Arc::clone(&atoms),
            connections,
            local_node,
            resolver,
        );
        (sync, pg, atoms)
    }

    #[test]
    fn subscribe_joins_the_channel_pg_group() {
        let (sync, pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 42);
        let group = atoms.intern("orders");
        assert_eq!(pg.local_members(pg.default_scope(), group), vec![42]);
    }

    #[test]
    fn unsubscribe_leaves_the_channel_pg_group() {
        let (sync, pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 42);
        sync.on_unsubscribe("orders", 42);
        let group = atoms.intern("orders");
        assert!(pg.local_members(pg.default_scope(), group).is_empty());
    }

    #[test]
    fn local_memberships_track_subscriptions_for_backfill() {
        let (sync, _pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 1);
        sync.on_subscribe("orders", 2);
        sync.on_subscribe("events", 3);
        let orders = atoms.intern("orders");
        let events = atoms.intern("events");

        // Snapshot order follows HashMap iteration, so compare sorted.
        let mut memberships = sync.local_memberships();
        memberships.sort_unstable();
        let mut expected = vec![(orders, 1_u64), (orders, 2), (events, 3)];
        expected.sort_unstable();
        assert_eq!(memberships, expected);

        // After unsubscribing the only member of a group, the group is dropped.
        sync.on_unsubscribe("events", 3);
        let mut remaining = sync.local_memberships();
        remaining.sort_unstable();
        assert_eq!(remaining, vec![(orders, 1_u64), (orders, 2)]);
    }

    #[test]
    fn repeated_subscribe_records_the_pid_once() {
        let (sync, _pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 7);
        sync.on_subscribe("orders", 7);
        let orders = atoms.intern("orders");
        assert_eq!(sync.local_memberships(), vec![(orders, 7_u64)]);
    }

    #[test]
    fn unsubscribing_an_unknown_pid_leaves_backfill_untouched() {
        let (sync, _pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 1);
        sync.on_unsubscribe("orders", 2);
        let orders = atoms.intern("orders");
        assert_eq!(sync.local_memberships(), vec![(orders, 1_u64)]);
    }

    #[test]
    fn peer_join_without_a_live_link_is_a_no_op() {
        let (sync, _pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 1);
        // No connection to this node exists, so backfill has nowhere to write;
        // it must neither panic nor disturb the local backfill set.
        sync.on_peer_join(atoms.intern("node-b@127.0.0.1"));
        assert_eq!(sync.local_memberships().len(), 1);
    }

    #[test]
    fn remote_targets_empty_without_remote_members() {
        let (sync, _pg, _atoms) = sync_fixture();
        sync.on_subscribe("orders", 1);
        assert!(sync.remote_targets("orders").is_empty());
    }

    #[test]
    fn remote_targets_reflect_applied_remote_joins() {
        let (sync, pg, atoms) = sync_fixture();
        let group = atoms.intern("orders");
        let remote_node = atoms.intern("node-b@127.0.0.1");
        pg.apply_remote_join(pg.default_scope(), group, remote_node, 99, 0);
        let targets = sync.remote_targets("orders");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].node, remote_node);
        assert_eq!(targets[0].pid_number, 99);
    }
}