Skip to main content

liminal_server/cluster/
discovery.rs

1//! SRV-005 R1: seed-node discovery over beamr distribution.
2//!
3//! Beamr does not run EPMD; distribution callers supply a [`NodeResolver`] that
4//! maps a node *name* to a socket address. A liminal operator configures seed
5//! *addresses*, not names — the peer's real distribution name is only learned
6//! from the authenticated handshake. We bridge the two with [`ClusterResolver`]:
7//!
8//! * Each configured seed gets a synthetic dial label (`seed-{i}@{addr}`) mapped
9//!   to its address. Discovery dials those labels.
10//! * [`ConnectionManager::connect`] re-keys the connection table by the name the
11//!   peer advertises in the handshake — the synthetic label is throwaway and
12//!   never becomes a table key, so it cannot collide with a real node name.
13//! * After a successful dial we learn the peer's real name and register
14//!   `real_name -> addr` in the resolver, so beamr can re-dial that peer by name
15//!   later (e.g. an outbound send after a transient drop) without us re-deriving
16//!   the address.
17//!
18//! The SAME resolver instance is handed to the channel-supervisor scheduler's
19//! [`DistributionConfig`] and used here to dial seeds, so every distribution
20//! component resolves names consistently.
21
22use std::collections::HashMap;
23use std::net::SocketAddr;
24use std::sync::{Arc, RwLock};
25
26use beamr::atom::AtomTable;
27use beamr::distribution::connection::ConnectionManager;
28use beamr::distribution::resolver::{NodeResolver, ResolveError, ResolveFuture, Resolver};
29
/// Runtime-populated `name -> address` table used to resolve distribution
/// node names to the socket addresses they are dialed at.
///
/// Callers pre-seed it with one synthetic dial label per configured seed and
/// then extend it with each peer's real handshake name as connections come up.
#[derive(Default, Debug)]
pub struct ClusterResolver {
    /// Known node names (synthetic seed labels and learned peer names) keyed
    /// to the address used to reach them.
    nodes: RwLock<HashMap<String, SocketAddr>>,
}
38
39impl ClusterResolver {
40    /// Creates an empty resolver.
41    #[must_use]
42    pub fn new() -> Self {
43        Self {
44            nodes: RwLock::new(HashMap::new()),
45        }
46    }
47
48    /// Registers (or replaces) a `name -> address` mapping.
49    pub fn register(&self, name: impl Into<String>, address: SocketAddr) {
50        self.nodes
51            .write()
52            .unwrap_or_else(std::sync::PoisonError::into_inner)
53            .insert(name.into(), address);
54    }
55
56    fn lookup(&self, name: &str) -> Option<SocketAddr> {
57        self.nodes
58            .read()
59            .unwrap_or_else(std::sync::PoisonError::into_inner)
60            .get(name)
61            .copied()
62    }
63}
64
65impl NodeResolver for ClusterResolver {
66    fn resolve<'a>(&'a self, name: &'a str) -> ResolveFuture<'a> {
67        let result = self.lookup(name).ok_or(ResolveError::NotFound);
68        Box::pin(async move { result })
69    }
70}
71
/// Builds the throwaway dial label `seed-{index}@{address}` for the seed
/// configured at position `index`.
#[must_use]
fn seed_label(index: usize, address: SocketAddr) -> String {
    format!("seed-{position}@{address}", position = index)
}
77
78/// Builds a [`ClusterResolver`] pre-seeded with a synthetic dial label per seed
79/// address and returns it alongside those labels (in seed order).
80///
81/// The resolver is returned as a concrete `Arc<ClusterResolver>` so the caller
82/// can both hand it to the scheduler as a [`Resolver`] and keep registering
83/// learned peer names on it; [`as_resolver`] performs the trait-object coercion.
84#[must_use]
85pub fn seed_resolver(seeds: &[SocketAddr]) -> (Arc<ClusterResolver>, Vec<String>) {
86    let resolver = Arc::new(ClusterResolver::new());
87    let labels = register_seed_labels(&resolver, seeds);
88    (resolver, labels)
89}
90
91/// Registers a synthetic dial label per seed onto an EXISTING resolver.
92///
93/// Returns those labels in seed order. Used when the resolver was already built
94/// and shared with the scheduler, so seed dialing resolves on the very same
95/// instance the scheduler uses for every other lookup.
96pub fn register_seed_labels(resolver: &ClusterResolver, seeds: &[SocketAddr]) -> Vec<String> {
97    let mut labels = Vec::with_capacity(seeds.len());
98    for (index, address) in seeds.iter().enumerate() {
99        let label = seed_label(index, *address);
100        resolver.register(label.clone(), *address);
101        labels.push(label);
102    }
103    labels
104}
105
106/// Coerces a concrete cluster resolver into the shared distribution [`Resolver`]
107/// handle the scheduler's `DistributionConfig` expects.
108#[must_use]
109pub fn as_resolver(resolver: Arc<ClusterResolver>) -> Resolver {
110    resolver
111}
112
/// Aggregate result of dialing the configured seed nodes (R1).
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct SeedConnectOutcome {
    /// How many seed labels were dialed.
    pub attempted: usize,
    /// How many of those dials completed the distribution handshake.
    pub connected: usize,
}
121
122impl SeedConnectOutcome {
123    /// True when at least one seed was reachable, or there were no seeds to dial
124    /// (a lone bootstrap node forms a cluster of one and is not a join failure).
125    #[must_use]
126    pub const fn is_satisfied(&self) -> bool {
127        self.attempted == 0 || self.connected > 0
128    }
129}
130
131/// Dials every seed `label` through `connections`, learning each reachable
132/// peer's real name into `resolver` (R1).
133///
134/// An unreachable seed is logged at warn level and skipped — discovery is
135/// non-fatal per-seed. The caller decides whether the aggregate outcome
136/// (no seed reachable when seeds were configured) is fatal.
137pub async fn connect_seeds(
138    connections: &ConnectionManager,
139    resolver: &Arc<ClusterResolver>,
140    atoms: &AtomTable,
141    labels: &[String],
142) -> SeedConnectOutcome {
143    let mut outcome = SeedConnectOutcome {
144        attempted: labels.len(),
145        connected: 0,
146    };
147    for label in labels {
148        match connections.connect(label).await {
149            Ok(connection) => {
150                let address = connection.peer_addr();
151                if let Some(name) = atoms.resolve(connection.node()).map(str::to_owned) {
152                    resolver.register(name.clone(), address);
153                    tracing::info!(
154                        seed_label = %label,
155                        peer = %name,
156                        peer_addr = %address,
157                        "connected to cluster seed node"
158                    );
159                } else {
160                    tracing::info!(
161                        seed_label = %label,
162                        peer_addr = %address,
163                        "connected to cluster seed node"
164                    );
165                }
166                outcome.connected += 1;
167            }
168            Err(error) => {
169                tracing::warn!(
170                    seed_label = %label,
171                    error = %error,
172                    "cluster seed node unreachable at startup; continuing with reachable seeds"
173                );
174            }
175        }
176    }
177    outcome
178}
179
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{ClusterResolver, SeedConnectOutcome, as_resolver, seed_label, seed_resolver};
    use beamr::distribution::resolver::{NodeResolver, ResolveError, ResolveFuture};
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that ignores wake-ups. Every future polled in these tests is
    /// expected to finish on its first poll, so nothing is ever re-scheduled.
    struct NoopWake;

    impl Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }

    /// Polls a resolve future exactly once with a no-op waker.
    fn poll_once(mut future: ResolveFuture<'_>) -> Poll<Result<SocketAddr, ResolveError>> {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut context = Context::from_waker(&waker);
        future.as_mut().poll(&mut context)
    }

    /// Resolves `name` synchronously, panicking if the future is not ready at once.
    fn resolve_now(resolver: &ClusterResolver, name: &str) -> Result<SocketAddr, ResolveError> {
        let Poll::Ready(result) = poll_once(resolver.resolve(name)) else {
            panic!("cluster resolver future should be ready immediately");
        };
        result
    }

    fn socket(address: &str) -> SocketAddr {
        address.parse().expect("valid socket address")
    }

    #[test]
    fn seed_resolver_maps_each_seed_to_a_synthetic_label() {
        let seeds = [socket("127.0.0.1:9000"), socket("127.0.0.1:9001")];
        let (resolver, labels) = seed_resolver(&seeds);

        assert_eq!(labels.len(), 2);
        for (index, (label, seed)) in labels.iter().zip(seeds.iter().copied()).enumerate() {
            assert_eq!(label, &seed_label(index, seed));
            assert_eq!(resolve_now(&resolver, label), Ok(seed));
        }
    }

    #[test]
    fn resolver_learns_real_peer_names() {
        let resolver = ClusterResolver::new();
        let peer = "node-b@host";
        let address = socket("127.0.0.1:9100");

        assert_eq!(resolve_now(&resolver, peer), Err(ResolveError::NotFound));
        resolver.register(peer, address);
        assert_eq!(resolve_now(&resolver, peer), Ok(address));
    }

    #[test]
    fn as_resolver_coerces_to_shared_handle() {
        let (resolver, _labels) = seed_resolver(&[socket("127.0.0.1:9000")]);
        let shared = as_resolver(Arc::clone(&resolver));
        // Resolving through the type-erased handle must see the same mapping.
        let Poll::Ready(outcome) = poll_once(shared.resolve("seed-0@127.0.0.1:9000")) else {
            panic!("future should be ready");
        };
        assert_eq!(outcome, Ok(socket("127.0.0.1:9000")));
    }

    #[test]
    fn outcome_is_satisfied_when_no_seeds_or_some_connected() {
        // (attempted, connected, expected is_satisfied)
        let cases = [(0, 0, true), (3, 1, true), (2, 0, false)];
        for (attempted, connected, expected) in cases {
            let outcome = SeedConnectOutcome {
                attempted,
                connected,
            };
            assert_eq!(
                outcome.is_satisfied(),
                expected,
                "attempted={attempted} connected={connected}"
            );
        }
    }
}