use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use beamr::atom::AtomTable;
use beamr::distribution::connection::ConnectionManager;
use beamr::distribution::resolver::{NodeResolver, ResolveError, ResolveFuture, Resolver};
#[derive(Debug, Default)]
pub struct ClusterResolver {
nodes: RwLock<HashMap<String, SocketAddr>>,
}
impl ClusterResolver {
#[must_use]
pub fn new() -> Self {
Self {
nodes: RwLock::new(HashMap::new()),
}
}
pub fn register(&self, name: impl Into<String>, address: SocketAddr) {
self.nodes
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(name.into(), address);
}
fn lookup(&self, name: &str) -> Option<SocketAddr> {
self.nodes
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(name)
.copied()
}
}
impl NodeResolver for ClusterResolver {
fn resolve<'a>(&'a self, name: &'a str) -> ResolveFuture<'a> {
let result = self.lookup(name).ok_or(ResolveError::NotFound);
Box::pin(async move { result })
}
}
#[must_use]
fn seed_label(index: usize, address: SocketAddr) -> String {
format!("seed-{index}@{address}")
}
#[must_use]
pub fn seed_resolver(seeds: &[SocketAddr]) -> (Arc<ClusterResolver>, Vec<String>) {
let resolver = Arc::new(ClusterResolver::new());
let labels = register_seed_labels(&resolver, seeds);
(resolver, labels)
}
pub fn register_seed_labels(resolver: &ClusterResolver, seeds: &[SocketAddr]) -> Vec<String> {
let mut labels = Vec::with_capacity(seeds.len());
for (index, address) in seeds.iter().enumerate() {
let label = seed_label(index, *address);
resolver.register(label.clone(), *address);
labels.push(label);
}
labels
}
#[must_use]
pub fn as_resolver(resolver: Arc<ClusterResolver>) -> Resolver {
resolver
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SeedConnectOutcome {
pub attempted: usize,
pub connected: usize,
}
impl SeedConnectOutcome {
#[must_use]
pub const fn is_satisfied(&self) -> bool {
self.attempted == 0 || self.connected > 0
}
}
pub async fn connect_seeds(
connections: &ConnectionManager,
resolver: &Arc<ClusterResolver>,
atoms: &AtomTable,
labels: &[String],
) -> SeedConnectOutcome {
let mut outcome = SeedConnectOutcome {
attempted: labels.len(),
connected: 0,
};
for label in labels {
match connections.connect(label).await {
Ok(connection) => {
let address = connection.peer_addr();
if let Some(name) = atoms.resolve(connection.node()).map(str::to_owned) {
resolver.register(name.clone(), address);
tracing::info!(
seed_label = %label,
peer = %name,
peer_addr = %address,
"connected to cluster seed node"
);
} else {
tracing::info!(
seed_label = %label,
peer_addr = %address,
"connected to cluster seed node"
);
}
outcome.connected += 1;
}
Err(error) => {
tracing::warn!(
seed_label = %label,
error = %error,
"cluster seed node unreachable at startup; continuing with reachable seeds"
);
}
}
}
outcome
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::{ClusterResolver, SeedConnectOutcome, as_resolver, seed_label, seed_resolver};
use beamr::distribution::resolver::{NodeResolver, ResolveError};
use std::net::SocketAddr;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
struct NoopWake;
impl Wake for NoopWake {
fn wake(self: Arc<Self>) {}
}
fn resolve_now(resolver: &ClusterResolver, name: &str) -> Result<SocketAddr, ResolveError> {
let waker = Waker::from(Arc::new(NoopWake));
let mut context = Context::from_waker(&waker);
let mut future = resolver.resolve(name);
match future.as_mut().poll(&mut context) {
Poll::Ready(result) => result,
Poll::Pending => panic!("cluster resolver future should be ready immediately"),
}
}
fn socket(address: &str) -> SocketAddr {
address.parse().expect("valid socket address")
}
#[test]
fn seed_resolver_maps_each_seed_to_a_synthetic_label() {
let seeds = vec![socket("127.0.0.1:9000"), socket("127.0.0.1:9001")];
let (resolver, labels) = seed_resolver(&seeds);
assert_eq!(labels.len(), 2);
assert_eq!(labels[0], seed_label(0, seeds[0]));
assert_eq!(labels[1], seed_label(1, seeds[1]));
assert_eq!(resolve_now(&resolver, &labels[0]), Ok(seeds[0]));
assert_eq!(resolve_now(&resolver, &labels[1]), Ok(seeds[1]));
}
#[test]
fn resolver_learns_real_peer_names() {
let resolver = ClusterResolver::new();
assert_eq!(
resolve_now(&resolver, "node-b@host"),
Err(ResolveError::NotFound)
);
resolver.register("node-b@host", socket("127.0.0.1:9100"));
assert_eq!(
resolve_now(&resolver, "node-b@host"),
Ok(socket("127.0.0.1:9100"))
);
}
#[test]
fn as_resolver_coerces_to_shared_handle() {
let (resolver, _labels) = seed_resolver(&[socket("127.0.0.1:9000")]);
let shared = as_resolver(Arc::clone(&resolver));
let waker = Waker::from(Arc::new(NoopWake));
let mut context = Context::from_waker(&waker);
let mut future = shared.resolve("seed-0@127.0.0.1:9000");
let outcome = match future.as_mut().poll(&mut context) {
Poll::Ready(result) => result,
Poll::Pending => panic!("future should be ready"),
};
assert_eq!(outcome, Ok(socket("127.0.0.1:9000")));
}
#[test]
fn outcome_is_satisfied_when_no_seeds_or_some_connected() {
assert!(
SeedConnectOutcome {
attempted: 0,
connected: 0
}
.is_satisfied()
);
assert!(
SeedConnectOutcome {
attempted: 3,
connected: 1
}
.is_satisfied()
);
assert!(
!SeedConnectOutcome {
attempted: 2,
connected: 0
}
.is_satisfied()
);
}
}