use cannyls::deadline::Deadline;
use fibers::time::timer;
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use frugalos_core::tracer::SpanExt;
use frugalos_mds::{Error as MdsError, ErrorKind as MdsErrorKind};
use frugalos_raft::{LocalNodeId, NodeId};
use futures::future::Either;
use futures::{Async, Future, Poll};
use libfrugalos::client::mds::Client as RaftMdsClient;
use libfrugalos::consistency::ReadConsistency;
use libfrugalos::entity::node::RemoteNodeId;
use libfrugalos::entity::object::{
DeleteObjectsByPrefixSummary, Metadata, ObjectId, ObjectPrefix, ObjectSummary, ObjectVersion,
};
use libfrugalos::expect::Expect;
use libfrugalos::time::Seconds;
use rand::{self, thread_rng, Rng};
use rustracing::tag::{StdTag, Tag};
use rustracing_jaeger::span::{Span, SpanHandle};
use slog::Logger;
use std::collections::hash_set::HashSet;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::{Arc, Mutex};
use trackable::error::ErrorKindExt;
use crate::config::{ClusterConfig, MdsClientConfig, MdsRequestPolicy};
use crate::{Error, ErrorKind, ObjectValue, Result};
/// Client for the MDS cluster of a segment.
///
/// All clones share the same `inner` state (behind `Arc<Mutex<_>>`), so a
/// leader learned through one clone is visible to every other clone.
#[derive(Clone, Debug)]
pub struct MdsClient {
    /// Logger used for request diagnostics.
    logger: Logger,
    /// RPC service handle used to build per-peer MDS clients.
    rpc_service: RpcServiceHandle,
    /// Shared cluster configuration and cached leader.
    inner: Arc<Mutex<Inner>>,
    /// Per-kind request policies and timeouts.
    client_config: MdsClientConfig,
}
impl MdsClient {
pub fn new(
logger: Logger,
rpc_service: RpcServiceHandle,
cluster_config: ClusterConfig,
client_config: MdsClientConfig,
) -> Self {
MdsClient {
logger,
rpc_service,
inner: Arc::new(Mutex::new(Inner::new(cluster_config))),
client_config,
}
}
pub fn latest(&self) -> impl Future<Item = Option<ObjectSummary>, Error = Error> {
let parent = Span::inactive().handle();
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(client.latest_version().map_err(MdsError::from))
});
Request::new(self.clone(), parent, request)
}
pub fn list(&self) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
debug!(self.logger, "Starts LIST");
let parent = Span::inactive().handle();
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.list_objects(ReadConsistency::Consistent)
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn list_by_prefix(
&self,
prefix: ObjectPrefix,
parent: SpanHandle,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
debug!(self.logger, "Starts LIST BY PREFIX");
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.list_objects_by_prefix(prefix.clone())
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn get(
&self,
id: ObjectId,
consistency: ReadConsistency,
parent: SpanHandle,
) -> impl Future<Item = Option<ObjectValue>, Error = Error> {
debug!(self.logger, "Starts GET: id={:?}", id);
let member_size = self.member_size();
if let Err(e) = validate_consistency(consistency.clone(), member_size) {
return Either::A(futures::future::err(track!(e)));
}
let concurrency = match consistency {
ReadConsistency::Quorum => Some(self.majority_size()),
ReadConsistency::Subset(n) => Some(n),
_ => None,
};
let request = if let Some(concurrency) = concurrency {
RequestOnce2::Parallel(ParallelRequestOnce::new(
RequestKind::Get,
concurrency,
move |clients| {
let futures: Vec<_> = clients
.into_iter()
.map(|(client, mut span)| {
let future: BoxFuture<_> = Box::new(
client
.get_object(id.clone(), Expect::Any, consistency.clone())
.map(to_object_value)
.map_err(move |e| {
span.log_error(&e);
track!(MdsError::from(e))
}),
);
future
})
.collect();
Box::new(GetLatestObject::new(futures))
},
))
} else {
RequestOnce2::Single(SingleRequestOnce::new(RequestKind::Get, move |client| {
let future = client
.get_object(id.clone(), Expect::Any, consistency.clone())
.map(to_object_value)
.map_err(MdsError::from);
Box::new(future)
}))
};
Either::B(Request::new(self.clone(), parent, request))
}
pub fn head(
&self,
id: ObjectId,
consistency: ReadConsistency,
parent: SpanHandle,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
debug!(self.logger, "Starts HEAD: id={:?}", id);
let member_size = self.member_size();
if let Err(e) = validate_consistency(consistency.clone(), member_size) {
return Either::A(futures::future::err(track!(e)));
}
let concurrency = match consistency {
ReadConsistency::Quorum => Some(self.majority_size()),
ReadConsistency::Subset(n) => Some(n),
_ => None,
};
let request = if let Some(concurrency) = concurrency {
RequestOnce2::Parallel(ParallelRequestOnce::new(
RequestKind::Head,
concurrency,
move |clients| {
let futures: Vec<_> = clients
.into_iter()
.map(|(client, mut span)| {
let future: BoxFuture<_> = Box::new(
client
.head_object(id.clone(), Expect::Any, consistency.clone())
.map_err(move |e| {
span.log_error(&e);
track!(MdsError::from(e))
}),
);
future
})
.collect();
Box::new(GetLatestObject::new(futures))
},
))
} else {
RequestOnce2::Single(SingleRequestOnce::new(RequestKind::Head, move |client| {
Box::new(
client
.head_object(id.clone(), Expect::Any, consistency.clone())
.map_err(MdsError::from),
)
}))
};
Either::B(Request::new(self.clone(), parent, request))
}
pub fn delete(
&self,
id: ObjectId,
expect: Expect,
parent: SpanHandle,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
debug!(self.logger, "Starts DELETE: id={:?}", id);
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.delete_object(id.clone(), expect.clone())
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn delete_by_version(
&self,
version: ObjectVersion,
parent: SpanHandle,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
debug!(self.logger, "Starts DELETE: version={:?}", version);
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.delete_object_by_version(version)
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn delete_by_range(
&self,
targets: Range<ObjectVersion>,
parent: SpanHandle,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
debug!(
self.logger,
"Starts DELETE: versions if {:?} <= it < {:?}", targets.start, targets.end
);
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.delete_by_range(targets.clone())
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn delete_by_prefix(
&self,
prefix: ObjectPrefix,
parent: SpanHandle,
) -> impl Future<Item = DeleteObjectsByPrefixSummary, Error = Error> {
debug!(self.logger, "Starts DELETE: prefix={:?}", prefix);
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.delete_by_prefix(prefix.clone())
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn put(
&self,
id: ObjectId,
content: Vec<u8>,
expect: Expect,
deadline: Deadline,
parent: SpanHandle,
) -> impl Future<Item = (ObjectVersion, bool), Error = Error> {
debug!(self.logger, "Starts PUT: id={:?}", id);
let put_content_timeout = Seconds(if let Deadline::Within(d) = deadline {
d.as_secs() + self.client_config.put_content_timeout.0
} else {
self.client_config.put_content_timeout.0
});
let request = SingleRequestOnce::new(RequestKind::Other, move |client| {
Box::new(
client
.put_object(
id.clone(),
content.clone(),
expect.clone(),
put_content_timeout.into(),
)
.map(|(leader, (version, old))| (leader, (version, old.is_none())))
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
pub fn object_count(&self) -> impl Future<Item = u64, Error = Error> {
let parent = Span::inactive().handle();
let request = SingleRequestOnce::new(RequestKind::Other, |client| {
Box::new(
client
.object_count(ReadConsistency::Consistent)
.map_err(MdsError::from),
)
});
Request::new(self.clone(), parent, request)
}
fn timeout(&self, kind: RequestKind) -> RequestTimeout {
match self.request_policy(&kind) {
MdsRequestPolicy::Conservative => RequestTimeout::Never,
MdsRequestPolicy::Speculative { timeout, .. } => RequestTimeout::Speculative {
timer: timer::timeout(*timeout),
},
}
}
fn request_policy(&self, kind: &RequestKind) -> &MdsRequestPolicy {
match kind {
RequestKind::Get => &self.client_config.get_request_policy,
RequestKind::Head => &self.client_config.head_request_policy,
RequestKind::Other => &self.client_config.default_request_policy,
}
}
fn majority_size(&self) -> usize {
((self.member_size() as f64 / 2.0).ceil()) as usize
}
fn member_size(&self) -> usize {
self.inner
.lock()
.unwrap_or_else(|e| panic!("{}", e))
.config
.members
.len()
}
fn clear_leader(&self) {
self.inner.lock().unwrap_or_else(|e| panic!("{}", e)).leader = None;
}
fn set_leader(&self, leader: LocalNodeId) {
info!(self.logger, "Set leader: {:?}", leader);
let mut inner = self.inner.lock().unwrap_or_else(|e| panic!("{}", e));
let leader = inner
.config
.members
.iter()
.map(|m| m.node)
.find(|node| node.local_id == leader)
.expect("Never fails");
inner.leader = Some(leader);
}
fn next_peer(&self, policy: &MdsRequestPolicy, candidate: usize) -> NodeId {
match policy {
MdsRequestPolicy::Conservative => self.leader(),
MdsRequestPolicy::Speculative { .. } => self.leader_or_candidate(candidate),
}
}
fn next_peers(&self, mut i: usize, required_peers: usize) -> Result<HashSet<NodeId>> {
let inner = track!(self
.inner
.lock()
.map_err(|e| Error::from(ErrorKind::Other.cause(format!("{}", e)))))?;
let member_total = inner.config.members.len();
track_assert!(
required_peers <= member_total,
ErrorKind::Invalid,
"This cluster has {} members but {} peers are required",
member_total,
required_peers
);
let mut peers = HashSet::new();
if let Some(leader) = inner.leader {
peers.insert(leader);
}
while peers.len() < required_peers {
peers.insert(inner.config.members[i % member_total].node);
i += 1;
}
Ok(peers)
}
fn leader(&self) -> NodeId {
let mut inner = self.inner.lock().unwrap_or_else(|e| panic!("{}", e));
if inner.leader.is_none() {
inner.leader = rand::thread_rng()
.choose(&inner.config.members)
.map(|m| m.node);
}
inner.leader.unwrap_or_else(|| unreachable!())
}
fn leader_or_candidate(&self, member: usize) -> NodeId {
let inner = self.inner.lock().unwrap_or_else(|e| panic!("{}", e));
if inner.leader.is_none() {
return inner
.config
.members
.get(member % inner.config.members.len())
.map(|m| m.node)
.unwrap_or_else(|| unreachable!());
}
inner.leader.unwrap_or_else(|| unreachable!())
}
}
type BoxFuture<V> =
Box<dyn Future<Item = (Option<RemoteNodeId>, V), Error = MdsError> + Send + 'static>;
/// Converts a `(leader, metadata)` response into `(leader, object value)`,
/// renaming `Metadata::data` to `ObjectValue::content`.
fn to_object_value(
    response: (Option<RemoteNodeId>, Option<Metadata>),
) -> (Option<RemoteNodeId>, Option<ObjectValue>) {
    let (leader, metadata) = response;
    let value = metadata.map(|m| ObjectValue {
        version: m.version,
        content: m.data,
    });
    (leader, value)
}
/// Checks that `consistency` can be satisfied by a cluster of `member_size` members.
///
/// # Errors
/// `ErrorKind::Invalid` when the cluster is empty, or when a `Subset(n)` read
/// asks for zero peers or for more peers than there are members.
fn validate_consistency(consistency: ReadConsistency, member_size: usize) -> Result<()> {
    if member_size == 0 {
        return track!(Err(ErrorKind::Invalid
            .cause("The size of cluster member must be bigger than 0")
            .into()));
    }
    match consistency {
        ReadConsistency::Subset(n) if n == 0 || n > member_size => track!(Err(ErrorKind::Invalid
            .cause(format!("subset must be 0 < n <= {}", member_size))
            .into())),
        ReadConsistency::Subset(_)
        | ReadConsistency::Quorum
        | ReadConsistency::Stale
        | ReadConsistency::Consistent => Ok(()),
    }
}
/// Starts a client-side `mds_request` child span of `parent`, tagged with the
/// address and local node id of `peer`.
fn make_request_span(parent: &SpanHandle, peer: &NodeId) -> Span {
    let (ip, port) = (peer.addr.ip(), peer.addr.port());
    parent.child("mds_request", |options| {
        // Kept inside the closure so the string is only built when the span is started.
        let node = peer.local_id.to_string();
        options
            .tag(StdTag::component(module_path!()))
            .tag(StdTag::span_kind("client"))
            .tag(StdTag::peer_ip(ip))
            .tag(StdTag::peer_port(port))
            .tag(Tag::new("peer.node", node))
            .start()
    })
}
/// Mutable cluster state shared by every clone of an `MdsClient`.
#[derive(Debug)]
pub struct Inner {
    /// Cluster configuration; its `members` list is the set of candidate peers.
    config: ClusterConfig,
    /// Cached leader, or `None` when unknown.
    leader: Option<NodeId>,
}
impl Inner {
    /// Creates the shared state with no known leader.
    pub fn new(config: ClusterConfig) -> Self {
        let leader = None;
        Inner { leader, config }
    }
}
/// Category of an MDS request; selects which configured request policy applies.
#[derive(Copy, Clone)]
pub enum RequestKind {
    /// Object `HEAD` (uses `head_request_policy`).
    Head,
    /// Object `GET` (uses `get_request_policy`).
    Get,
    /// Everything else (uses `default_request_policy`).
    Other,
}
pub enum RequestTimeout {
Never,
Speculative { timer: timer::Timeout },
}
impl Future for RequestTimeout {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
RequestTimeout::Never => Ok(Async::NotReady),
RequestTimeout::Speculative { timer } => track!(timer.poll().map_err(Error::from)),
}
}
}
/// An MDS request that starts a new attempt on failure or on speculative
/// timeout, making at most `member_size` attempts in total.
// The boxed future field type is long; `type_complexity` is allowed on purpose.
#[allow(clippy::type_complexity)]
pub struct Request<T: RequestOnce> {
    client: MdsClient,
    /// Number of attempts still allowed.
    max_retry: usize,
    /// Strategy used to start each attempt.
    request: T,
    /// Parent of the per-attempt tracing spans.
    parent: SpanHandle,
    /// Peers contacted by the current attempt (for logging).
    peers: Vec<NodeId>,
    /// Timeout of the current attempt.
    timeout: RequestTimeout,
    /// The in-flight attempt; `None` until the first one is started.
    future: Option<BoxFuture<T::Item>>,
}
impl<T> Request<T>
where
    T: RequestOnce,
    T::Item: Send + 'static,
{
    /// Creates a request. No attempt is started until the first `poll`.
    pub fn new(client: MdsClient, parent: SpanHandle, request: T) -> Self {
        let timeout = client.timeout(request.kind());
        let max_retry = client.member_size();
        Request {
            future: None,
            peers: Vec::new(),
            timeout,
            max_retry,
            request,
            parent,
            client,
        }
    }

    /// Starts a new attempt, consuming one retry and re-arming the timeout.
    ///
    /// Fails with `ErrorKind::Busy` when no attempts are left.
    fn request_once(&mut self) -> Result<()> {
        track_assert_ne!(self.max_retry, 0, ErrorKind::Busy);
        self.max_retry -= 1;
        let (contacted, attempt) =
            track!(self.request.request_once(&self.client, &self.parent))?;
        self.peers = contacted;
        self.timeout = self.client.timeout(self.request.kind());
        self.future = Some(attempt);
        Ok(())
    }
}
impl<T> Future for Request<T>
where
    T: RequestOnce,
    T::Item: Send + 'static,
{
    type Item = T::Item;
    type Error = Error;

    /// Drives the current attempt. A new attempt is started when the timeout
    /// fires, when the attempt fails (except on a version mismatch), or when no
    /// attempt has been started yet.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Each expiry abandons the in-flight attempt and starts a new one;
            // `request_once` also re-arms `self.timeout`.
            while let Async::Ready(()) = track!(self.timeout.poll())? {
                warn!(
                    self.client.logger,
                    "Request timeout: peers={:?}, max_retry={}", self.peers, self.max_retry
                );
                self.client.clear_leader();
                if self.max_retry == 0 {
                    track_panic!(ErrorKind::Busy, "max retry reached: peers={:?}", self.peers);
                }
                track!(self.request_once())?;
            }

            let outcome = self.future.poll();
            match outcome {
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Ok(Async::Ready(Some((new_leader, value)))) => {
                    if let Some((_addr, local_node_id)) = new_leader {
                        self.client.set_leader(track!(local_node_id.parse())?);
                    }
                    return Ok(Async::Ready(value));
                }
                // `self.future` is `None` (futures 0.1 resolves a `None` option to
                // `Ready(None)`): no attempt is running yet.
                Ok(Async::Ready(None)) => {
                    track!(self.request_once())?;
                }
                Err(e) => {
                    debug!(
                        self.client.logger,
                        "Error: peers={:?}, reason={}", self.peers, e
                    );
                    // A version mismatch goes back to the caller without retrying.
                    if let MdsErrorKind::Unexpected(current) = *e.kind() {
                        return Err(
                            track!(ErrorKind::UnexpectedVersion { current }.takes_over(e)).into(),
                        );
                    }
                    self.client.clear_leader();
                    if self.max_retry == 0 {
                        return Err(
                            track!(ErrorKind::Busy.takes_over(e), "peers={:?}", self.peers).into(),
                        );
                    }
                    track!(self.request_once())?;
                    debug!(self.client.logger, "Tries next peers: {:?}", self.peers);
                }
            }
        }
    }
}
/// Strategy for starting a single attempt of an MDS request.
pub trait RequestOnce {
    /// Value produced by a successful attempt.
    type Item;

    /// Starts one attempt. Returns the contacted peers and the response future.
    fn request_once(
        &mut self,
        client: &MdsClient,
        parent: &SpanHandle,
    ) -> Result<(Vec<NodeId>, BoxFuture<Self::Item>)>;

    /// Kind of the request; selects its policy and timeout.
    fn kind(&self) -> RequestKind;
}
/// Either a single-peer or a parallel multi-peer strategy, chosen at runtime.
enum RequestOnce2<F, G> {
    Single(SingleRequestOnce<F>),
    Parallel(ParallelRequestOnce<G>),
}
impl<F, G, V> RequestOnce for RequestOnce2<F, G>
where
    F: Fn(RaftMdsClient) -> BoxFuture<V>,
    G: Fn(Vec<(RaftMdsClient, Span)>) -> BoxFuture<V>,
    V: Send + 'static,
{
    type Item = V;

    /// Delegates to the wrapped strategy.
    fn kind(&self) -> RequestKind {
        match *self {
            RequestOnce2::Single(ref single) => single.kind(),
            RequestOnce2::Parallel(ref parallel) => parallel.kind(),
        }
    }

    /// Delegates to the wrapped strategy.
    fn request_once(
        &mut self,
        client: &MdsClient,
        parent: &SpanHandle,
    ) -> Result<(Vec<NodeId>, BoxFuture<Self::Item>)> {
        match *self {
            RequestOnce2::Single(ref mut single) => single.request_once(client, parent),
            RequestOnce2::Parallel(ref mut parallel) => parallel.request_once(client, parent),
        }
    }
}
/// Request strategy that sends the same request to several distinct peers at once.
struct ParallelRequestOnce<F> {
    kind: RequestKind,
    /// Number of distinct peers contacted per attempt.
    required_peers: usize,
    /// Rotating offset into the member list; starts at a random value.
    from_peer: usize,
    /// Combines the per-peer clients (with their spans) into one response future.
    f: F,
}
impl<F, V> ParallelRequestOnce<F>
where
    F: Fn(Vec<(RaftMdsClient, Span)>) -> BoxFuture<V>,
    V: Send + 'static,
{
    /// Creates the strategy with a random starting member offset.
    fn new(kind: RequestKind, required_peers: usize, f: F) -> Self {
        let from_peer = thread_rng().gen();
        Self {
            kind,
            required_peers,
            from_peer,
            f,
        }
    }
}
impl<F, V> RequestOnce for ParallelRequestOnce<F>
where
    F: Fn(Vec<(RaftMdsClient, Span)>) -> BoxFuture<V>,
    V: Send + 'static,
{
    type Item = V;
    fn kind(&self) -> RequestKind {
        self.kind
    }
    /// Picks `required_peers` peers (via `MdsClient::next_peers`), opens a traced
    /// client for each, and hands them to `f`.
    fn request_once(
        &mut self,
        client: &MdsClient,
        parent: &SpanHandle,
    ) -> Result<(Vec<NodeId>, BoxFuture<Self::Item>)> {
        // `from_peer` starts as a random `usize`, so a plain `+= 1` could overflow.
        self.from_peer = self.from_peer.wrapping_add(1);
        let peers = track!(client.next_peers(self.from_peer, self.required_peers))?;
        let mut clients = Vec::with_capacity(peers.len());
        for peer in &peers {
            let mds = RaftMdsClient::new(
                (peer.addr, peer.local_id.to_string()),
                client.rpc_service.clone(),
            );
            let span = make_request_span(parent, peer);
            clients.push((mds, span));
        }
        // `f` already returns a boxed future; boxing it again only adds an
        // allocation and an extra indirection on every poll.
        let future = (self.f)(clients);
        Ok((peers.into_iter().collect(), future))
    }
}
/// Request strategy that sends the request to exactly one peer per attempt.
struct SingleRequestOnce<F> {
    kind: RequestKind,
    /// Rotating candidate index for speculative requests; starts at a random value.
    from_peer: usize,
    /// Issues the request through the given per-peer client.
    f: F,
}
impl<F, V> SingleRequestOnce<F>
where
    F: Fn(RaftMdsClient) -> BoxFuture<V>,
    V: Send + 'static,
{
    /// Creates the strategy with a random starting candidate.
    fn new(kind: RequestKind, f: F) -> Self {
        let from_peer = thread_rng().gen();
        Self { kind, from_peer, f }
    }
}
impl<F, V> RequestOnce for SingleRequestOnce<F>
where
    F: Fn(RaftMdsClient) -> BoxFuture<V>,
    V: Send + 'static,
{
    type Item = V;
    fn kind(&self) -> RequestKind {
        self.kind
    }
    /// Chooses one peer according to the policy for `self.kind`, issues the
    /// request, and logs any error on the attempt's span.
    fn request_once(
        &mut self,
        client: &MdsClient,
        parent: &SpanHandle,
    ) -> Result<(Vec<NodeId>, BoxFuture<Self::Item>)> {
        // `from_peer` starts as a random `usize`, so a plain `+= 1` could overflow.
        self.from_peer = self.from_peer.wrapping_add(1);
        let request_policy = client.request_policy(&self.kind);
        let peer = client.next_peer(request_policy, self.from_peer);
        let mut span = make_request_span(parent, &peer);
        let mds = RaftMdsClient::new(
            (peer.addr, peer.local_id.to_string()),
            client.rpc_service.clone(),
        );
        let future = (self.f)(mds);
        let future = future.then(move |result| {
            if let Err(ref e) = result {
                span.log_error(e);
            }
            track!(result)
        });
        Ok((vec![peer], Box::new(future)))
    }
}
trait ContainObjectVersion {
fn object_version(&self) -> ObjectVersion;
}
impl ContainObjectVersion for ObjectVersion {
fn object_version(&self) -> ObjectVersion {
*self
}
}
impl ContainObjectVersion for ObjectValue {
fn object_version(&self) -> ObjectVersion {
self.version
}
}
impl<A, B: ContainObjectVersion> ContainObjectVersion for (A, B) {
fn object_version(&self) -> ObjectVersion {
self.1.object_version()
}
}
#[inline]
fn select_latest<I: Iterator>(values: I) -> Option<I::Item>
where
I::Item: ContainObjectVersion,
{
values.max_by_key(ContainObjectVersion::object_version)
}
/// Future that waits for every per-peer response of a parallel read and
/// resolves to the value with the newest object version.
///
/// Resolves to `(None, None)` when more peers answered "not found" than
/// answered with a value. It fails as soon as any single peer's future fails.
struct GetLatestObject<V> {
    // Number of peers that answered with `None`.
    not_found_count: usize,
    // Response futures that are still pending.
    futures: Vec<BoxFuture<Option<V>>>,
    // Values received so far, each paired with the leader its peer reported.
    values: Vec<(Option<RemoteNodeId>, V)>,
}
impl<V> GetLatestObject<V> {
    /// Wraps the per-peer response futures.
    fn new(futures: Vec<BoxFuture<Option<V>>>) -> Self {
        Self {
            futures,
            not_found_count: 0,
            values: Vec::new(),
        }
    }
}
impl<V> Future for GetLatestObject<V>
where
    V: Debug + ContainObjectVersion,
{
    type Item = (Option<RemoteNodeId>, Option<V>);
    type Error = MdsError;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Poll each pending future once. `swap_remove` moves the last element
        // into slot `i`, so `i` only advances when the current slot is still pending.
        let mut i = 0;
        while i < self.futures.len() {
            match track!(self.futures[i].poll()) {
                Err(e) => {
                    self.futures.swap_remove(i);
                    // Any single failure fails the whole read; remaining
                    // futures are not polled further by this call.
                    return track!(Err(e));
                }
                Ok(Async::NotReady) => {
                    i += 1;
                }
                Ok(Async::Ready(response)) => {
                    self.futures.swap_remove(i);
                    if let (leader, Some(value)) = response {
                        self.values.push((leader, value));
                    } else {
                        self.not_found_count += 1;
                    }
                }
            }
        }
        if self.futures.is_empty() {
            let values = self.values.drain(..);
            // "Not found" wins only when it outnumbers the found values;
            // on a tie the newest found value is returned.
            if self.not_found_count > values.len() {
                return Ok(Async::Ready((None, None)));
            }
            if let Some((leader, value)) = select_latest(values) {
                return Ok(Async::Ready((leader, Some(value))));
            }
            // Reached only when there were no responses at all
            // (e.g. the future was built with an empty `futures` list).
            let e = MdsErrorKind::Other.cause("No MDS response");
            return track!(Err(e.into()));
        }
        Ok(Async::NotReady)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `validate_consistency` rejects empty clusters and out-of-range subsets.
    #[test]
    fn validate_consistency_works() {
        let cases = vec![
            (ReadConsistency::Consistent, 3, true),
            (ReadConsistency::Consistent, 0, false),
            (ReadConsistency::Stale, 2, true),
            (ReadConsistency::Stale, 0, false),
            (ReadConsistency::Quorum, 1, true),
            (ReadConsistency::Quorum, 0, false),
            (ReadConsistency::Subset(4), 12, true),
            (ReadConsistency::Subset(2), 2, true),
            (ReadConsistency::Subset(2), 1, false),
            (ReadConsistency::Subset(0), 1, false),
        ];
        for (index, (consistency, member_size, expected_ok)) in cases.into_iter().enumerate() {
            assert_eq!(
                validate_consistency(consistency, member_size).is_ok(),
                expected_ok,
                "case #{}",
                index
            );
        }
    }
}