use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use parking_lot::Mutex;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::{Duration, MissedTickBehavior};
use tokio_util::sync::CancellationToken;
use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
use crate::cluster::pool::{PoolConfig, ServerPool};
use crate::conf::{ConfDynSeed, ConfPool, Config};
use crate::embed::error::EmbedError;
use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
use crate::embed::hooks::{
CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
};
use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
use crate::events::EventManager;
use crate::hashkit::DynToken;
use crate::msg::Msg;
use crate::stats::{
describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
Snapshot, Stats,
};
/// Optional integration points supplied by the embedding application.
///
/// Every slot is either a boxed trait object or `None`. `Server::start`
/// moves the hooks out of this struct when the server is started.
#[non_exhaustive]
pub struct ServerHooks {
/// Backend that locally-routed requests are dispatched to.
pub(crate) datastore: Option<Box<dyn Datastore>>,
/// Source of seed peers consulted by the gossip loop.
pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
/// Optional crypto provider; handed through to the running server as-is.
pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
/// Metrics sink; `Server::start` substitutes a `LoggingMetricsSink` when absent.
pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
impl ServerHooks {
    /// Borrows the datastore hook, or `None` when no datastore is installed.
    #[must_use]
    pub fn datastore(&self) -> Option<&dyn Datastore> {
        let boxed = self.datastore.as_ref()?;
        Some(&**boxed)
    }

    /// Borrows the seeds provider hook, or `None` when none is installed.
    #[must_use]
    pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
        let boxed = self.seeds.as_ref()?;
        Some(&**boxed)
    }

    /// Borrows the crypto provider hook, or `None` when none is installed.
    #[must_use]
    pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
        let boxed = self.crypto.as_ref()?;
        Some(&**boxed)
    }

    /// Borrows the metrics sink hook, or `None` when none is installed.
    #[must_use]
    pub fn metrics(&self) -> Option<&dyn MetricsSink> {
        let boxed = self.metrics.as_ref()?;
        Some(&**boxed)
    }
}
impl std::fmt::Debug for ServerHooks {
    /// Formats the hooks as a struct of booleans: each field reports only
    /// whether the corresponding hook is installed, never its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ServerHooks");
        out.field("datastore", &self.datastore.is_some());
        out.field("seeds", &self.seeds.is_some());
        out.field("crypto", &self.crypto.is_some());
        out.field("metrics", &self.metrics.is_some());
        out.finish()
    }
}
/// Returns the process-wide table of running servers keyed by socket address.
///
/// The table is created empty on first use and stores only weak references,
/// so an entry never keeps a server alive by itself.
fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
    static TABLE: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
    TABLE.get_or_init(Mutex::default)
}
/// Records `server` in the registry under `addr`, replacing any earlier entry
/// stored for the same address.
fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
    let weak = Arc::downgrade(server);
    let mut table = registry().lock();
    table.insert(addr, weak);
}
/// Deletes the registry entry for `addr`; does nothing when no entry exists.
fn registry_remove(addr: SocketAddr) {
    let mut table = registry().lock();
    table.remove(&addr);
}
/// Looks up the server registered under `addr`.
///
/// Returns `None` when nothing is registered for the address or when the
/// registered server has already been dropped.
fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
    let table = registry().lock();
    let entry = table.get(&addr)?;
    entry.upgrade()
}
/// Hands out the next process-wide configuration generation number.
///
/// Numbers are strictly increasing and start at 1. Generation 0 is what a
/// freshly started server reports before any reload, so the counter must
/// never return 0; otherwise the first reload would be indistinguishable
/// from "never reloaded".
fn next_generation() -> u64 {
    // `fetch_add` returns the value *before* the increment, so seeding the
    // counter with 1 makes the first issued generation 1.
    static NEXT_GENERATION: AtomicU64 = AtomicU64::new(1);
    NEXT_GENERATION.fetch_add(1, Ordering::Relaxed)
}
/// Hands out a process-wide connection identifier.
///
/// Identifiers start at 0 and increase by one per call.
fn next_conn_id() -> u64 {
    static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(0);
    AtomicU64::fetch_add(&NEXT_CONN_ID, 1, Ordering::Relaxed)
}
/// Shared state of a running server.
///
/// One instance is created by `Server::start`, wrapped in an `Arc`, and
/// cloned into every background task and every `ServerHandle`.
pub(crate) struct ServerInner {
/// Cluster membership: peers, datacenters and the token ring.
pool: Arc<ServerPool>,
/// Produces a `DispatchPlan` for each injected request.
dispatcher: ClusterDispatcher,
/// Live statistics counters.
stats: Arc<Stats>,
/// Most recent stats snapshot; written by `stats_loop`, read by `metrics_loop`.
snapshot_cache: Arc<Mutex<Snapshot>>,
/// Channel on which `ServerEvent`s are sent.
bus: EventBus,
/// Publisher for `ClusterEvent`s.
events: Arc<EventManager>,
/// Backend used for locally dispatched requests.
datastore: Box<dyn Datastore>,
/// Seed source polled by the gossip loop.
seeds: Box<dyn SeedsProvider>,
/// Sink that receives periodic stats snapshots.
metrics: Box<dyn MetricsSink>,
/// Optional crypto provider exposed through `ServerHandle::crypto_provider`.
crypto: Option<Box<dyn CryptoProvider>>,
/// Local address of the bound `listen` listener, if one was bound.
listen_addr: Option<SocketAddr>,
/// Local address of the bound `dyn_listen` listener, if one was bound; also the registry key.
dyn_listen_addr: Option<SocketAddr>,
/// Cancelled by `ServerHandle::shutdown` to stop the background loops.
cancel: CancellationToken,
/// Name of the pool this server was built from.
pool_name: String,
/// Pool configuration; replaced by `ServerHandle::reload`.
config: Mutex<ConfPool>,
/// Generation stored by the latest reload; starts at 0.
generation: AtomicU64,
/// Optional command extension that was also installed on the dispatcher.
command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
impl std::fmt::Debug for ServerInner {
    /// Prints the pool name and the two listen addresses; every other field
    /// is omitted and the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ServerInner");
        out.field("pool_name", &self.pool_name);
        out.field("listen", &self.listen_addr);
        out.field("dyn_listen", &self.dyn_listen_addr);
        out.finish_non_exhaustive()
    }
}
impl ServerInner {
    /// Sends `req` to this server's own datastore.
    ///
    /// Any datastore failure is converted into `EmbedError::Inject` carrying
    /// the failure's display text.
    fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
        // The datastore call is made before boxing, so the returned future
        // owns the datastore's future rather than the request itself.
        let pending = self.datastore.dispatch(req);
        Box::pin(async move {
            match pending.await {
                Ok(rsp) => Ok(rsp),
                Err(cause) => Err(EmbedError::Inject(cause.to_string())),
            }
        })
    }
}
/// A configured but not yet running server.
///
/// Built by `Server::from_pool` and consumed by `Server::start`, which
/// returns the `ServerHandle` used to interact with the running instance.
#[derive(Debug)]
pub struct Server {
/// Name of the pool this server was built from.
pool_name: String,
/// Pool configuration the server was built from.
pool: ConfPool,
/// Cluster state: the local peer plus any peers derived from configured seeds.
cluster: Arc<ServerPool>,
/// Integration hooks supplied by the embedder.
hooks: ServerHooks,
/// Statistics counters created for this pool.
stats: Arc<Stats>,
/// Optional command extension installed on the dispatcher at start.
command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
impl Server {
pub(crate) fn from_pool(
pool_name: String,
pool: ConfPool,
hooks: ServerHooks,
command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
) -> Self {
let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
let local_peer = build_local_peer(&pool, &pool_cfg);
let mut peers = vec![local_peer];
if let Some(seeds) = pool.dyn_seeds.as_ref() {
let start = u32::try_from(peers.len()).unwrap_or(0);
peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
}
let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
server_pool_arc.preselect_remote_racks();
let stats = Arc::new(Stats::new(
ServiceInfo {
source: pool_cfg.name.clone(),
version: env!("CARGO_PKG_VERSION").to_string(),
rack: pool_cfg.rack.clone(),
dc: pool_cfg.dc.clone(),
},
PoolStats::new(&pool_cfg.name),
ServerStats::new("backend"),
));
Self {
pool_name,
pool,
cluster: server_pool_arc,
hooks,
stats,
command_extension,
}
}
#[must_use]
pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
self.command_extension.as_ref()
}
#[must_use]
pub fn pool_name(&self) -> &str {
&self.pool_name
}
pub async fn start(self) -> Result<ServerHandle, EmbedError> {
let Server {
pool_name,
pool,
cluster,
mut hooks,
stats,
command_extension,
} = self;
let mut dispatcher = ClusterDispatcher::new(cluster.clone());
if let Some(ext) = command_extension.as_ref() {
dispatcher = dispatcher.with_command_extension(ext.clone());
}
let bus = EventBus::new(64);
let events = Arc::new(EventManager::new(64));
let cancel = CancellationToken::new();
let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));
let datastore = hooks
.datastore
.take()
.expect("invariant: builder always populates datastore");
let seeds = hooks
.seeds
.take()
.expect("invariant: builder always populates seeds");
let metrics = hooks
.metrics
.take()
.unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));
let (listen_listener, listen_addr) = bind_listener(pool.listen.as_ref()).await?;
let (dyn_listener, dyn_listen_addr) = bind_listener(pool.dyn_listen.as_ref()).await?;
let inner = Arc::new(ServerInner {
pool: cluster.clone(),
dispatcher,
stats: stats.clone(),
snapshot_cache: snapshot_cache.clone(),
bus: bus.clone(),
events: events.clone(),
datastore,
seeds,
metrics,
crypto: hooks.crypto,
listen_addr,
dyn_listen_addr,
cancel: cancel.clone(),
pool_name: pool_name.clone(),
config: Mutex::new(pool.clone()),
generation: AtomicU64::new(0),
command_extension,
});
if let Some(addr) = inner.dyn_listen_addr {
registry_register(addr, &inner);
}
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
tasks.push(tokio::spawn(metrics_loop(inner.clone())));
if pool.enable_gossip.unwrap_or(false) {
tasks.push(tokio::spawn(gossip_loop(
inner.clone(),
Duration::from_millis(
u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
),
)));
}
if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
tasks.push(tokio::spawn(accept_loop(
inner.clone(),
listener,
addr,
ConnRoleTag::Proxy,
)));
}
if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
tasks.push(tokio::spawn(accept_loop(
inner.clone(),
listener,
addr,
ConnRoleTag::DnodeProxy,
)));
}
Ok(ServerHandle {
inner,
tasks: Arc::new(Mutex::new(tasks)),
})
}
}
/// Cloneable handle to a running server.
///
/// All clones share the same `ServerInner` and the same list of background
/// task handles.
#[derive(Clone)]
pub struct ServerHandle {
/// Shared state of the running server.
inner: Arc<ServerInner>,
/// Join handles of the spawned background tasks; emptied by `shutdown` / `join`.
tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
impl std::fmt::Debug for ServerHandle {
    /// Prints the pool name and the two listen addresses of the underlying
    /// server; the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let server = &self.inner;
        let mut out = f.debug_struct("ServerHandle");
        out.field("pool", &server.pool_name);
        out.field("listen", &server.listen_addr);
        out.field("dyn_listen", &server.dyn_listen_addr);
        out.finish_non_exhaustive()
    }
}
impl ServerHandle {
#[must_use]
pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
self.inner.crypto.as_deref()
}
#[must_use]
pub fn command_extension(&self) -> Option<Arc<dyn crate::embed::CommandExtension>> {
self.inner.command_extension.as_ref().map(Arc::clone)
}
#[must_use]
pub fn listen_addr(&self) -> Option<SocketAddr> {
self.inner.listen_addr
}
#[must_use]
pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
self.inner.dyn_listen_addr
}
#[must_use]
pub fn subscribe_events(&self) -> EventStream {
self.inner.bus.subscribe()
}
#[must_use]
pub fn events(&self) -> Arc<EventManager> {
Arc::clone(&self.inner.events)
}
#[must_use]
pub fn stats(&self) -> Snapshot {
self.inner.stats.snapshot()
}
#[must_use]
pub fn stats_handle(&self) -> Arc<Stats> {
Arc::clone(&self.inner.stats)
}
#[must_use]
pub fn describe_stats(&self) -> Vec<MetricSpec> {
let _ = describe_stats(); let mut out: Vec<MetricSpec> = Vec::new();
out.extend(crate::stats::POOL_CODEC.iter().copied());
out.extend(crate::stats::SERVER_CODEC.iter().copied());
out
}
#[must_use]
pub fn peers(&self) -> Vec<PeerSnapshot> {
self.inner
.pool
.peers()
.read()
.iter()
.map(PeerSnapshot::from)
.collect()
}
#[must_use]
pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
self.inner
.pool
.datacenters()
.read()
.iter()
.map(DatacenterSnapshot::from)
.collect()
}
#[must_use]
pub fn ring(&self) -> RingSnapshot {
let dcs = self.inner.pool.datacenters().read();
let mut entries: Vec<(DynToken, u32)> = Vec::new();
for dc in dcs.iter() {
for rack in dc.racks() {
for c in rack.continuums() {
entries.push((c.token.clone(), c.peer_idx));
}
}
}
RingSnapshot {
entries,
generation: self.inner.generation.load(Ordering::Relaxed),
}
}
pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
let key: Vec<u8> = req
.keys()
.first()
.map(|kp| kp.tag_bytes().to_vec())
.unwrap_or_default();
let plan = self.inner.dispatcher.plan(&req, &key);
match plan {
DispatchPlan::Drop => {
let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
rsp.set_parent_id(req.id());
Ok(rsp)
}
DispatchPlan::NoTargets => Err(EmbedError::Inject(
"cluster has no quorum-eligible targets".into(),
)),
DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
DispatchPlan::Replicas { targets, .. } => {
self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); self.inner.stats.server_incr(ServerField::ReadRequests);
let resolved: Vec<(bool, Option<SocketAddr>)> = {
let peers = self.inner.pool.peers().read();
targets
.iter()
.map(|t| {
let peer = peers.get(t.peer_idx as usize);
let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
(is_local, addr)
})
.collect()
};
let mut last_err: Option<EmbedError> = None;
for (is_local, addr) in resolved {
if is_local {
return self.inner.dispatch_local(req).await;
}
if let Some(addr) = addr {
if let Some(remote) = registry_lookup(addr) {
let mut forwarded = Msg::new(req.id(), req.ty(), true);
forwarded.set_parent_id(req.parent_id());
match remote.datastore.dispatch(forwarded).await {
Ok(rsp) => return Ok(rsp),
Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
}
}
}
}
if let Some(e) = last_err {
return Err(e);
}
self.inner.dispatch_local(req).await
}
}
}
pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
cfg.finalize();
cfg.validate()?;
let new_pool = cfg.pool().clone();
*self.inner.config.lock() = new_pool;
let gen_id = next_generation();
self.inner.generation.store(gen_id, Ordering::Relaxed);
self.inner
.bus
.send(ServerEvent::ConfigReloaded { generation: gen_id });
tokio::task::yield_now().await;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), EmbedError> {
self.inner.cancel.cancel();
if let Some(addr) = self.inner.dyn_listen_addr {
registry_remove(addr);
}
let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
for t in drained {
let _ = t.await;
}
Ok(())
}
pub async fn join(&self) {
let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
for t in drained {
let _ = t.await;
}
}
}
fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
|| ("127.0.0.1".to_string(), 0u16),
|l| (l.name().to_string(), l.port()),
);
let tokens = pool
.tokens
.as_ref()
.map(|tl| {
tl.components()
.iter()
.map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut peer = Peer::new(
0,
PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
cfg.rack.clone(),
cfg.dc.clone(),
if tokens.is_empty() {
vec![DynToken::from_u32(0)]
} else {
tokens
},
true,
true,
false,
);
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
peer.set_state(PeerState::Normal, now_secs);
peer
}
/// Converts configured seeds into remote peers.
///
/// Peers are numbered consecutively from `start_idx` in seed order. Each one
/// is non-local, flagged as same-datacenter when its dc equals `cfg.dc`, and
/// placed in the `Normal` state with a shared timestamp. Unparseable token
/// components become 0, and a seed without tokens gets a single token 0.
fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
    // A clock set before the Unix epoch degrades to timestamp 0.
    let now_secs = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };

    let mut built: Vec<Peer> = Vec::with_capacity(seeds.len());
    for (position, seed) in seeds.iter().enumerate() {
        let mut tokens: Vec<DynToken> = Vec::new();
        for part in seed.tokens().components().iter() {
            tokens.push(DynToken::from_u32(part.digits.parse::<u32>().unwrap_or(0)));
        }
        if tokens.is_empty() {
            tokens.push(DynToken::from_u32(0));
        }

        let offset = u32::try_from(position).unwrap_or(0);
        let mut remote = Peer::new(
            start_idx + offset,
            PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
            seed.rack().to_string(),
            seed.dc().to_string(),
            tokens,
            false,
            seed.dc() == cfg.dc,
            false,
        );
        remote.set_state(PeerState::Normal, now_secs);
        built.push(remote);
    }
    built
}
/// Binds a TCP listener for an optional listen configuration.
///
/// Returns `(None, None)` when `listen` is absent, when its host is empty,
/// or when the host is not a literal IP address (host names are not
/// resolved). Otherwise returns the listener together with the address it
/// was actually bound to, which matters when the configured port is 0.
///
/// The host is first parsed as a bare IP address so that unbracketed IPv6
/// literals such as `::1` work; `"{host}:{port}"` alone never parses for
/// those. Bracketed IPv6 hosts (`[::1]`) are handled by the fallback parse.
///
/// # Errors
///
/// Returns an error when binding fails or the bound address cannot be read.
async fn bind_listener(
    listen: Option<&crate::conf::ConfListen>,
) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
    let Some(l) = listen else {
        return Ok((None, None));
    };
    let host = l.name();
    let port = l.port();
    if host.is_empty() {
        return Ok((None, None));
    }

    let parsed = host
        .parse::<std::net::IpAddr>()
        .map(|ip| SocketAddr::new(ip, port))
        .ok()
        .or_else(|| format!("{host}:{port}").parse::<SocketAddr>().ok());
    let Some(addr) = parsed else {
        return Ok((None, None));
    };

    // Bind the already-parsed address instead of re-resolving a string.
    let listener = TcpListener::bind(addr).await?;
    let local = listener.local_addr()?;
    Ok((Some(listener), Some(local)))
}
async fn accept_loop(
inner: Arc<ServerInner>,
listener: TcpListener,
addr: SocketAddr,
role: ConnRoleTag,
) {
loop {
tokio::select! {
biased;
() = inner.cancel.cancelled() => return,
res = listener.accept() => {
let Ok((sock, peer)) = res else { return };
let conn_id = next_conn_id();
inner.bus.send(ServerEvent::ConnectionAccepted {
conn_id,
role,
local_addr: Some(addr),
});
tracing::warn!(
listen = %addr,
peer = %peer,
role = ?role,
conn_id,
"embedded listen_addr accepted a connection; embedded mode does not yet \
forward to the dispatcher; use ServerHandle::inject_request instead. \
Closing connection."
);
let bus = inner.bus.clone();
let cancel = inner.cancel.clone();
tokio::spawn(async move {
let _ = sock; let close_reason = if cancel.is_cancelled() {
CloseReason::LocalClose
} else {
CloseReason::PeerEof
};
bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
});
}
}
}
}
/// Periodically refreshes the cached stats snapshot until the server is
/// cancelled.
///
/// `interval_ms` is the refresh period in milliseconds. A missing, negative,
/// or zero value falls back to one second. Zero has to be rejected
/// explicitly because `tokio::time::interval` panics on a zero period.
async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
    const DEFAULT_INTERVAL_MS: u64 = 1_000;

    let millis = interval_ms
        .and_then(|ms| u64::try_from(ms).ok())
        .filter(|ms| *ms > 0)
        .unwrap_or(DEFAULT_INTERVAL_MS);
    let mut ticker = tokio::time::interval(Duration::from_millis(millis));
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            biased;
            () = inner.cancel.cancelled() => return,
            _ = ticker.tick() => {
                // Take the snapshot first so the cache lock is held only for the swap.
                let snap = inner.stats.snapshot();
                *inner.snapshot_cache.lock() = snap;
            }
        }
    }
}
/// Periodically hands the cached stats snapshot to the metrics sink until
/// the server is cancelled.
///
/// The period is the sink's own flush interval, but never shorter than
/// 50 ms. Errors reported by the sink are ignored.
async fn metrics_loop(inner: Arc<ServerInner>) {
    let floor = Duration::from_millis(50);
    let period = std::cmp::max(inner.metrics.flush_interval(), floor);
    let mut ticker = tokio::time::interval(period);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            biased;
            () = inner.cancel.cancelled() => return,
            _ = ticker.tick() => {
                // Copy the snapshot out so the lock is released before awaiting.
                let latest = {
                    let cached = inner.snapshot_cache.lock();
                    Snapshot::clone(&cached)
                };
                let _ = inner.metrics.emit(&latest).await;
            }
        }
    }
}
/// Periodic seed-discovery loop; runs until the server is cancelled.
///
/// On every tick the seeds provider is polled, and each seed whose
/// `(host, port)` has not been seen before is appended to the pool as a
/// non-local peer in the `Normal` state. When at least one peer was added,
/// the topology is rebuilt and `PeerUp` / `RingChanged` events are emitted.
/// Every round ends with a `GossipRound` bus event, a `GossipRoundComplete`
/// cluster event and an increment of `PoolField::StatsCount`.
///
/// `interval` is the tick period; values below 20 ms are raised to 20 ms.
async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut round: u64 = 0;
// Endpoints already present in the pool; seeded from the current peer list
// and extended whenever this loop adds a peer.
let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
{
let peers = inner.pool.peers().read();
for p in peers.iter() {
known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
}
}
loop {
tokio::select! {
biased;
() = inner.cancel.cancelled() => return,
_ = ticker.tick() => {
let round_started = std::time::Instant::now();
let round_ts = std::time::SystemTime::now();
round += 1;
// A fetch that yields nothing usable is treated as an empty seed list.
let seeds = inner.seeds.fetch().unwrap_or_default();
let mut added: u32 = 0;
// Scope of the peers write lock. It must end before `rebuild_topology`
// is called below, because that function takes a read lock on the
// same peer list.
{
let mut peers = inner.pool.peers().write();
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let cfg = inner.pool.config().clone();
for seed in &seeds {
let key = (seed.host().to_string(), seed.port());
if known.contains(&key) {
continue;
}
// New peers are appended, so the next index is the current length.
let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
// Token components that fail to parse as u32 become 0.
let tokens: Vec<DynToken> = seed
.tokens()
.components()
.iter()
.map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
.collect();
let mut p = Peer::new(
next_idx,
PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
seed.rack().to_string(),
seed.dc().to_string(),
if tokens.is_empty() {
vec![DynToken::from_u32(0)]
} else {
tokens
},
false,
seed.dc() == cfg.dc,
false,
);
p.set_state(PeerState::Normal, now_secs);
peers.push(p);
known.insert(key);
added += 1;
}
}
if added > 0 {
rebuild_topology(&inner.pool);
let peers_now = inner.pool.peers().read();
// Walk the tail of the peer list, newest first, covering the `added`
// peers that were just pushed. The enumerate index is not used.
for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
let _ = idx;
inner.bus.send(ServerEvent::PeerUp(p.idx()));
inner.events.publish(crate::events::ClusterEvent::PeerUp {
peer_id: p.idx(),
dc: p.dc().to_string(),
ts: round_ts,
});
}
inner.events.publish(crate::events::ClusterEvent::RingChanged {
tag: "seed-discovery".to_string(),
ts: round_ts,
});
}
// End-of-round reporting happens whether or not any peer was added.
let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
inner.bus.send(ServerEvent::GossipRound { round, peers: count });
inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
duration: round_started.elapsed(),
peers_seen: count as usize,
ts: round_ts,
});
inner.stats.pool_incr(PoolField::StatsCount);
}
}
}
}
/// Recomputes the datacenter/rack layout from the current peer list, then
/// rebuilds the ring and the remote-rack preselection.
///
/// Datacenters appear in the order in which they are first seen among the
/// peers; each peer's rack is upserted into its datacenter.
fn rebuild_topology(pool: &Arc<ServerPool>) {
    use crate::cluster::Datacenter;

    // Build the new layout under the peers read lock, releasing the lock
    // before the datacenter list is replaced and the ring is rebuilt.
    let rebuilt: Vec<Datacenter> = {
        let members = pool.peers().read();
        let mut layout: Vec<Datacenter> = Vec::new();
        for member in members.iter() {
            let slot = match layout.iter().position(|dc| dc.name() == member.dc()) {
                Some(found) => found,
                None => {
                    layout.push(Datacenter::new(member.dc().to_string()));
                    layout.len() - 1
                }
            };
            layout[slot].upsert_rack(member.rack().to_string());
        }
        layout
    };

    *pool.datacenters().write() = rebuilt;
    pool.rebuild_ring();
    pool.preselect_remote_racks();
}