1use std::collections::HashMap;
25use std::sync::{Arc, Mutex};
26
27use beamr::atom::{Atom, AtomTable};
28use beamr::distribution::connection::ConnectionManager;
29use beamr::distribution::control::{encode_pg_update_frame, encode_send_frame};
30use beamr::distribution::pg::{PgRegistry, PgUpdate, RemoteMember};
31use beamr::native::ProcessContext;
32use beamr::term::Term;
33
34use crate::cluster::discovery::ClusterResolver;
35use liminal::channel::{ClusterObserver, encode_envelope};
36use liminal::envelope::Envelope;
37
38#[derive(Clone)]
41pub struct ClusterSync {
42 inner: Arc<SyncInner>,
43}
44
45struct SyncInner {
46 pg: Arc<PgRegistry>,
47 atoms: Arc<AtomTable>,
48 connections: ConnectionManager,
49 local_node: Atom,
52 _resolver: Arc<ClusterResolver>,
56 local: Mutex<HashMap<Atom, Vec<u64>>>,
59}
60
61impl std::fmt::Debug for ClusterSync {
62 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 formatter
64 .debug_struct("ClusterSync")
65 .field("local_node", &self.inner.local_node)
66 .finish_non_exhaustive()
67 }
68}
69
70impl ClusterSync {
71 #[must_use]
75 pub fn new(
76 pg: Arc<PgRegistry>,
77 atoms: Arc<AtomTable>,
78 connections: ConnectionManager,
79 local_node: Atom,
80 resolver: Arc<ClusterResolver>,
81 ) -> Self {
82 Self {
83 inner: Arc::new(SyncInner {
84 pg,
85 atoms,
86 connections,
87 local_node,
88 _resolver: resolver,
89 local: Mutex::new(HashMap::new()),
90 }),
91 }
92 }
93
94 fn scope(&self) -> Atom {
96 self.inner.pg.default_scope()
97 }
98
99 fn group(&self, channel: &str) -> Atom {
101 self.inner.atoms.intern(channel)
102 }
103
104 #[must_use]
106 pub fn remote_targets(&self, channel: &str) -> Vec<RemoteMember> {
107 let group = self.group(channel);
108 self.inner.pg.remote_members(self.scope(), group)
109 }
110
111 fn record_local(&self, group: Atom, pid: u64) {
113 let mut local = self.lock_local();
114 let pids = local.entry(group).or_default();
115 if !pids.contains(&pid) {
116 pids.push(pid);
117 }
118 drop(local);
119 }
120
121 fn forget_local(&self, group: Atom, pid: u64) {
123 let mut local = self.lock_local();
124 if let Some(pids) = local.get_mut(&group) {
125 pids.retain(|candidate| *candidate != pid);
126 if pids.is_empty() {
127 local.remove(&group);
128 }
129 }
130 }
131
132 fn local_memberships(&self) -> Vec<(Atom, u64)> {
134 let local = self.lock_local();
135 local
136 .iter()
137 .flat_map(|(group, pids)| pids.iter().map(move |pid| (*group, *pid)))
138 .collect()
139 }
140
141 fn send_to_member(&self, member: RemoteMember, frame_bytes: &[u8]) {
143 let Some(to_pid) = Term::try_pid(member.pid_number) else {
147 tracing::warn!(
148 pid_number = member.pid_number,
149 "remote member pid out of immediate range; skipping cross-node delivery"
150 );
151 return;
152 };
153 let mut context = ProcessContext::new();
154 let Ok(payload) = context.alloc_binary(frame_bytes) else {
155 tracing::warn!("failed to allocate cross-node envelope payload");
156 return;
157 };
158 let Ok(frame) = encode_send_frame(
159 Term::atom(beamr::atom::Atom::OK),
160 to_pid,
161 payload,
162 &self.inner.atoms,
163 ) else {
164 tracing::warn!("failed to encode cross-node send frame");
165 return;
166 };
167 self.write_frame(member.node, &frame);
168 }
169
170 fn write_frame(&self, node: Atom, frame: &[u8]) {
172 let Some(connection) = self.inner.connections.get_connection(node) else {
173 return;
176 };
177 write_raw_blocking(&connection, frame);
178 }
179
180 fn backfill_member(&self, node: Atom, group: Atom, pid: u64) {
184 let update = PgUpdate::Join {
185 scope: self.scope(),
186 group,
187 pid,
188 };
189 if let Ok(frame) = encode_pg_update_frame(update, self.inner.local_node, &self.inner.atoms)
190 {
191 self.write_frame(node, &frame);
192 } else {
193 tracing::warn!("failed to encode cluster backfill frame");
194 }
195 }
196
197 fn lock_local(&self) -> std::sync::MutexGuard<'_, HashMap<Atom, Vec<u64>>> {
198 self.inner
199 .local
200 .lock()
201 .unwrap_or_else(std::sync::PoisonError::into_inner)
202 }
203}
204
205impl ClusterObserver for ClusterSync {
206 fn on_subscribe(&self, channel: &str, subscriber_pid: u64) {
207 let group = self.group(channel);
208 self.inner.pg.join(self.scope(), group, subscriber_pid);
210 self.record_local(group, subscriber_pid);
211 tracing::debug!(
212 channel = %channel,
213 pid = subscriber_pid,
214 "advertised local subscription to cluster"
215 );
216 }
217
218 fn on_unsubscribe(&self, channel: &str, subscriber_pid: u64) {
219 let group = self.group(channel);
220 self.inner.pg.leave(self.scope(), group, subscriber_pid);
221 self.forget_local(group, subscriber_pid);
222 tracing::debug!(
223 channel = %channel,
224 pid = subscriber_pid,
225 "withdrew local subscription from cluster"
226 );
227 }
228
229 fn on_publish(&self, channel: &str, envelope: &Envelope) {
230 let targets = self.remote_targets(channel);
231 if targets.is_empty() {
232 return;
233 }
234 let frame_bytes = encode_envelope(envelope);
235 for member in targets {
236 self.send_to_member(member, &frame_bytes);
237 }
238 }
239}
240
241impl ClusterSync {
242 pub fn on_peer_join(&self, node: Atom) {
244 for (group, pid) in self.local_memberships() {
245 self.backfill_member(node, group, pid);
246 }
247 }
248
249 pub fn on_peer_leave(&self, node: Atom) {
252 let name = self
253 .inner
254 .atoms
255 .resolve(node)
256 .map_or_else(|| format!("<atom {node:?}>"), str::to_owned);
257 tracing::info!(
258 peer = %name,
259 "peer departed; its remote subscriptions were purged by beamr"
260 );
261 }
262}
263
264fn write_raw_blocking(
269 connection: &Arc<beamr::distribution::connection::DistConnection>,
270 frame: &[u8],
271) {
272 let connection = Arc::clone(connection);
273 let frame = frame.to_vec();
274 let write = async move {
275 let _ = connection.write_raw(&frame).await;
276 };
277 if let Ok(handle) = tokio::runtime::Handle::try_current() {
278 if matches!(
279 handle.runtime_flavor(),
280 tokio::runtime::RuntimeFlavor::MultiThread
281 ) {
282 tokio::task::block_in_place(|| handle.block_on(write));
283 return;
284 }
285 }
286 match tokio::runtime::Builder::new_current_thread()
287 .enable_all()
288 .build()
289 {
290 Ok(runtime) => runtime.block_on(write),
291 Err(error) => tracing::warn!(error = %error, "failed to build cluster send runtime"),
292 }
293}
294
#[cfg(test)]
// Test code may unwrap, expect, and panic freely; a failure here is a failed test.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::ClusterSync;
    use crate::cluster::discovery::ClusterResolver;
    use beamr::atom::AtomTable;
    use beamr::distribution::connection::ConnectionManager;
    use beamr::distribution::pg::PgRegistry;
    use beamr::distribution::resolver::StaticResolver;
    use liminal::channel::ClusterObserver;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `ClusterSync` with no known peers, returning it together with
    /// the registry and atom table it shares so tests can inspect them.
    fn sync_fixture() -> (ClusterSync, Arc<PgRegistry>, Arc<AtomTable>) {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let pg = Arc::new(PgRegistry::new(&atoms));
        let connections = ConnectionManager::new(
            Arc::clone(&atoms),
            Arc::new(StaticResolver::new(HashMap::new())),
            "test-cookie",
            "local@127.0.0.1",
            1,
        );
        let local_node = atoms.intern("local@127.0.0.1");
        let resolver = Arc::new(ClusterResolver::new());
        let sync = ClusterSync::new(
            Arc::clone(&pg),
            Arc::clone(&atoms),
            connections,
            local_node,
            resolver,
        );
        (sync, pg, atoms)
    }

    #[test]
    fn subscribe_joins_the_channel_pg_group() {
        let (sync, pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 42);
        let group = atoms.intern("orders");
        assert_eq!(pg.local_members(pg.default_scope(), group), vec![42]);
    }

    #[test]
    fn unsubscribe_leaves_the_channel_pg_group() {
        let (sync, pg, atoms) = sync_fixture();
        sync.on_subscribe("orders", 42);
        sync.on_unsubscribe("orders", 42);
        let group = atoms.intern("orders");
        assert!(pg.local_members(pg.default_scope(), group).is_empty());
    }

    #[test]
    fn local_memberships_track_subscriptions_for_backfill() {
        let (sync, _pg, atoms) = sync_fixture();
        let orders = atoms.intern("orders");
        let events = atoms.intern("events");
        sync.on_subscribe("orders", 1);
        sync.on_subscribe("orders", 2);
        sync.on_subscribe("events", 3);

        // The pids are unique across groups, so ordering by pid gives a
        // deterministic sequence regardless of hash-map iteration order.
        let mut memberships = sync.local_memberships();
        memberships.sort_unstable_by_key(|(_, pid)| *pid);
        assert_eq!(memberships, vec![(orders, 1), (orders, 2), (events, 3)]);

        sync.on_unsubscribe("events", 3);

        let mut remaining = sync.local_memberships();
        remaining.sort_unstable_by_key(|(_, pid)| *pid);
        assert_eq!(remaining, vec![(orders, 1), (orders, 2)]);
    }

    #[test]
    fn remote_targets_empty_without_remote_members() {
        let (sync, _pg, _atoms) = sync_fixture();
        sync.on_subscribe("orders", 1);
        assert!(sync.remote_targets("orders").is_empty());
    }

    #[test]
    fn remote_targets_reflect_applied_remote_joins() {
        let (sync, pg, atoms) = sync_fixture();
        let group = atoms.intern("orders");
        let remote_node = atoms.intern("node-b@127.0.0.1");
        pg.apply_remote_join(pg.default_scope(), group, remote_node, 99, 0);
        let targets = sync.remote_targets("orders");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].node, remote_node);
        assert_eq!(targets[0].pid_number, 99);
    }
}