pub mod cut;
pub mod partition;
mod proto;
use super::collections::{EventFilter, EventId, FreqSet, Tumbler};
use cut::{Member, MultiNodeCut, Subscription};
use proto::{
broadcast_req::{Broadcasted::*, *},
membership_client::*,
membership_server::*,
*,
};
use failure::{Compat, Fallible};
use fnv::FnvHasher;
use futures::{
future::TryFutureExt,
stream::{FuturesUnordered, StreamExt},
};
use rand::{thread_rng, Rng};
use std::{
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
convert::TryInto,
future::Future,
hash::{Hash, Hasher},
mem,
net::SocketAddr,
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
time::Duration,
};
use tokio::{
join, select,
sync::{broadcast, oneshot, RwLock},
task,
time::{delay_for, timeout},
};
use tonic::{
transport::{self, ClientTlsConfig},
Request, Response, Status,
};
#[doc(hidden)]
/// Build-time settings for a [`Cluster`].
pub struct Config<St> {
    // Partitioning strategy; carried for the type parameter but not read
    // directly in this module.
    #[allow(dead_code)]
    pub(crate) strategy: St,
    // Cut-detection watermarks `(low, high)`: subjects whose report count is
    // strictly inside the open interval are "unstable"; `>= high` is needed
    // before a subject enters a view-change proposal.
    pub(crate) lh: (usize, usize),
    // Ring count passed to `Tumbler::new`; also used as the capacity hint
    // for the failure detector's subject map (one subject per ring).
    pub(crate) k: usize,
    // Seed member to contact when joining an existing cluster, if any.
    // NOTE(review): not referenced in this chunk — presumably consumed by
    // the join bootstrap elsewhere; verify.
    pub(crate) seed: Option<Endpoint>,
    // Metadata this node advertises to the cluster.
    // NOTE(review): not referenced in this chunk; verify against the joiner.
    pub(crate) meta: Metadata,
    // Whether this node's gRPC server expects TLS; advertised on the local
    // endpoint via `Cluster::local_node`.
    pub(crate) server_tls: bool,
    // TLS configuration used when dialing members whose endpoint expects
    // TLS (see `Cluster::resolve_endpoint` / `get_client_tls`).
    pub(crate) client_tls: Option<Arc<ClientTlsConfig>>,
    // Timeout for a single failure-detector probe; also paces probe rounds.
    pub(crate) fd_timeout: Duration,
    // Consecutive failed probes before a subject is reported down.
    pub(crate) fd_strikes: usize,
}
/// Result type for gRPC handler logic: a value or a `tonic::Status` error.
type Grpc<T> = Result<T, Status>;
/// A [`Grpc`] result wrapping a successful `tonic::Response`.
type GrpcResponse<T> = Grpc<Response<T>>;
#[doc(hidden)]
/// A single node's handle on the membership protocol.
pub struct Cluster<St> {
    // Static settings (strategy, watermarks, TLS, failure-detector tuning).
    cfg: Config<St>,
    // Local listen address; `local_node()` derives this node's `Endpoint`
    // from it.
    addr: SocketAddr,
    // Shared mutable membership + consensus state.
    state: Arc<RwLock<State>>,
    // Fan-out channel publishing every accepted `MultiNodeCut`.
    cuts: broadcast::Sender<MultiNodeCut>,
}
#[crate::async_trait]
impl<St: partition::Strategy> Membership for Arc<Cluster<St>> {
    /// First step of the join handshake: the prospective member learns the
    /// current configuration id and which existing members (its ring
    /// predecessors) it should send the actual join request to.
    async fn pre_join(&self, req: Request<PreJoinReq>) -> GrpcResponse<PreJoinResp> {
        let PreJoinReq { sender, uuid } = req.into_inner();
        sender.validate()?;
        let state = self.state.read().await;
        // Only brand-new nodes must present an unused uuid; an existing
        // member may retry pre-join (e.g. after a dropped response).
        if !state.nodes.contains(&sender) {
            state.verify_unused_uuid(&uuid)?;
        }
        let resp = PreJoinResp {
            sender: self.local_node(),
            conf_id: state.conf_id,
            contact: state.nodes.predecessors(&sender).cloned().collect(),
        };
        Ok(Response::new(resp))
    }

    /// Second step of the join handshake: validates the request against the
    /// current configuration, enqueues a join alert for cut detection, and
    /// parks the caller until a view change admits it.
    async fn join(&self, req: Request<JoinReq>) -> GrpcResponse<JoinResp> {
        #[rustfmt::skip]
        let JoinReq { sender, ring, uuid, conf_id, meta } = req.into_inner();
        sender.validate()?;
        let mut state = self.state.write().await;
        state.verify_config(conf_id)?;
        let local_node = self.local_node();
        // A duplicate/retried join from a node that already made it in is
        // answered immediately with the current membership.
        if state.nodes.contains(&sender) {
            return Ok(Response::new(state.join_response(local_node)));
        }
        state.verify_unused_uuid(&uuid)?;
        state.verify_ring(&local_node, &sender, ring)?;
        self.enqueue_edge(&mut state, Edge {
            node: sender.clone(),
            ring,
            join: Some(Join { uuid, meta }),
        });
        // Only one join request per endpoint may wait at a time.
        match state.join_requests.get(&sender) {
            Some(pending) if pending.task_is_waiting() => {
                return Err(Status::already_exists("another join request is pending"));
            }
            _ => {}
        }
        let (pending, join) = join_task();
        state.join_requests.insert(sender, pending);
        // Release the lock before awaiting: the join only completes after a
        // view change calls `respond_to_joiners`, which needs write access.
        drop(state);
        // NOTE(review): `unwrap` panics if the pending half is dropped
        // without completing (e.g. replaced by a later request) — confirm
        // that is the intended failure mode.
        Ok(Response::new(join.await.unwrap()))
    }

    /// Ingests a batch of edge (join/down) alerts from a member, feeds them
    /// into cut-detection state, and kicks off consensus when a stable
    /// proposal emerges.
    async fn batched_alert(&self, req: Request<BatchedAlertReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let BatchedAlertReq { sender, conf_id, edges } = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&sender)?;
        state.verify_config(conf_id)?;
        // Once a fast paxos round has been announced, the proposal for this
        // configuration is frozen; further alerts are refused.
        if state.fpx_announced {
            return Err(Status::aborted("fast paxos round has already begun"));
        }
        // Validate every edge before merging any of them.
        (edges.iter())
            .map(|e| state.verify_edge(&sender, e))
            .collect::<Grpc<()>>()?;
        for Edge { node, ring, join } in edges {
            state.merge_cd_alert(node, join, Vote {
                node: sender.clone(),
                ring,
            });
        }
        state.merge_implicit_cd_alerts(self.cfg.lh);
        if let Some(proposal) = state.generate_cd_proposal(self.cfg.lh) {
            state.register_fpx_round(&proposal);
            // Vote for the proposal via fast paxos...
            task::spawn(Arc::clone(self).do_broadcast(FastPhase2b(FastPhase2bReq {
                sender: self.local_node(),
                conf_id,
                nodes: proposal,
            })));
            // ...and schedule a classic paxos round as fallback in case the
            // fast round never reaches quorum.
            task::spawn(Arc::clone(self).begin_px_round(PaxosRound {
                sender: self.local_node(),
                conf_id,
                members: state.nodes.len(),
            }));
        }
        Ok(Response::new(Ack {}))
    }

    /// Tallies a fast-paxos phase 2b vote; a proposal that reaches the fast
    /// quorum is applied as the next view.
    async fn fast_phase2b(&self, req: Request<FastPhase2bReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let FastPhase2bReq { sender, conf_id, nodes } = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&sender)?;
        state.verify_config(conf_id)?;
        // Each member gets exactly one fast-round vote per configuration.
        if !state.fpx_voters.insert(sender) {
            return Err(Status::already_exists("sender has already voted"));
        }
        let quorum = state.fast_quorum();
        // Bump (or create) the tally for this exact proposed membership.
        let ballot = match state.fpx_ballots.entry(nodes) {
            Entry::Occupied(mut o) => {
                *o.get_mut() += 1;
                o
            }
            entry => entry.insert(1),
        };
        if *ballot.get() >= quorum {
            let (proposal, _) = ballot.remove_entry();
            self.apply_view_change(&mut state, proposal);
        }
        Ok(Response::new(Ack {}))
    }

    /// Classic paxos prepare (1a): promise not to accept lower-ranked
    /// proposals and report any previously accepted value back to the
    /// prospective coordinator via a point-to-point 1b message.
    async fn phase1a(&self, req: Request<Phase1aReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let Phase1aReq { sender, conf_id, rank } = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&sender)?;
        state.verify_config(conf_id)?;
        if rank <= state.px_rnd {
            return Err(Status::aborted("rank is too low"));
        }
        let sender = self
            .resolve_endpoint(&sender)
            .map_err(|e| Status::invalid_argument(e.to_string()))?;
        // NOTE(review): the state write lock is held across this connect
        // await, stalling all other membership rpcs while dialing — confirm
        // this is acceptable (a failed dial also leaves the promise unmade,
        // since px_rnd is only updated below).
        let mut sender = MembershipClient::connect(sender)
            .map_err(|e| Status::unavailable(e.to_string()))
            .await?;
        state.px_rnd = rank.clone();
        let p1b = Phase1bReq {
            sender: self.local_node(),
            conf_id,
            rnd: rank,
            vrnd: state.px_vrnd.clone(),
            vval: state.px_vval.clone(),
        };
        // Reply asynchronously; delivery is best-effort.
        task::spawn(async move {
            let _ = sender.phase1b(p1b).await;
        });
        Ok(Response::new(Ack {}))
    }

    /// Classic paxos promise (1b), handled by the coordinator: collect
    /// promises and, once more than a majority arrive, pick a value and
    /// broadcast the accept request (2a).
    async fn phase1b(&self, req: Request<Phase1bReq>) -> GrpcResponse<Ack> {
        let req = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&req.sender)?;
        state.verify_config(req.conf_id)?;
        // Only promises for the round we are coordinating matter.
        if req.rnd != state.px_crnd {
            return Err(Status::aborted("rnd is not our crnd"));
        }
        state.px_p1bs.push(req);
        let quorum = state.slow_quorum();
        let votes = state.px_p1bs.len();
        // `px_cval` doubles as a "2a already sent" latch for this round.
        if votes > quorum && state.px_cval.is_empty() {
            if let Some(proposal) = state.choose_px_proposal() {
                state.px_cval = proposal.clone();
                task::spawn(Arc::clone(self).do_broadcast(Phase2a(Phase2aReq {
                    sender: self.local_node(),
                    conf_id: state.conf_id,
                    rnd: state.px_crnd.clone(),
                    vval: proposal,
                })));
            }
        }
        Ok(Response::new(Ack {}))
    }

    /// Classic paxos accept (2a): accept the coordinator's value unless a
    /// higher-ranked round has been promised, then broadcast the accept
    /// acknowledgement (2b).
    async fn phase2a(&self, req: Request<Phase2aReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let Phase2aReq { sender, conf_id, rnd, vval } = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&sender)?;
        state.verify_config(conf_id)?;
        if rnd < state.px_rnd {
            return Err(Status::aborted("rnd is too low"));
        }
        // Re-accepting the same round would only rebroadcast; refuse it.
        if rnd == state.px_vrnd {
            return Err(Status::aborted("rnd is equal to vrnd"));
        }
        state.px_rnd = rnd.clone();
        state.px_vrnd = rnd.clone();
        state.px_vval = vval.clone();
        task::spawn(Arc::clone(self).do_broadcast(Phase2b(Phase2bReq {
            sender: self.local_node(),
            conf_id,
            rnd,
            nodes: vval,
        })));
        Ok(Response::new(Ack {}))
    }

    /// Classic paxos learn (2b): tally accepts per round; once more than a
    /// majority accept the same round, its value becomes the next view.
    async fn phase2b(&self, req: Request<Phase2bReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let Phase2bReq { sender, conf_id, rnd, nodes } = req.into_inner();
        let mut state = self.state.write().await;
        state.verify_sender(&sender)?;
        state.verify_config(conf_id)?;
        let quorum = state.slow_quorum();
        let mut ballot = match state.px_rsps.entry(rnd) {
            Entry::Occupied(o) => o,
            e => e.insert((HashSet::new(), nodes)),
        };
        let (voters, _) = ballot.get_mut();
        voters.insert(sender);
        if voters.len() > quorum {
            let (_, proposal) = ballot.remove();
            self.apply_view_change(&mut state, proposal);
        }
        Ok(Response::new(Ack {}))
    }

    /// Gossip entry point: deduplicate the message, forward it to this
    /// node's ring successors, then handle the payload locally.
    async fn broadcast(&self, req: Request<BroadcastReq>) -> GrpcResponse<Ack> {
        #[rustfmt::skip]
        let BroadcastReq { unix, uniq, broadcasted } = req.into_inner();
        let broadcasted = broadcasted
            .ok_or_else(|| Status::invalid_argument("missing oneof"))?;
        let event_id = EventId::new(unix, uniq);
        let mut subjects: Vec<_> = {
            let mut state = self.state.write().await;
            // The filter remembers recently seen event ids, so each message
            // is forwarded at most once per node.
            if !state.bcast_filter.insert(event_id) {
                return Err(Status::already_exists("delivery is redundant"));
            }
            (state.nodes)
                .successors(&self.local_node())
                .cloned()
                .collect()
        };
        // Successors can repeat across rings; forward to each node once.
        subjects.sort();
        subjects.dedup();
        for subject in subjects {
            let req = Request::new(BroadcastReq {
                unix,
                uniq,
                broadcasted: Some(broadcasted.clone()),
            });
            let subject = self
                .resolve_endpoint(&subject)
                .expect("all stored endpoints are valid");
            // Fan out in the background; delivery is best-effort.
            task::spawn(async move {
                let mut client = MembershipClient::connect(subject)
                    .map_err(|e| Status::unavailable(e.to_string()))
                    .await?;
                client.broadcast(req).await
            });
        }
        // Deliver locally by dispatching to the matching rpc handler.
        match broadcasted {
            BatchedAlert(ba) => self.batched_alert(Request::new(ba)).await,
            FastPhase2b(fp2b) => self.fast_phase2b(Request::new(fp2b)).await,
            Phase1a(p1a) => self.phase1a(Request::new(p1a)).await,
            Phase1b(p1b) => self.phase1b(Request::new(p1b)).await,
            Phase2a(p2a) => self.phase2a(Request::new(p2a)).await,
            Phase2b(p2b) => self.phase2b(Request::new(p2b)).await,
        }
    }

    /// Failure-detector probe: report whether this node still appears in
    /// its own membership view, plus the current configuration id.
    async fn probe(&self, _: Request<ProbeReq>) -> GrpcResponse<ProbeResp> {
        #[rustfmt::skip]
        let State { conf_id, ref nodes, .. } = *self.state.read().await;
        let status = if nodes.contains(&self.local_node()) {
            NodeStatus::Healthy
        } else {
            NodeStatus::Degraded
        } as i32;
        Ok(Response::new(ProbeResp { status, conf_id }))
    }
}
impl<St: partition::Strategy> Cluster<St> {
    /// Creates a cluster handle listening at `addr`, with empty membership
    /// and zeroed consensus state.
    pub(crate) fn new(cfg: Config<St>, addr: SocketAddr) -> Self {
        let state = Arc::new(RwLock::new(State {
            uuid: NodeId::generate(),
            conf_id: 0,
            nodes: Tumbler::new(cfg.k),
            uuids: BTreeSet::new(),
            metadata: HashMap::new(),
            last_cut: None,
            // Remember broadcast ids for an hour to suppress re-delivery.
            bcast_filter: EventFilter::new(Duration::from_secs(3600)),
            join_requests: HashMap::new(),
            cd_batch: AlertBatch::default(),
            cd_joiners: HashMap::new(),
            cd_reports: BTreeMap::new(),
            fpx_announced: false,
            fpx_voters: HashSet::new(),
            fpx_ballots: HashMap::new(),
            px_rnd: Rank::zero(),
            px_vrnd: Rank::zero(),
            px_crnd: Rank::zero(),
            px_vval: Vec::new(),
            px_cval: Vec::new(),
            px_rsps: HashMap::new(),
            px_p1bs: Vec::new(),
        }));
        let (cuts, _) = broadcast::channel(8);
        Cluster {
            cfg,
            addr,
            state,
            cuts,
        }
    }

    /// Wraps this cluster so it can be mounted on a tonic gRPC server.
    #[inline]
    pub(crate) fn into_service(self: Arc<Self>) -> MembershipServer<Arc<Self>> {
        MembershipServer::new(self)
    }

    /// Returns a subscription yielding every accepted view change.
    pub(crate) fn subscribe(&self) -> Subscription {
        let state = Arc::downgrade(&self.state);
        let rx = self.cuts.subscribe();
        Subscription::new(state, rx)
    }

    /// The endpoint other members use to reach this node.
    #[inline]
    fn local_node(&self) -> Endpoint {
        Endpoint::from(self.addr).tls(self.cfg.server_tls)
    }

    /// Drives the failure detector forever, restarting the probe loop on
    /// every view change; returns only if the cut subscription errors.
    pub(crate) async fn detect_faults(self: Arc<Self>, mut cuts: Subscription) -> cut::Result {
        loop {
            select! {
                _ = self.spin_fd_probes() => {}
                cut = cuts.recv() => { cut?; }
            }
        }
    }

    /// Repeatedly probes this node's ring successors, accumulating strikes
    /// for unresponsive subjects and enqueueing `down` edges once a subject
    /// reaches the configured strike count. Returns when the configuration
    /// changes.
    async fn spin_fd_probes(self: &Arc<Self>) {
        // Snapshot the subjects (and which rings each occupies) under a
        // read lock; the async block scopes the guard so it is dropped
        // before any probing starts.
        let (conf_id, mut subjects) = async {
            #[derive(Default)]
            struct Status {
                rings: Vec<u64>,
                faults: usize,
            }
            let state = self.state.read().await;
            let mut subjects: HashMap<_, Status> = HashMap::with_capacity(self.cfg.k);
            for (ring, subject) in (state.nodes.successors(&self.local_node()))
                .cloned()
                .enumerate()
            {
                subjects.entry(subject).or_default().rings.push(ring as u64);
            }
            (state.conf_id, subjects)
        }
        .await;
        loop {
            let probes = (subjects.keys().cloned())
                .map(|e| self.probe_member(e))
                .collect::<FuturesUnordered<_>>()
                .collect::<Vec<_>>();
            // Run all probes concurrently alongside a full fd_timeout
            // delay, so each pass takes at least one timeout interval.
            for result in join![delay_for(self.cfg.fd_timeout), probes].1 {
                match result {
                    // A successful probe clears accumulated strikes.
                    Ok(e) => subjects.get_mut(&e).unwrap().faults = 0,
                    Err(e) => subjects.get_mut(&e).unwrap().faults += 1,
                }
            }
            let mut state = self.state.write().await;
            // The subject snapshot is only valid for the configuration it
            // was taken in.
            if state.conf_id != conf_id {
                break;
            }
            let faulted = (subjects.iter_mut())
                .filter(|(_, s)| s.faults >= self.cfg.fd_strikes)
                .flat_map(|(e, s)| {
                    // Reset so we don't re-report while the alert is
                    // pending; one down edge per ring the subject occupies.
                    s.faults = 0;
                    s.rings.iter().map(move |ring| Edge::down(e.clone(), *ring))
                });
            self.enqueue_edges(&mut *state, faulted);
        }
    }

    /// Probes a single member, yielding `Ok(endpoint)` if it responded
    /// within `fd_timeout` and `Err(endpoint)` otherwise.
    async fn probe_member(&self, endpoint: Endpoint) -> Result<Endpoint, Endpoint> {
        let send_probe = timeout(self.cfg.fd_timeout, async {
            let e = self.resolve_endpoint(&endpoint)?;
            let mut c = MembershipClient::connect(e).await?;
            c.probe(ProbeReq {}).await?;
            Fallible::Ok(())
        });
        match send_probe.await {
            Ok(Ok(_)) => Ok(endpoint),
            _ => Err(endpoint),
        }
    }

    /// Converts a wire `Endpoint` into a tonic transport endpoint, applying
    /// this node's client TLS configuration when the peer expects TLS.
    fn resolve_endpoint(&self, e: &Endpoint) -> Result<transport::Endpoint, Compat<EndpointError>> {
        if !e.tls {
            return e.try_into();
        }
        let tls = (self.cfg.client_tls)
            .as_deref()
            .cloned()
            .unwrap_or_else(ClientTlsConfig::new);
        e.try_into().map(|e: transport::Endpoint| e.tls_config(tls))
    }

    /// Installs an agreed-upon membership proposal: joins/kicks each node,
    /// resets consensus state, answers parked join requests, and publishes
    /// the resulting cut to subscribers.
    fn apply_view_change(&self, state: &mut State, proposal: Vec<Endpoint>) {
        let mut joined = Vec::with_capacity(proposal.len());
        let mut kicked = Vec::with_capacity(proposal.len());
        for node in proposal {
            // A node present in `cd_joiners` is joining; any other node in
            // the proposal is being removed.
            if let Some(Join { uuid, meta }) = state.cd_joiners.remove(&node) {
                joined.push(self.resolve_member_meta(meta.clone(), &node).unwrap());
                state.join_node(node, Join { uuid, meta });
            } else {
                let meta = state.kick_node(&node);
                kicked.push(self.resolve_member_meta(meta, &node).unwrap());
            }
        }
        state.clear_consensus();
        let local_node = self.local_node();
        let cut = MultiNodeCut {
            skipped: 0,
            local_addr: self.addr,
            // Degraded means this node was itself removed from the view.
            degraded: !state.nodes.contains(&local_node),
            conf_id: state.refresh_config(),
            members: (state.nodes.iter())
                .map(|node| self.resolve_member(state, node).unwrap())
                .collect(),
            joined: joined.into(),
            kicked: kicked.into(),
        };
        state.respond_to_joiners(&cut, local_node);
        state.last_cut = Some(cut.clone());
        self.propagate_cut(cut);
    }

    /// Resolves a member from current state: address, stored metadata, and
    /// client TLS config.
    fn resolve_member(&self, state: &State, peer: &Endpoint) -> ResolvedMember {
        let addr: SocketAddr = peer
            .try_into()
            .map_err(MemberResolutionError::InvalidSocketAddr)?;
        let meta = (state.metadata)
            .get(peer)
            .ok_or(MemberResolutionError::MissingMetadata)?
            .clone();
        let tls = self.get_client_tls(peer.tls);
        Ok(Member { addr, tls, meta })
    }

    /// Like `resolve_member`, but with metadata supplied by the caller
    /// (used when the node has no entry in `state.metadata`).
    fn resolve_member_meta(&self, meta: Metadata, peer: &Endpoint) -> ResolvedMember {
        let addr: SocketAddr = peer
            .try_into()
            .map_err(MemberResolutionError::InvalidSocketAddr)?;
        let tls = self.get_client_tls(peer.tls);
        Ok(Member { addr, tls, meta })
    }

    /// Returns the client TLS config to use for a peer, or `None` when the
    /// peer doesn't expect TLS.
    fn get_client_tls(&self, enabled: bool) -> Option<Arc<ClientTlsConfig>> {
        guard! { enabled };
        let tls = (self.cfg.client_tls)
            .clone()
            .unwrap_or_else(|| Arc::new(ClientTlsConfig::new()));
        Some(tls)
    }

    /// Publishes a cut to subscribers; a send error (no receivers) is
    /// deliberately ignored.
    #[inline]
    fn propagate_cut(&self, cut: MultiNodeCut) {
        let _ = self.cuts.send(cut);
    }

    /// Enqueues a single edge alert (see `enqueue_edges`).
    #[inline]
    fn enqueue_edge(self: &Arc<Self>, state: &mut State, edge: Edge) {
        self.enqueue_edges(state, std::iter::once(edge));
    }

    /// Adds edges to the pending alert batch, discarding edges staged under
    /// an older configuration, and spawns the debounced sender the first
    /// time the batch gains new edges.
    fn enqueue_edges<I: IntoIterator<Item = Edge>>(self: &Arc<Self>, state: &mut State, iter: I) {
        #[rustfmt::skip]
        let AlertBatch { started, conf_id, edges } = &mut state.cd_batch;
        // A batch staged under an older conf_id is stale: drop its edges.
        if mem::replace(conf_id, state.conf_id) != state.conf_id {
            edges.clear();
        }
        let n = edges.len();
        edges.extend(iter);
        // Spawn exactly one sender per batch, and only if edges landed.
        if edges.len() != n && !mem::replace(started, true) {
            task::spawn(Arc::clone(self).send_batch());
        }
    }

    /// Debounced alert flush: waits briefly so alerts coalesce, then takes
    /// the staged batch and broadcasts it to the cluster.
    async fn send_batch(self: Arc<Self>) {
        delay_for(Duration::from_millis(100)).await;
        let sender = self.local_node();
        let AlertBatch { conf_id, edges, .. } = {
            let mut state = self.state.write().await;
            mem::take(&mut state.cd_batch)
        };
        if edges.is_empty() {
            return;
        }
        self.do_broadcast(BatchedAlert(BatchedAlertReq {
            sender,
            conf_id,
            edges,
        }))
        .await;
    }

    /// Wraps a message in a fresh event id and feeds it through the local
    /// `broadcast` handler (which also delivers it locally).
    async fn do_broadcast(self: Arc<Self>, msg: Broadcasted) {
        let id = EventId::generate();
        let req = Request::new(BroadcastReq {
            unix: id.timestamp(),
            uniq: id.unique(),
            broadcasted: Some(msg),
        });
        let _ = self.broadcast(req).await;
    }

    /// Fallback coordinator: after a randomized delay (so contenders spread
    /// out), begins a classic paxos round unless the configuration changed
    /// or consensus already progressed past round 2.
    async fn begin_px_round(self: Arc<Self>, px: PaxosRound) {
        fn fnv_hash<T: Hash>(val: T) -> u64 {
            let mut h = FnvHasher::default();
            val.hash(&mut h);
            h.finish()
        }
        #[rustfmt::skip]
        let PaxosRound { sender, conf_id, .. } = px.init_delay().await;
        let rank = {
            let mut state = self.state.write().await;
            // The view already changed while we slept: stand down.
            if state.conf_id != conf_id {
                return;
            }
            if state.px_crnd.round > 2 {
                return;
            }
            state.px_crnd.round = 2;
            // Hash the sender into the rank so competing coordinators get
            // distinct, totally ordered ranks.
            state.px_crnd.node_idx = fnv_hash(&sender);
            state.px_crnd.clone()
        };
        self.do_broadcast(Phase1a(Phase1aReq {
            sender,
            conf_id,
            rank,
        }))
        .await;
    }
}
/// Parameters for a scheduled classic paxos round: who would coordinate,
/// for which configuration, and the member count at scheduling time (used
/// to scale the pre-round delay).
struct PaxosRound {
    sender: Endpoint,
    conf_id: u64,
    members: usize,
}
impl PaxosRound {
    /// Sleeps for a randomized backoff before the round starts, so that
    /// competing coordinators spread out instead of dueling.
    ///
    /// The delay is drawn uniformly from `[1000, ~log2(members + 1) * 4000)`
    /// milliseconds: the window widens logarithmically with cluster size.
    async fn init_delay(self) -> Self {
        let exp = ((self.members + 1) as f64).log(2.0) * 4000.0;
        // `gen_range(low, high)` panics when `low >= high`. With 0 members
        // the computed bound is log2(1) * 4000 = 0, so clamp the upper
        // bound strictly above the 1000ms floor.
        let high = (exp as u64).max(1001);
        let ms = thread_rng().gen_range(1000, high);
        delay_for(Duration::from_millis(ms)).await;
        self
    }
}
/// Reasons a stored `Endpoint` could not be turned into a [`Member`].
#[derive(Debug, failure_derive::Fail)]
enum MemberResolutionError {
    // The endpoint did not convert into a `SocketAddr`.
    #[fail(display = "invalid socketaddr: {:?}", _0)]
    InvalidSocketAddr(SocketAddrError),
    // No metadata was stored for the endpoint in `State::metadata`.
    #[fail(display = "missing metadata")]
    MissingMetadata,
}
/// Result of resolving an endpoint into a concrete cluster [`Member`].
type ResolvedMember = Result<Member, MemberResolutionError>;
/// Future half of a parked join request: resolves when the paired
/// [`PendingJoin`] sends the response.
struct JoinTask {
    rx: oneshot::Receiver<JoinResp>,
    // Weak handle to the sender; its existence is what
    // `PendingJoin::task_is_waiting` observes via `Arc::weak_count`.
    #[allow(dead_code)]
    tx: Weak<oneshot::Sender<JoinResp>>,
}
impl Future for JoinTask {
    type Output = Result<JoinResp, oneshot::error::RecvError>;
    /// Completes when a `JoinResp` is delivered through the paired
    /// [`PendingJoin`], or errors if the sender is dropped first.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.rx).poll(ctx)
    }
}
/// Sender half of a parked join request, stored in `State::join_requests`
/// until a view change admits the joiner.
struct PendingJoin {
    // Strong handle; the paired `JoinTask` only holds a `Weak` to it.
    tx: Arc<oneshot::Sender<JoinResp>>,
}
impl PendingJoin {
    /// True while the paired `JoinTask` still holds its weak reference to
    /// the sender, i.e. a caller is still awaiting the join response.
    #[inline]
    fn task_is_waiting(&self) -> bool {
        Arc::weak_count(&self.tx) != 0
    }

    /// Delivers `join` to the waiting task, if one is still listening;
    /// otherwise the response is silently discarded.
    fn complete(self, join: JoinResp) {
        if self.task_is_waiting() {
            // The `JoinTask` only holds a weak reference, so this is the
            // sole strong handle and unwrapping cannot fail.
            let tx = Arc::try_unwrap(self.tx).expect("no other references to exist");
            let _ = tx.send(join);
        }
    }
}
/// Builds the two halves of a parked join request: the [`PendingJoin`]
/// (stored in state, completes the join) and the [`JoinTask`] (awaited by
/// the rpc handler). The task's weak sender reference lets `PendingJoin`
/// detect whether anyone is still waiting.
fn join_task() -> (PendingJoin, JoinTask) {
    let (tx, rx) = oneshot::channel();
    let pending = PendingJoin { tx: Arc::new(tx) };
    let join = JoinTask {
        rx,
        tx: Arc::downgrade(&pending.tx),
    };
    (pending, join)
}
/// Edge alerts staged for a single debounced broadcast.
#[derive(Default)]
struct AlertBatch {
    // Set once a `send_batch` task has been spawned for this batch.
    started: bool,
    // Configuration the staged edges belong to; a batch staged under an
    // older configuration is discarded by `enqueue_edges`.
    conf_id: u64,
    edges: Vec<Edge>,
}
/// One observer's report about a subject: which node reported it, and on
/// which of the subject's rings that observer sits.
#[derive(PartialEq, Eq, Hash)]
struct Vote {
    node: Endpoint,
    ring: u64,
}
/// All mutable protocol state, guarded by the cluster's `RwLock`.
pub(crate) struct State {
    // This node's own unique id.
    uuid: NodeId,
    // Hash of the current membership (recomputed by `refresh_config`).
    conf_id: u64,
    // The members, arranged on k rings (predecessors/successors).
    nodes: Tumbler<Endpoint>,
    // Every uuid admitted so far; joins with a uuid in this set are refused.
    uuids: BTreeSet<NodeId>,
    // Per-member metadata.
    metadata: HashMap<Endpoint, Metadata>,
    // The most recently applied view change, if any.
    last_cut: Option<MultiNodeCut>,
    // Dedupe filter for gossip broadcasts.
    bcast_filter: EventFilter,
    // Parked join requests awaiting a view change.
    join_requests: HashMap<Endpoint, PendingJoin>,
    // Edge alerts staged for the next debounced broadcast.
    cd_batch: AlertBatch,
    // Join payloads for nodes currently proposed to join.
    cd_joiners: HashMap<Endpoint, Join>,
    // Cut-detection reports: subject -> votes from its observers.
    cd_reports: BTreeMap<Endpoint, HashSet<Vote>>,
    // Fast paxos: whether a round was announced for this configuration.
    fpx_announced: bool,
    // Fast paxos: members whose vote has already been counted.
    fpx_voters: HashSet<Endpoint>,
    // Fast paxos: vote tally per proposed membership.
    fpx_ballots: HashMap<Vec<Endpoint>, usize>,
    // Classic paxos: highest rank promised (rnd).
    px_rnd: Rank,
    // Classic paxos: highest rank voted in (vrnd).
    px_vrnd: Rank,
    // Classic paxos: this node's coordinator rank (crnd).
    px_crnd: Rank,
    // Classic paxos: value voted for (vval).
    px_vval: Vec<Endpoint>,
    // Classic paxos: value chosen as coordinator (cval); doubles as the
    // "phase 2a already sent" latch in `phase1b`.
    px_cval: Vec<Endpoint>,
    // Classic paxos: phase 2b accepts per rank (voters, value).
    px_rsps: HashMap<Rank, (HashSet<Endpoint>, Vec<Endpoint>)>,
    // Classic paxos: phase 1b promises collected while coordinating.
    px_p1bs: Vec<Phase1bReq>,
}
impl State {
    /// Fails if `uuid` already belongs (or ever belonged) to a member.
    fn verify_unused_uuid(&self, uuid: &NodeId) -> Grpc<()> {
        if self.uuids.contains(uuid) {
            Err(Status::already_exists("uuid already exists"))
        } else {
            Ok(())
        }
    }

    /// Verifies that `src` really is `dst`'s observer on ring `ring`, i.e.
    /// the `ring`-th predecessor of `dst`.
    fn verify_ring(&self, src: &Endpoint, dst: &Endpoint, ring: u64) -> Grpc<()> {
        if (self.nodes.predecessors(dst))
            .nth(ring.try_into().unwrap())
            .filter(|e| *e == src)
            .is_none()
        {
            Err(Status::invalid_argument("invalid ring number"))
        } else {
            Ok(())
        }
    }

    /// Fails unless `sender` is a current member.
    fn verify_sender(&self, sender: &Endpoint) -> Grpc<()> {
        if !self.nodes.contains(sender) {
            Err(Status::unauthenticated(format!(
                "sender {:?} isn't a cluster member",
                sender
            )))
        } else {
            Ok(())
        }
    }

    /// Fails unless `conf_id` matches the current configuration.
    fn verify_config(&self, conf_id: u64) -> Grpc<()> {
        if self.conf_id != conf_id {
            Err(Status::aborted("mismatched configuration id"))
        } else {
            Ok(())
        }
    }

    /// Checks that an edge can apply to this configuration: a join edge
    /// must name a valid non-member, a down edge an existing member, and
    /// the reporter must occupy the claimed ring.
    fn verify_edge(&self, sender: &Endpoint, edge: &Edge) -> Grpc<()> {
        if edge.join.is_some() {
            edge.node.validate()?;
        }
        // join => node must NOT be a member; down => node MUST be one.
        if edge.join.is_none() != self.nodes.contains(&edge.node) {
            Err(Status::aborted("edge cannot apply to configuration"))
        } else {
            self.verify_ring(sender, &edge.node, edge.ring)
        }
    }

    /// Records an observer's vote about `node` (plus its join payload when
    /// this is a join alert).
    fn merge_cd_alert(&mut self, node: Endpoint, join: Option<Join>, src: Vote) {
        if let Some(join) = join {
            self.cd_joiners.insert(node.clone(), join);
        }
        self.cd_reports.entry(node).or_default().insert(src);
    }

    /// Implicit alerts: for every subject stuck strictly inside the
    /// unstable region (l, h), each of its observers that is itself
    /// unstable counts as an implied vote about the subject, pushing the
    /// subject toward a decision.
    fn merge_implicit_cd_alerts(&mut self, (l, h): (usize, usize)) {
        let mut implied = vec![];
        for (subject, r) in self.cd_reports.iter() {
            // Only subjects strictly inside the unstable region qualify.
            if r.len() <= l || r.len() >= h {
                continue;
            }
            for (ring, observer) in self.nodes.predecessors(subject).enumerate() {
                let n = (self.cd_reports)
                    .get(observer)
                    .map(|r| r.len())
                    .unwrap_or_default();
                // The observer must itself be unstable for its vote to be
                // implied.
                if n <= l || n >= h {
                    continue;
                }
                implied.push((subject.clone(), Vote {
                    node: observer.clone(),
                    ring: ring as u64,
                }));
            }
        }
        // Collected separately to avoid mutating `cd_reports` while
        // iterating it; the `map` is used purely for its side effect.
        for (subject, vote) in implied {
            self.cd_reports.get_mut(&subject).map(|r| r.insert(vote));
        }
    }

    /// Emits a view-change proposal once every reported subject is stable
    /// (below l, or at/above h) and at least one crossed the high
    /// watermark; the `fpx_announced` swap ensures at most one proposal
    /// per configuration.
    fn generate_cd_proposal(&mut self, (l, h): (usize, usize)) -> Option<Vec<Endpoint>> {
        guard! { self.cd_report_counts().all(|n| n >= h || n < l) }
        guard! { self.cd_report_counts().any(|n| n >= h) }
        guard! { !mem::replace(&mut self.fpx_announced, true) }
        Some(self.drain_cd_reports_gte(h).collect())
    }

    /// Vote count for every subject with at least one report.
    #[inline]
    fn cd_report_counts(&self) -> impl Iterator<Item = usize> + '_ {
        self.cd_reports.values().map(HashSet::len)
    }

    /// Consumes all reports, yielding the subjects with at least `h` votes.
    fn drain_cd_reports_gte(&mut self, h: usize) -> impl Iterator<Item = Endpoint> {
        mem::take(&mut self.cd_reports)
            .into_iter()
            .filter_map(move |(e, r)| guard!(r.len() >= h, e))
    }

    /// Enters the fast paxos round for `proposal`, unless a classic round
    /// (round >= 2) is already in flight.
    fn register_fpx_round(&mut self, proposal: &[Endpoint]) {
        if self.px_rnd.round >= 2 {
            return;
        }
        self.px_rnd = Rank::fast_round();
        self.px_vrnd = Rank::fast_round();
        self.px_vval = proposal.into();
    }

    /// Fast paxos quorum: three quarters of the membership, rounded up.
    #[inline]
    fn fast_quorum(&self) -> usize {
        let sz = self.nodes.len() as f64;
        (sz * 0.75).ceil() as usize
    }

    /// Resets all cut-detection and paxos state; called after every
    /// accepted view change.
    fn clear_consensus(&mut self) {
        self.cd_joiners.clear();
        self.cd_reports.clear();
        self.fpx_announced = false;
        self.fpx_voters.clear();
        self.fpx_ballots.clear();
        self.px_rnd = Rank::zero();
        self.px_vrnd = Rank::zero();
        self.px_crnd = Rank::zero();
        self.px_vval.clear();
        self.px_cval.clear();
        self.px_rsps.clear();
        self.px_p1bs.clear();
    }

    /// Forgets the entire membership (nodes, uuids, metadata, last cut).
    fn clear_membership(&mut self) {
        self.nodes.clear();
        self.uuids.clear();
        self.metadata.clear();
        self.last_cut = None;
    }

    /// Adds a member; panics if any component was already present (callers
    /// are expected to have validated the join first).
    fn join_node(&mut self, node: Endpoint, Join { uuid, meta }: Join) {
        assert!(self.nodes.insert(node.clone()));
        assert!(self.uuids.insert(uuid));
        assert!(self.metadata.insert(node, meta).is_none());
    }

    /// Removes a member, returning its metadata; panics if absent.
    // NOTE(review): the node's uuid is NOT removed from `self.uuids`, so a
    // kicked node can never rejoin with the same uuid — presumably
    // intentional (uuids are single-use); verify.
    fn kick_node(&mut self, node: &Endpoint) -> Metadata {
        assert!(self.nodes.remove(node));
        self.metadata.remove(node).unwrap()
    }

    /// Recomputes `conf_id` as an FNV hash of all members and uuids.
    fn refresh_config(&mut self) -> u64 {
        let mut h = FnvHasher::default();
        self.nodes.iter().for_each(|e| e.hash(&mut h));
        self.uuids.iter().for_each(|i| i.hash(&mut h));
        self.conf_id = h.finish();
        self.conf_id
    }

    /// Completes every parked join request whose node made it into `cut`.
    fn respond_to_joiners(&mut self, cut: &MultiNodeCut, sender: Endpoint) {
        let resp = self.join_response(sender);
        (cut.joined())
            .iter()
            .filter_map(|m| self.join_requests.remove(&m.into()))
            .for_each(|p| p.complete(resp.clone()));
    }

    /// Membership snapshot sent to nodes whose join succeeded.
    fn join_response(&self, sender: Endpoint) -> JoinResp {
        JoinResp {
            sender,
            conf_id: self.conf_id,
            nodes: (self.metadata.iter())
                .map(|(node, meta)| NodeMetadata {
                    node: node.clone(),
                    meta: meta.clone(),
                })
                .collect(),
            uuids: self.uuids.iter().cloned().collect(),
        }
    }

    /// Classic paxos threshold: a majority is reached when votes exceed
    /// this value (callers compare with `>`).
    #[inline]
    fn slow_quorum(&self) -> usize {
        self.nodes.len() / 2
    }

    /// Coordinator value selection over the collected 1b promises: among
    /// those carrying the highest vrnd, take the unique non-empty vval if
    /// there is exactly one; else a vval with more than n/4 votes; else
    /// fall back to any non-empty previously voted value.
    fn choose_px_proposal(&self) -> Option<Vec<Endpoint>> {
        assert!(!self.px_p1bs.is_empty());
        let max_vrnd = self.px_p1bs.iter().map(|p| &p.vrnd).max().unwrap();
        // Tally the distinct values voted at the highest round.
        let vvals: FreqSet<&[Endpoint]> = self
            .px_p1bs
            .iter()
            .filter(|p| &p.vrnd == max_vrnd)
            .filter(|p| !p.vval.is_empty())
            .map(|p| p.vval.as_slice())
            .collect();
        if vvals.len() == 1 {
            return vvals.into_iter().next().map(|(vval, _)| vval.into());
        }
        if vvals.total() > 1 {
            if let Some(vval) = vvals
                .iter()
                .find(|(_, &votes)| votes > self.nodes.len() / 4)
                .map(|(&vval, _)| vval.into())
            {
                return Some(vval);
            }
        }
        self.px_p1bs
            .iter()
            .filter(|p| !p.vval.is_empty())
            .map(|p| p.vval.clone())
            .next()
    }
}