use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use crate::conf::{
ConfDynSeed, ConfListen, ConfPool, ConfServer, Config, ConsistencyLevel, DataStore, HashType,
SecureServerOption, Servers, TokenList,
};
use crate::embed::error::EmbedError;
use crate::embed::hooks::{
CryptoProvider, Datastore, MemoryDatastore, MetricsSink, SeedsProvider, SimpleSeedsProvider,
};
use crate::embed::server::{Server, ServerHooks};
pub struct ServerBuilder {
pool_name: String,
pool: ConfPool,
hooks: ServerHooks,
command_extension: Option<std::sync::Arc<dyn crate::embed::CommandExtension>>,
}
impl std::fmt::Debug for ServerBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ServerBuilder")
.field("pool_name", &self.pool_name)
.field("pool", &self.pool)
.finish_non_exhaustive()
}
}
impl Default for ServerBuilder {
fn default() -> Self {
Self::new("dyn_o_mite")
}
}
impl ServerBuilder {
pub fn new(pool_name: impl Into<String>) -> Self {
Self {
pool_name: pool_name.into(),
pool: ConfPool::default(),
hooks: ServerHooks::default(),
command_extension: None,
}
}
pub fn from_config(cfg: &Config) -> Self {
let pool_name = cfg.pool_name().to_string();
Self {
pool_name,
pool: cfg.pool().clone(),
hooks: ServerHooks::default(),
command_extension: None,
}
}
pub fn from_yaml_file(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
let cfg = Config::parse_file(path.as_ref())?;
Ok(Self::from_config(&cfg))
}
#[must_use]
pub fn pool_name(&self) -> &str {
&self.pool_name
}
#[must_use]
pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
self.pool_name = name.into();
self
}
#[must_use]
pub fn listen(mut self, addr: SocketAddr) -> Self {
self.pool.listen = Some(ConfListen::from_socket_addr(addr));
self
}
#[must_use]
pub fn dyn_listen(mut self, addr: SocketAddr) -> Self {
self.pool.dyn_listen = Some(ConfListen::from_socket_addr(addr));
self
}
#[must_use]
pub fn stats_listen(mut self, addr: SocketAddr) -> Self {
self.pool.stats_listen = Some(ConfListen::from_socket_addr(addr));
self
}
#[must_use]
pub fn hash(mut self, h: HashType) -> Self {
self.pool.hash = Some(h);
self
}
#[must_use]
pub fn data_store(mut self, d: DataStore) -> Self {
self.pool.data_store = Some(d.as_int());
self
}
#[must_use]
pub fn read_consistency(mut self, c: ConsistencyLevel) -> Self {
self.pool.read_consistency = Some(c.as_str().to_string());
self
}
#[must_use]
pub fn write_consistency(mut self, c: ConsistencyLevel) -> Self {
self.pool.write_consistency = Some(c.as_str().to_string());
self
}
#[must_use]
pub fn secure_server_option(mut self, opt: SecureServerOption) -> Self {
self.pool.secure_server_option = Some(opt.as_str().to_string());
self
}
#[must_use]
pub fn pem_key_file(mut self, path: impl AsRef<Path>) -> Self {
self.pool.pem_key_file = Some(path.as_ref().to_string_lossy().into_owned());
self
}
#[must_use]
pub fn servers(mut self, servers: Vec<ConfServer>) -> Self {
self.pool.servers = Some(Servers::from_vec(servers));
self
}
#[must_use]
pub fn dyn_seeds(mut self, seeds: Vec<ConfDynSeed>) -> Self {
self.pool.dyn_seeds = Some(seeds);
self
}
#[must_use]
pub fn dyn_seed_provider(mut self, name: impl Into<String>) -> Self {
self.pool.dyn_seed_provider = Some(name.into());
self
}
#[must_use]
pub fn timeout(mut self, d: Duration) -> Self {
let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
self.pool.timeout = Some(ms);
self
}
#[must_use]
pub fn auto_eject_hosts(mut self, on: bool) -> Self {
self.pool.auto_eject_hosts = Some(on);
self
}
#[must_use]
pub fn server_failure_limit(mut self, n: u32) -> Self {
self.pool.server_failure_limit = Some(i64::from(n));
self
}
#[must_use]
pub fn server_retry_timeout(mut self, d: Duration) -> Self {
let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
self.pool.server_retry_timeout = Some(ms);
self
}
#[must_use]
pub fn gossip_interval(mut self, d: Duration) -> Self {
let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
self.pool.gos_interval = Some(ms);
self
}
#[must_use]
pub fn enable_gossip(mut self, on: bool) -> Self {
self.pool.enable_gossip = Some(on);
self
}
#[must_use]
pub fn datacenter(mut self, dc: impl Into<String>) -> Self {
self.pool.datacenter = Some(dc.into());
self
}
#[must_use]
pub fn rack(mut self, rack: impl Into<String>) -> Self {
self.pool.rack = Some(rack.into());
self
}
#[must_use]
pub fn tokens_str(mut self, raw: impl AsRef<str>) -> Self {
let raw = raw.as_ref();
match TokenList::parse(raw) {
Ok(t) => {
self.pool.tokens = Some(t);
}
Err(e) => {
tracing::warn!(
raw = %raw,
error = %e,
"ServerBuilder::tokens_str: parse failed; leaving tokens unchanged. \
Use ServerBuilder::tokens(TokenList) for a hard error on bad input."
);
}
}
self
}
#[must_use]
pub fn tokens(mut self, tokens: TokenList) -> Self {
self.pool.tokens = Some(tokens);
self
}
#[must_use]
pub fn mbuf_size(mut self, n: usize) -> Self {
self.pool.mbuf_size = Some(i64::try_from(n).unwrap_or(i64::MAX));
self
}
#[must_use]
pub fn max_msgs(mut self, n: usize) -> Self {
self.pool.max_msgs = Some(i64::try_from(n).unwrap_or(i64::MAX));
self
}
#[must_use]
pub fn read_repairs_enabled(mut self, on: bool) -> Self {
self.pool.read_repairs_enabled = Some(on);
self
}
#[must_use]
pub fn client_connections(mut self, n: u32) -> Self {
self.pool.client_connections = Some(i64::from(n));
self
}
#[must_use]
pub fn datastore_connections(mut self, n: u8) -> Self {
self.pool.datastore_connections = Some(n);
self
}
#[must_use]
pub fn local_peer_connections(mut self, n: u8) -> Self {
self.pool.local_peer_connections = Some(n);
self
}
#[must_use]
pub fn remote_peer_connections(mut self, n: u8) -> Self {
self.pool.remote_peer_connections = Some(n);
self
}
#[must_use]
pub fn preconnect(mut self, on: bool) -> Self {
self.pool.preconnect = Some(on);
self
}
#[must_use]
pub fn stats_interval(mut self, d: Duration) -> Self {
let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
self.pool.stats_interval = Some(ms);
self
}
#[must_use]
pub fn datastore(mut self, ds: Box<dyn Datastore>) -> Self {
self.hooks.datastore = Some(ds);
self
}
#[must_use]
pub fn seeds_provider(mut self, sp: Box<dyn SeedsProvider>) -> Self {
self.hooks.seeds = Some(sp);
self
}
#[must_use]
pub fn crypto_provider(mut self, cp: Box<dyn CryptoProvider>) -> Self {
self.hooks.crypto = Some(cp);
self
}
#[must_use]
pub fn metrics_sink(mut self, ms: Box<dyn MetricsSink>) -> Self {
self.hooks.metrics = Some(ms);
self
}
#[must_use]
pub fn with_command_extension(
mut self,
ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
) -> Self {
self.command_extension = Some(ext);
self
}
pub fn set_command_extension(
&mut self,
ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
) -> &mut Self {
self.command_extension = Some(ext);
self
}
#[must_use]
pub fn command_extension(&self) -> Option<&std::sync::Arc<dyn crate::embed::CommandExtension>> {
self.command_extension.as_ref()
}
pub fn build(mut self) -> Result<Server, EmbedError> {
if self.pool.servers.is_none() {
self.pool.servers = Some(Servers::from_vec(vec![ConfServer::parse(
"127.0.0.1:1:1 stub",
)
.map_err(EmbedError::Conf)?]));
}
if self.pool.tokens.is_none() {
self.pool.tokens = Some(TokenList::parse("0").map_err(EmbedError::Conf)?);
}
let mut finalized = self.pool.clone();
finalized.apply_defaults();
finalized.validate(&self.pool_name)?;
let datastore = self
.hooks
.datastore
.unwrap_or_else(|| Box::new(MemoryDatastore::new()));
let seeds = self.hooks.seeds.unwrap_or_else(|| {
let raw = finalized
.dyn_seeds
.as_deref()
.map(<[_]>::to_vec)
.unwrap_or_default();
Box::new(SimpleSeedsProvider::new(raw))
});
let crypto = self.hooks.crypto;
let metrics = self.hooks.metrics;
let command_extension = self.command_extension;
Ok(Server::from_pool(
self.pool_name,
finalized,
ServerHooks {
datastore: Some(datastore),
seeds: Some(seeds),
crypto,
metrics,
},
command_extension,
))
}
}
impl Default for ServerHooks {
fn default() -> Self {
Self {
datastore: None,
seeds: None,
crypto: None,
metrics: None,
}
}
}