use super::cluster::{
cut::{Closed, Subscription},
Cluster, Config,
};
use bytes::Bytes;
use futures::{
future::{pending, FutureExt, TryFutureExt},
stream::{FuturesUnordered, StreamExt},
};
use std::{
convert::Infallible, error, future::Future, net::SocketAddr, result, sync::Arc, time::Duration,
};
use thiserror::Error;
use tokio::select;
use tonic::{
body::BoxBody,
codegen::{
http::{Request, Response},
Service,
},
transport::{
self,
server::{Router, Routes},
Body, ClientTlsConfig, NamedService, Server, ServerTlsConfig,
},
};
use tower_layer::Layer;
use tracing::Span;
pub type Result = result::Result<(), Error>;
#[derive(Debug, Error)]
pub enum Error {
#[error("mesh: {}", .0)]
Transport(#[from] transport::Error),
#[error("mesh: task closed")]
Closed(#[from] Closed),
}
#[derive(Copy, Clone, Debug)]
pub struct CutDetectorConfig {
pub unstable_threshold: usize,
pub stable_threshold: usize,
pub subjects_per_observer: usize,
}
impl Default for CutDetectorConfig {
fn default() -> Self {
Self::new()
}
}
impl CutDetectorConfig {
pub const fn new() -> Self {
Self {
unstable_threshold: 4,
stable_threshold: 9,
subjects_per_observer: 10,
}
}
}
pub struct Mesh<R> {
cfg: Config,
grpc: R,
svcs: Vec<Box<dyn MeshService>>,
}
impl Default for Mesh<Server> {
fn default() -> Self {
let cd = CutDetectorConfig::new();
Self {
cfg: Config {
lh: (cd.unstable_threshold, cd.stable_threshold),
k: cd.subjects_per_observer,
seed: None,
meta: Default::default(),
server_tls: false,
client_tls: None,
fd_timeout: Duration::from_secs(2),
fd_strikes: 3,
},
grpc: Server::builder(),
svcs: Vec::new(),
}
}
}
impl Mesh<Server> {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn low_latency() -> Self {
Self::default()
.fault_timeout(Duration::from_millis(150))
.fault_strikes(2)
}
}
impl<R> Mesh<R> {
pub fn cd_config(mut self, cfg: CutDetectorConfig) -> Self {
assert_ne!(0, cfg.unstable_threshold);
assert_ne!(0, cfg.stable_threshold);
assert_ne!(0, cfg.subjects_per_observer);
assert!(cfg.unstable_threshold <= cfg.stable_threshold);
assert!(cfg.stable_threshold <= cfg.subjects_per_observer);
self.cfg.lh.0 = cfg.unstable_threshold;
self.cfg.lh.1 = cfg.stable_threshold;
self.cfg.k = cfg.subjects_per_observer;
self
}
pub fn join_seed(mut self, addr: SocketAddr, use_tls: bool) -> Self {
self.cfg.seed = Some((addr, use_tls).into());
self
}
pub fn add_metadata<I: IntoIterator<Item = (String, Vec<u8>)>>(mut self, iter: I) -> Self {
self.cfg.meta.extend(iter);
self
}
pub fn client_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
self.cfg.client_tls = Some(Arc::new(tls_config));
self
}
pub fn fault_timeout(mut self, timeout: Duration) -> Self {
self.cfg.fd_timeout = timeout;
self
}
pub fn fault_strikes(mut self, strikes: usize) -> Self {
assert!(strikes != 0);
self.cfg.fd_strikes = strikes;
self
}
pub fn add_mesh_service<S: MeshService + 'static>(mut self, svc: S) -> Self {
self.svcs.push(Box::new(svc));
self
}
}
impl<L> Mesh<Server<L>> {
pub fn server_tls_config(mut self, tls_config: ServerTlsConfig) -> Self {
self.cfg.server_tls = true;
self.grpc = self.grpc.tls_config(tls_config).unwrap();
self
}
pub fn concurrency_limit_per_connection(mut self, limit: usize) -> Self {
self.grpc = self.grpc.concurrency_limit_per_connection(limit);
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.grpc = self.grpc.timeout(timeout);
self
}
pub fn initial_stream_window_size<S: Into<Option<u32>>>(mut self, sz: S) -> Self {
self.grpc = self.grpc.initial_stream_window_size(sz);
self
}
pub fn initial_connection_window_size<S: Into<Option<u32>>>(mut self, sz: S) -> Self {
self.grpc = self.grpc.initial_connection_window_size(sz);
self
}
pub fn max_concurrent_streams<M: Into<Option<u32>>>(mut self, max: M) -> Self {
self.grpc = self.grpc.max_concurrent_streams(max);
self
}
pub fn tcp_keepalive<D: Into<Option<Duration>>>(mut self, tcp_keepalive: D) -> Self {
self.grpc = self.grpc.tcp_keepalive(tcp_keepalive.into());
self
}
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
self.grpc = self.grpc.tcp_nodelay(enabled);
self
}
pub fn trace_fn<F>(mut self, f: F) -> Self
where F: Fn(&http::Request<()>) -> Span + Send + Sync + 'static {
self.grpc = self.grpc.trace_fn(f);
self
}
pub fn add_service<S>(self, svc: S) -> Mesh<Router<L>>
where
S: ExposedService + 'static,
<<S as ExposedService>::Service as Service<Request<Body>>>::Future: Send + 'static,
L: Clone,
{
#[rustfmt::skip]
let Mesh { mut cfg, mut grpc, svcs } = self.add_mesh_service(svc.clone());
svc.add_metadata(&mut *cfg.meta);
let grpc = grpc.add_service(svc.into_service());
Mesh { cfg, grpc, svcs }
}
#[inline]
pub async fn serve<Resp>(self, addr: SocketAddr) -> Result
where
L: Layer<Routes> + Clone,
L::Service: Service<Request<Body>, Response = Response<Resp>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
Into<Box<dyn error::Error + Send + Sync>> + Send,
Resp: http_body::Body<Data = Bytes> + Send + 'static,
Resp::Error: Into<Box<dyn error::Error + Send + Sync>>,
{
self.serve_with_shutdown(addr, pending()).await
}
pub async fn serve_with_shutdown<Resp, F>(mut self, addr: SocketAddr, signal: F) -> Result
where
F: Future<Output = ()> + Send,
L: Layer<Routes> + Clone,
L::Service: Service<Request<Body>, Response = Response<Resp>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
Into<Box<dyn error::Error + Send + Sync>> + Send,
Resp: http_body::Body<Data = Bytes> + Send + 'static,
Resp::Error: Into<Box<dyn error::Error + Send + Sync>>,
{
let cluster = Arc::new(Cluster::new(self.cfg, addr));
select! {
r = self.svcs.into_iter()
.map(|s| s.accept(cluster.subscribe()))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.then(|_| pending()) => r,
r = Arc::clone(&cluster)
.detect_faults(cluster.subscribe())
.err_into() => r,
r = Arc::clone(&cluster)
.handle_parts(cluster.subscribe())
.err_into() => r,
r = self.grpc
.add_service(cluster.into_service())
.serve_with_shutdown(addr, signal)
.err_into() => r,
}
}
}
#[doc(hidden)]
impl<L> Mesh<Router<L>> {
pub fn add_service<S>(self, svc: S) -> Mesh<Router<L>>
where
S: ExposedService + 'static,
<<S as ExposedService>::Service as Service<Request<Body>>>::Future: Send + 'static,
L: Clone,
{
#[rustfmt::skip]
let Mesh { mut cfg, grpc, svcs } = self.add_mesh_service(svc.clone());
svc.add_metadata(&mut *cfg.meta);
let grpc = grpc.add_service(svc.into_service());
Mesh { cfg, grpc, svcs }
}
#[inline]
pub async fn serve<Resp>(self, addr: SocketAddr) -> Result
where
L: Layer<Routes> + Clone,
L::Service: Service<Request<Body>, Response = Response<Resp>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
Into<Box<dyn error::Error + Send + Sync>> + Send,
Resp: http_body::Body<Data = Bytes> + Send + 'static,
Resp::Error: Into<Box<dyn error::Error + Send + Sync>>,
{
self.serve_with_shutdown(addr, pending()).await
}
pub async fn serve_with_shutdown<Resp, F>(self, addr: SocketAddr, signal: F) -> Result
where
F: Future<Output = ()> + Send,
L: Layer<Routes> + Clone,
L::Service: Service<Request<Body>, Response = Response<Resp>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
Into<Box<dyn error::Error + Send + Sync>> + Send,
Resp: http_body::Body<Data = Bytes> + Send + 'static,
Resp::Error: Into<Box<dyn error::Error + Send + Sync>>,
{
let cluster = Arc::new(Cluster::new(self.cfg, addr));
select! {
r = self.svcs.into_iter()
.map(|s| s.accept(cluster.subscribe()))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.then(|_| pending()) => r,
r = Arc::clone(&cluster)
.detect_faults(cluster.subscribe())
.err_into() => r,
r = Arc::clone(&cluster)
.handle_parts(cluster.subscribe())
.err_into() => r,
r = self.grpc
.add_service(cluster.into_service())
.serve_with_shutdown(addr, signal)
.err_into() => r,
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct GrpcService<S> {
svc: S,
}
impl<S> GrpcService<S>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + NamedService + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn error::Error + Send + Sync>> + Send,
{
pub fn new(svc: S) -> Self {
Self { svc }
}
}
#[crate::async_trait]
impl<S: Send> MeshService for GrpcService<S> {
#[inline]
async fn accept(self: Box<Self>, _: Subscription) {}
}
impl<S> ExposedService for GrpcService<S>
where S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static
{
type Service = S;
#[inline]
fn into_service(self) -> Self::Service {
self.svc
}
}
#[crate::async_trait]
pub trait MeshService: Send {
async fn accept(self: Box<Self>, cuts: Subscription);
}
pub trait ExposedService: MeshService + Clone {
fn add_metadata<K: Extend<(String, Vec<u8>)>>(&self, _keys: &mut K) {}
type Service: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static;
fn into_service(self) -> Self::Service;
}