use super::cluster::{
cut::Subscription,
partition::{self, Rejoin},
Cluster, Config,
};
use failure::Fallible;
use futures::{
future::{pending, FutureExt, TryFutureExt},
stream::{FuturesUnordered, StreamExt},
};
use std::{error::Error, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use tokio::select;
use tonic::{
body::BoxBody,
codegen::{
http::{HeaderMap, Request as HttpRequest, Response as HttpResponse},
Service,
},
transport::{
server::{Router, Unimplemented},
Body, ClientTlsConfig, NamedService, Server, ServerTlsConfig,
},
};
use tracing::Span;
#[derive(Copy, Clone, Debug)]
pub struct CutDetectorConfig {
pub unstable_threshold: usize,
pub stable_threshold: usize,
pub subjects_per_observer: usize,
}
impl Default for CutDetectorConfig {
fn default() -> Self {
Self::new()
}
}
impl CutDetectorConfig {
pub const fn new() -> Self {
Self {
unstable_threshold: 4,
stable_threshold: 9,
subjects_per_observer: 10,
}
}
}
pub struct Mesh<St, R> {
cfg: Config<St>,
grpc: R,
svcs: Vec<Box<dyn MeshService>>,
}
impl Default for Mesh<Rejoin, Server> {
fn default() -> Self {
let cd = CutDetectorConfig::new();
Self {
cfg: Config {
strategy: Default::default(),
lh: (cd.unstable_threshold, cd.stable_threshold),
k: cd.subjects_per_observer,
seed: None,
meta: Default::default(),
server_tls: false,
client_tls: None,
fd_timeout: Duration::from_secs(2),
fd_strikes: 3,
},
grpc: Server::builder(),
svcs: Vec::new(),
}
}
}
impl Mesh<Rejoin, Server> {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn low_latency() -> Self {
Self::default()
.fault_timeout(Duration::from_millis(150))
.fault_strikes(2)
}
}
impl<St, R> Mesh<St, R> {
pub fn cd_config(mut self, cfg: CutDetectorConfig) -> Self {
assert_ne!(0, cfg.unstable_threshold);
assert_ne!(0, cfg.stable_threshold);
assert_ne!(0, cfg.subjects_per_observer);
assert!(cfg.unstable_threshold <= cfg.stable_threshold);
assert!(cfg.stable_threshold <= cfg.subjects_per_observer);
self.cfg.lh.0 = cfg.unstable_threshold;
self.cfg.lh.1 = cfg.stable_threshold;
self.cfg.k = cfg.subjects_per_observer;
self
}
pub fn join_seed(mut self, addr: SocketAddr, use_tls: bool) -> Self {
self.cfg.seed = Some((addr, use_tls).into());
self
}
pub fn add_metadata<I: IntoIterator<Item = (String, Vec<u8>)>>(mut self, iter: I) -> Self {
self.cfg.meta.extend(iter);
self
}
pub fn client_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
self.cfg.client_tls = Some(Arc::new(tls_config));
self
}
pub fn part_strategy<_St: partition::Strategy>(self) -> Mesh<_St, R> {
let Mesh { cfg, grpc, svcs } = self;
let strategy = _St::default();
#[rustfmt::skip]
let Config { lh, k, seed, meta, server_tls, client_tls, fd_timeout, fd_strikes, .. } = cfg;
#[rustfmt::skip]
let cfg = Config { strategy, lh, k, seed, meta, server_tls, client_tls, fd_timeout, fd_strikes };
Mesh { cfg, grpc, svcs }
}
pub fn fault_timeout(mut self, timeout: Duration) -> Self {
self.cfg.fd_timeout = timeout;
self
}
pub fn fault_strikes(mut self, strikes: usize) -> Self {
assert!(strikes != 0);
self.cfg.fd_strikes = strikes;
self
}
pub fn add_mesh_service<S: MeshService + 'static>(mut self, svc: S) -> Self {
self.svcs.push(Box::new(svc));
self
}
}
impl<St: partition::Strategy> Mesh<St, Server> {
pub fn server_tls_config(mut self, tls_config: ServerTlsConfig) -> Self {
self.cfg.server_tls = true;
self.grpc = self.grpc.tls_config(tls_config);
self
}
pub fn concurrency_limit_per_connection(mut self, limit: usize) -> Self {
self.grpc = self.grpc.concurrency_limit_per_connection(limit);
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.grpc.timeout(timeout);
self
}
pub fn initial_stream_window_size<S: Into<Option<u32>>>(mut self, sz: S) -> Self {
self.grpc = self.grpc.initial_stream_window_size(sz);
self
}
pub fn initial_connection_window_size<S: Into<Option<u32>>>(mut self, sz: S) -> Self {
self.grpc = self.grpc.initial_connection_window_size(sz);
self
}
pub fn max_concurrent_streams<M: Into<Option<u32>>>(mut self, max: M) -> Self {
self.grpc = self.grpc.max_concurrent_streams(max);
self
}
pub fn tcp_keepalive<D: Into<Option<Duration>>>(mut self, tcp_keepalive: D) -> Self {
self.grpc = self.grpc.tcp_keepalive(tcp_keepalive.into());
self
}
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
self.grpc = self.grpc.tcp_nodelay(enabled);
self
}
pub fn trace_fn<F>(mut self, f: F) -> Self
where F: Fn(&HeaderMap) -> Span + Send + Sync + 'static {
self.grpc = self.grpc.trace_fn(f);
self
}
pub fn add_service<S>(self, svc: S) -> Mesh<St, Router<S::Service, Unimplemented>>
where
S: ExposedService + 'static,
<<S as ExposedService>::Service as Service<HttpRequest<Body>>>::Future: Send + 'static,
<<S as ExposedService>::Service as Service<HttpRequest<Body>>>::Error:
Into<Box<dyn Error + Send + Sync>> + Send,
{
#[rustfmt::skip]
let Mesh { mut cfg, mut grpc, svcs } = self.add_mesh_service(svc.clone());
svc.add_metadata(&mut *cfg.meta);
let grpc = grpc.add_service(svc.into_service());
Mesh { cfg, grpc, svcs }
}
#[inline]
pub async fn serve(self, addr: SocketAddr) -> Fallible<()> {
self.serve_with_shutdown(addr, pending()).await
}
pub async fn serve_with_shutdown<F>(mut self, addr: SocketAddr, signal: F) -> Fallible<()>
where F: Future<Output = ()> + Send {
let cluster = Arc::new(Cluster::new(self.cfg, addr));
let f_cuts = cluster.subscribe();
let w_cuts = cluster.subscribe();
let svcs: FuturesUnordered<_> = (self.svcs)
.into_iter()
.map(|s| s.accept(cluster.subscribe()))
.collect();
select! {
r = svcs.for_each(|_| async {}).then(|_| pending()) => r,
r = self.grpc
.add_service(Arc::clone(&cluster).into_service())
.serve_with_shutdown(addr, signal)
.err_into() => r,
r = Arc::clone(&cluster).detect_faults(f_cuts)
.err_into() => r,
r = St::handle_parts(Arc::clone(&cluster), w_cuts)
.err_into() => r,
}
}
}
#[doc(hidden)]
impl<St: partition::Strategy, A, B> Mesh<St, Router<A, B>>
where
A: Service<HttpRequest<Body>, Response = HttpResponse<BoxBody>> + Clone + Send + 'static,
A::Future: Send + 'static,
A::Error: Into<Box<dyn Error + Send + Sync>> + Send,
B: Service<HttpRequest<Body>, Response = HttpResponse<BoxBody>> + Clone + Send + 'static,
B::Future: Send + 'static,
B::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
pub fn add_service<S>(
self,
svc: S,
) -> Mesh<St, Router<S::Service, impl Service<HttpRequest<Body>>>>
where
S: ExposedService + 'static,
<<S as ExposedService>::Service as Service<HttpRequest<Body>>>::Future: Send + 'static,
<<S as ExposedService>::Service as Service<HttpRequest<Body>>>::Error:
Into<Box<dyn Error + Send + Sync>> + Send,
{
#[rustfmt::skip]
let Mesh { mut cfg, grpc, svcs } = self.add_mesh_service(svc.clone());
svc.add_metadata(&mut *cfg.meta);
let grpc = grpc.add_service(svc.into_service());
Mesh { cfg, grpc, svcs }
}
#[inline]
pub async fn serve(self, addr: SocketAddr) -> Fallible<()> {
self.serve_with_shutdown(addr, pending()).await
}
pub async fn serve_with_shutdown<F>(self, addr: SocketAddr, signal: F) -> Fallible<()>
where F: Future<Output = ()> + Send {
let cluster = Arc::new(Cluster::new(self.cfg, addr));
let f_cuts = cluster.subscribe();
let w_cuts = cluster.subscribe();
let svcs: FuturesUnordered<_> = (self.svcs)
.into_iter()
.map(|s| s.accept(cluster.subscribe()))
.collect();
select! {
r = svcs.for_each(|_| async {}).then(|_| pending()) => r,
r = self.grpc
.add_service(Arc::clone(&cluster).into_service())
.serve_with_shutdown(addr, signal)
.err_into() => r,
r = Arc::clone(&cluster).detect_faults(f_cuts)
.err_into() => r,
r = St::handle_parts(Arc::clone(&cluster), w_cuts)
.err_into() => r,
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct GrpcService<S> {
svc: S,
}
impl<S> GrpcService<S>
where
S: Service<HttpRequest<Body>, Response = HttpResponse<BoxBody>>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
pub fn new(svc: S) -> Self {
Self { svc }
}
}
#[crate::async_trait]
impl<S: Send> MeshService for GrpcService<S> {
#[inline]
async fn accept(self: Box<Self>, _: Subscription) {}
}
impl<S> ExposedService for GrpcService<S>
where S: Service<HttpRequest<Body>, Response = HttpResponse<BoxBody>>
+ NamedService
+ Clone
+ Send
+ 'static
{
type Service = S;
#[inline]
fn into_service(self) -> Self::Service {
self.svc
}
}
#[crate::async_trait]
pub trait MeshService: Send {
async fn accept(self: Box<Self>, cuts: Subscription);
}
pub trait ExposedService: MeshService + Clone {
fn add_metadata<K: Extend<(String, Vec<u8>)>>(&self, _keys: &mut K) {}
type Service: Service<HttpRequest<Body>, Response = HttpResponse<BoxBody>>
+ NamedService
+ Clone
+ Send
+ 'static;
fn into_service(self) -> Self::Service;
}