liminal_server/cluster/
discovery.rs1use std::collections::HashMap;
23use std::net::SocketAddr;
24use std::sync::{Arc, RwLock};
25
26use beamr::atom::AtomTable;
27use beamr::distribution::connection::ConnectionManager;
28use beamr::distribution::resolver::{NodeResolver, ResolveError, ResolveFuture, Resolver};
29
/// In-memory table mapping node names to socket addresses.
///
/// The table sits behind an [`RwLock`], so entries can be added through a
/// shared reference.
#[derive(Debug, Default)]
pub struct ClusterResolver {
    /// Known node names and the address each one was last registered with.
    nodes: RwLock<HashMap<String, SocketAddr>>,
}

impl ClusterResolver {
    /// Creates a resolver with no registered nodes.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `address` under `name`, replacing any address previously
    /// stored for the same name.
    pub fn register(&self, name: impl Into<String>, address: SocketAddr) {
        // A poisoned lock is recovered instead of propagated as a panic.
        let mut table = match self.nodes.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        table.insert(name.into(), address);
    }

    /// Returns a copy of the address stored for `name`, or `None` when the
    /// name has not been registered.
    fn lookup(&self, name: &str) -> Option<SocketAddr> {
        // Same poison recovery as `register`: keep reading the table.
        let table = match self.nodes.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        table.get(name).copied()
    }
}
64
65impl NodeResolver for ClusterResolver {
66 fn resolve<'a>(&'a self, name: &'a str) -> ResolveFuture<'a> {
67 let result = self.lookup(name).ok_or(ResolveError::NotFound);
68 Box::pin(async move { result })
69 }
70}
71
/// Builds the synthetic name under which a seed address is registered.
///
/// The label has the form `seed-<index>@<address>`, where `<address>` is the
/// `Display` rendering of the socket address.
#[must_use]
fn seed_label(index: usize, address: SocketAddr) -> String {
    format!("seed-{index}@{address}")
}
77
78#[must_use]
85pub fn seed_resolver(seeds: &[SocketAddr]) -> (Arc<ClusterResolver>, Vec<String>) {
86 let resolver = Arc::new(ClusterResolver::new());
87 let labels = register_seed_labels(&resolver, seeds);
88 (resolver, labels)
89}
90
91pub fn register_seed_labels(resolver: &ClusterResolver, seeds: &[SocketAddr]) -> Vec<String> {
97 let mut labels = Vec::with_capacity(seeds.len());
98 for (index, address) in seeds.iter().enumerate() {
99 let label = seed_label(index, *address);
100 resolver.register(label.clone(), *address);
101 labels.push(label);
102 }
103 labels
104}
105
106#[must_use]
109pub fn as_resolver(resolver: Arc<ClusterResolver>) -> Resolver {
110 resolver
111}
112
/// Tally of a seed-connection pass.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SeedConnectOutcome {
    /// Number of seeds a connection was attempted to.
    pub attempted: usize,
    /// Number of those attempts that succeeded.
    pub connected: usize,
}

impl SeedConnectOutcome {
    /// Reports whether the pass is acceptable: either no seeds were attempted
    /// at all, or at least one connection succeeded.
    #[must_use]
    pub const fn is_satisfied(&self) -> bool {
        // Nothing was attempted, so nothing could have failed.
        if self.attempted == 0 {
            return true;
        }
        self.connected != 0
    }
}
130
131pub async fn connect_seeds(
138 connections: &ConnectionManager,
139 resolver: &Arc<ClusterResolver>,
140 atoms: &AtomTable,
141 labels: &[String],
142) -> SeedConnectOutcome {
143 let mut outcome = SeedConnectOutcome {
144 attempted: labels.len(),
145 connected: 0,
146 };
147 for label in labels {
148 match connections.connect(label).await {
149 Ok(connection) => {
150 let address = connection.peer_addr();
151 if let Some(name) = atoms.resolve(connection.node()).map(str::to_owned) {
152 resolver.register(name.clone(), address);
153 tracing::info!(
154 seed_label = %label,
155 peer = %name,
156 peer_addr = %address,
157 "connected to cluster seed node"
158 );
159 } else {
160 tracing::info!(
161 seed_label = %label,
162 peer_addr = %address,
163 "connected to cluster seed node"
164 );
165 }
166 outcome.connected += 1;
167 }
168 Err(error) => {
169 tracing::warn!(
170 seed_label = %label,
171 error = %error,
172 "cluster seed node unreachable at startup; continuing with reachable seeds"
173 );
174 }
175 }
176 }
177 outcome
178}
179
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{ClusterResolver, SeedConnectOutcome, as_resolver, seed_label, seed_resolver};
    use beamr::distribution::resolver::{NodeResolver, ResolveError};
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that does nothing; the futures under test are polled once only.
    struct NoopWake;

    impl Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }

    /// Polls the resolver's future a single time and returns its result,
    /// panicking if the future is not ready on that first poll.
    fn resolve_now(resolver: &ClusterResolver, name: &str) -> Result<SocketAddr, ResolveError> {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut context = Context::from_waker(&waker);
        let mut pending = resolver.resolve(name);
        let Poll::Ready(result) = pending.as_mut().poll(&mut context) else {
            panic!("cluster resolver future should be ready immediately");
        };
        result
    }

    /// Parses a socket address literal used by the tests.
    fn socket(address: &str) -> SocketAddr {
        address.parse().expect("valid socket address")
    }

    #[test]
    fn seed_resolver_maps_each_seed_to_a_synthetic_label() {
        let seeds = vec![socket("127.0.0.1:9000"), socket("127.0.0.1:9001")];
        let (resolver, labels) = seed_resolver(&seeds);

        assert_eq!(labels.len(), 2);
        for (index, (label, seed)) in labels.iter().zip(&seeds).enumerate() {
            assert_eq!(*label, seed_label(index, *seed));
            assert_eq!(resolve_now(&resolver, label), Ok(*seed));
        }
    }

    #[test]
    fn resolver_learns_real_peer_names() {
        let peer = "node-b@host";
        let address = socket("127.0.0.1:9100");
        let resolver = ClusterResolver::new();

        assert_eq!(resolve_now(&resolver, peer), Err(ResolveError::NotFound));
        resolver.register(peer, address);
        assert_eq!(resolve_now(&resolver, peer), Ok(address));
    }

    #[test]
    fn as_resolver_coerces_to_shared_handle() {
        let (resolver, _labels) = seed_resolver(&[socket("127.0.0.1:9000")]);
        let shared = as_resolver(Arc::clone(&resolver));

        let waker = Waker::from(Arc::new(NoopWake));
        let mut context = Context::from_waker(&waker);
        let mut pending = shared.resolve("seed-0@127.0.0.1:9000");
        let Poll::Ready(outcome) = pending.as_mut().poll(&mut context) else {
            panic!("future should be ready");
        };
        assert_eq!(outcome, Ok(socket("127.0.0.1:9000")));
    }

    #[test]
    fn outcome_is_satisfied_when_no_seeds_or_some_connected() {
        // (attempted, connected, expected is_satisfied)
        let cases = [(0, 0, true), (3, 1, true), (2, 0, false)];
        for (attempted, connected, expected) in cases {
            let outcome = SeedConnectOutcome {
                attempted,
                connected,
            };
            assert_eq!(outcome.is_satisfied(), expected);
        }
    }
}
274}