mod bootstrap;
pub mod cut;
mod faultdetect;
mod proto;
use super::collections::{EventFilter, EventId, FreqSet, Tumbler};
use cut::{Member, MultiNodeCut, Subscription};
use proto::{
broadcast_req::{Broadcasted::*, *},
membership_client::*,
membership_server::*,
*,
};
use fnv::FnvHasher;
use futures::future::TryFutureExt;
use log::{error, info, warn};
use rand::{thread_rng, Rng};
use std::{
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
convert::{TryFrom, TryInto},
future::Future,
hash::{Hash, Hasher},
mem,
net::SocketAddr,
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
time::Duration,
};
use thiserror::Error;
use tokio::{
sync::{broadcast, oneshot, RwLock},
task,
time::sleep,
};
use tonic::{
transport::{self, ClientTlsConfig},
Code, Request, Response, Status,
};
/// Static configuration for a cluster node.
pub(crate) struct Config {
/// Cut detector watermarks `(l, h)` bounding the unstable report region.
pub lh: (usize, usize),
/// Number of observation rings per member.
pub k: usize,
/// Seed node to contact when joining an existing cluster.
pub seed: Option<Endpoint>,
/// Metadata to associate with this node.
pub meta: Metadata,
/// Whether this node's own endpoint expects TLS.
pub server_tls: bool,
/// Client TLS configuration used when dialing peers that expect TLS.
pub client_tls: Option<Arc<ClientTlsConfig>>,
/// Failure detector probe timeout.
pub fd_timeout: Duration,
/// Number of failed probes tolerated before a peer is reported faulty.
pub fd_strikes: usize,
}
type Grpc<T> = Result<T, Status>;
type GrpcResponse<T> = Grpc<Response<T>>;
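/// A node participating in the membership protocol: the gRPC server state
/// plus a broadcast channel over which applied view-changes are published.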
pub(crate) struct Cluster {
cfg: Config,
addr: SocketAddr,
state: Arc<RwLock<State>>,
cuts: broadcast::Sender<MultiNodeCut>,
}
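/// gRPC handlers for the membership protocol. Each handler validates the
/// sender and configuration id against local state before acting, and
/// returns an appropriate [`Status`] when a request is stale or malformed.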
#[crate::async_trait]
impl Membership for Arc<Cluster> {
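/// Phase 1 of a join: confirm the joiner's host and uuid are unused, and
/// return the current configuration id along with the joiner's would-be
/// ring predecessors as contact points.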
async fn pre_join(&self, req: Request<PreJoinReq>) -> GrpcResponse<PreJoinResp> {
let PreJoinReq { sender, uuid } = req.into_inner();
sender.validate()?;
let state = self.state.read().await;
state.verify_unused_host(&sender)?;
state.verify_unused_uuid(&uuid)?;
let resp = PreJoinResp {
sender: self.local_node(),
conf_id: state.conf_id,
contact: state.nodes.predecessors(&sender).cloned().collect(),
};
Ok(Response::new(resp))
}
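/// Phase 2 of a join: verify the request against the current configuration,
/// enqueue a join alert for the next batched broadcast, then park the
/// request until a view-change that includes the joiner completes.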
async fn join(&self, req: Request<JoinReq>) -> GrpcResponse<JoinResp> {
#[rustfmt::skip]
let JoinReq { sender, ring, uuid, conf_id, meta } = req.into_inner();
sender.validate()?;
let mut state = self.state.write().await;
state.verify_config(conf_id)?;
state.verify_unused_host(&sender)?;
state.verify_unused_uuid(&uuid)?;
state.verify_ring(&self.local_node(), &sender, ring)?;
self.enqueue_edge(&mut state, Edge {
node: sender.clone(),
ring,
join: Some(Join { uuid, meta }),
});
match state.join_requests.get(&sender) {
Some(pending) if pending.task_is_waiting() => {
return Err(Status::already_exists("another join request is pending"));
}
_ => {}
}
let (pending, join) = join_task();
state.join_requests.insert(sender, pending);
drop(state);
// if the pending join is dropped before completion (e.g. superseded by
// a newer request), surface an error rather than panicking on RecvError.
let resp = join
.await
.map_err(|_| Status::aborted("join request was dropped"))?;
Ok(Response::new(resp))
}
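/// Receives a batch of edge (join/leave) alerts from an observer. Alerts
/// are merged into the cut detector; once the detector yields a stable
/// proposal, a fast paxos round is announced and a classic (slow) round is
/// scheduled as a fallback.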
async fn batched_alert(&self, req: Request<BatchedAlertReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let BatchedAlertReq { sender, conf_id, edges } = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&sender)?;
state.verify_config(conf_id)?;
if state.fpx_announced {
return Err(Status::aborted("already announced fast paxos round"));
}
(edges.iter()).try_for_each(|e| state.verify_edge(&sender, e))?;
for Edge { node, ring, join } in edges {
state.merge_cd_alert(node, join, Vote {
node: sender.clone(),
ring,
});
}
state.merge_implicit_cd_alerts(self.cfg.lh);
if let Some(proposal) = state.generate_cd_proposal(self.cfg.lh) {
state.register_fpx_round(&proposal);
info!("starting fast round: conf_id={}", conf_id);
task::spawn(Arc::clone(self).do_broadcast(FastAccepted(FastAcceptedReq {
sender: self.local_node(),
conf_id,
nodes: proposal,
})));
task::spawn(Arc::clone(self).begin_px_round(PaxosRound {
sender: self.local_node(),
conf_id,
members: state.nodes.len(),
}));
}
Ok(Response::new(Ack {}))
}
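/// Tallies a fast-round ballot. When any proposal reaches the fast quorum
/// (roughly three-quarters of the membership), the view-change is applied
/// directly, without a classic paxos round.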
async fn fast_accepted(&self, req: Request<FastAcceptedReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let FastAcceptedReq { sender, conf_id, nodes } = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&sender)?;
state.verify_config(conf_id)?;
if !state.fpx_voters.insert(sender) {
return Err(Status::already_exists("sender has already voted"));
}
let votes = state.fpx_ballots.insert(nodes.clone());
if votes >= state.fast_quorum() {
self.apply_view_change(&mut state, nodes);
info!(
"(fast) applied view-change with {} vote(s): conf_id={}",
votes, state.conf_id
);
}
Ok(Response::new(Ack {}))
}
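/// Classic paxos phase 1a. If the coordinator's rank is higher than any
/// rank promised so far, promise it and reply with our last vote
/// (vrnd/vval).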
async fn prepare(&self, req: Request<PrepareReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let PrepareReq { sender, conf_id, rank } = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&sender)?;
state.verify_config(conf_id)?;
if rank <= state.px_rnd {
return Err(Status::aborted("rank is too low"));
}
let sender = self
.resolve_endpoint(&sender)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
state.px_rnd = rank.clone();
let p = PromiseReq {
sender: self.local_node(),
conf_id,
rnd: rank,
vrnd: state.px_vrnd.clone(),
vval: state.px_vval.clone(),
};
task::spawn(async move {
MembershipClient::connect(sender)
.map_err(|e| Status::unavailable(e.to_string()))
.and_then(|mut c| async move { c.promise(p).await })
.map_err(|e| warn!("promise failed: {}", e))
.await
});
Ok(Response::new(Ack {}))
}
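/// Classic paxos phase 1b, handled by the coordinator. Once a majority has
/// promised, pick a value consistent with any previous rounds (see
/// [`State::choose_px_proposal`]) and broadcast it for acceptance.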
async fn promise(&self, req: Request<PromiseReq>) -> GrpcResponse<Ack> {
let req = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&req.sender)?;
state.verify_config(req.conf_id)?;
if req.rnd != state.px_crnd {
return Err(Status::aborted("rnd is not our crnd"));
}
state.px_promised.push(req);
let quorum = state.slow_quorum();
let votes = state.px_promised.len();
if votes > quorum && state.px_cval.is_empty() {
if let Some(proposal) = state.choose_px_proposal() {
state.px_cval = proposal.clone();
task::spawn(Arc::clone(self).do_broadcast(Accept(AcceptReq {
sender: self.local_node(),
conf_id: state.conf_id,
rnd: state.px_crnd.clone(),
vval: proposal,
})));
}
}
Ok(Response::new(Ack {}))
}
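/// Classic paxos phase 2a. Accept the coordinator's value unless a higher
/// rank was promised in the meantime or this rank was already voted on,
/// then broadcast the accepted vote.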
async fn accept(&self, req: Request<AcceptReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let AcceptReq { sender, conf_id, rnd, vval } = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&sender)?;
state.verify_config(conf_id)?;
if rnd < state.px_rnd {
return Err(Status::aborted("rnd is too low"));
}
if rnd == state.px_vrnd {
return Err(Status::aborted("rnd is equal to vrnd"));
}
state.px_rnd = rnd.clone();
state.px_vrnd = rnd.clone();
state.px_vval = vval.clone();
task::spawn(Arc::clone(self).do_broadcast(Accepted(AcceptedReq {
sender: self.local_node(),
conf_id,
rnd,
nodes: vval,
})));
Ok(Response::new(Ack {}))
}
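/// Classic paxos phase 2b. Tally accepted votes per rank; once a majority
/// accepts the same rank, the proposal is chosen and the view-change is
/// applied.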
async fn accepted(&self, req: Request<AcceptedReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let AcceptedReq { sender, conf_id, rnd, nodes } = req.into_inner();
let mut state = self.state.write().await;
state.verify_sender(&sender)?;
state.verify_config(conf_id)?;
let votes = match state.px_accepted.entry(rnd.clone()) {
Entry::Occupied(mut o) => {
let (voters, _) = o.get_mut();
voters.insert(sender);
voters.len()
}
Entry::Vacant(v) => {
let (voters, _) = v.insert((HashSet::new(), nodes));
voters.insert(sender);
voters.len()
}
};
if votes > state.slow_quorum() {
let (_, proposal) = state.px_accepted.remove(&rnd).unwrap();
self.apply_view_change(&mut state, proposal);
info!(
"(slow) applied view-change with {} vote(s): conf_id={}",
votes, state.conf_id
);
}
Ok(Response::new(Ack {}))
}
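/// Gossip-style dissemination. Each message carries an event id used to
/// drop redundant deliveries; new messages are re-broadcast to this node's
/// ring successors and then delivered to the local handler.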
async fn broadcast(&self, req: Request<BroadcastReq>) -> GrpcResponse<Ack> {
#[rustfmt::skip]
let BroadcastReq { unix, uniq, broadcasted } = req.into_inner();
let broadcasted = broadcasted.ok_or_else(|| Status::invalid_argument("missing oneof"))?;
let event_id = EventId::new(unix, uniq);
let mut subjects: Vec<_> = {
let mut state = self.state.write().await;
if !state.bcast_filter.insert(event_id) {
return Err(Status::already_exists("delivery is redundant"));
}
(state.nodes)
.successors(&self.local_node())
.cloned()
.collect()
};
subjects.sort();
subjects.dedup();
for subject in subjects {
let req = Request::new(BroadcastReq {
unix,
uniq,
broadcasted: Some(broadcasted.clone()),
});
let subject = self
.resolve_endpoint(&subject)
.expect("all stored endpoints are valid");
task::spawn(async move {
MembershipClient::connect(subject)
.map_err(|e| Status::unavailable(e.to_string()))
.and_then(|mut c| async move { c.broadcast(req).await })
.map_err(|e| match (e.code(), e.message()) {
(Code::AlreadyExists, "delivery is redundant") => {}
_ => warn!("infection failed: {}", e),
})
.await
});
}
match broadcasted {
BatchedAlert(ba) => self.batched_alert(Request::new(ba)).await,
FastAccepted(fa) => self.fast_accepted(Request::new(fa)).await,
Prepare(p) => self.prepare(Request::new(p)).await,
Promise(p) => self.promise(Request::new(p)).await,
Accept(a) => self.accept(Request::new(a)).await,
Accepted(a) => self.accepted(Request::new(a)).await,
}
}
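/// Failure detector probe. Reports unavailability once this node is not
/// (or is no longer) part of its own configuration.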
async fn probe(&self, _: Request<Ack>) -> GrpcResponse<Ack> {
let State { ref nodes, .. } = *self.state.read().await;
if !nodes.contains(&self.local_node()) {
return Err(Status::unavailable("degraded"));
}
Ok(Response::new(Ack {}))
}
}
impl Cluster {
pub(crate) fn new(cfg: Config, addr: SocketAddr) -> Self {
let state = Arc::new(RwLock::new(State {
uuid: NodeId::generate(),
conf_id: 0,
nodes: Tumbler::new(cfg.k),
uuids: BTreeSet::new(),
metadata: HashMap::new(),
last_cut: None,
bcast_filter: EventFilter::new(Duration::from_secs(3600)),
join_requests: HashMap::new(),
cd_batch: AlertBatch::default(),
cd_joiners: HashMap::new(),
cd_reports: BTreeMap::new(),
fpx_announced: false,
fpx_voters: HashSet::new(),
fpx_ballots: FreqSet::new(),
px_rnd: Rank::zero(),
px_vrnd: Rank::zero(),
px_crnd: Rank::zero(),
px_vval: Vec::new(),
px_cval: Vec::new(),
px_accepted: HashMap::new(),
px_promised: Vec::new(),
}));
let (cuts, _) = broadcast::channel(8);
Cluster {
cfg,
addr,
state,
cuts,
}
}
#[inline]
pub(crate) fn into_service(self: Arc<Self>) -> MembershipServer<Arc<Self>> {
MembershipServer::new(self)
}
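/// Returns a [`Subscription`] yielding every [`MultiNodeCut`] this node
/// applies.
///
/// A rough usage sketch; it assumes `Subscription` exposes an async
/// `recv`, so check the `cut` module for the actual API:
///
/// ```ignore
/// let mut cuts = cluster.subscribe();
/// while let Ok(cut) = cuts.recv().await {
///     // react to the new membership configuration
/// }
/// ```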
pub(crate) fn subscribe(&self) -> Subscription {
let state = Arc::downgrade(&self.state);
let rx = self.cuts.subscribe();
Subscription::new(state, rx)
}
#[inline]
fn local_node(&self) -> Endpoint {
Endpoint::from(self.addr).tls(self.cfg.server_tls)
}
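/// Converts a peer's wire-level [`Endpoint`] into a tonic transport
/// endpoint, attaching the configured client TLS when the peer expects TLS.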
fn resolve_endpoint(&self, e: &Endpoint) -> Result<transport::Endpoint, EndpointError> {
if !e.tls {
return e.try_into();
}
let tls = (self.cfg.client_tls)
.as_deref()
.cloned()
.unwrap_or_else(ClientTlsConfig::new);
Ok(transport::Endpoint::try_from(e)?.tls_config(tls)?)
}
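/// Applies a decided proposal: joins and kicks the listed nodes, rehashes
/// the configuration id, unblocks any pending joiners, and publishes the
/// resulting [`MultiNodeCut`] to subscribers.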
fn apply_view_change(&self, state: &mut State, proposal: Vec<Endpoint>) {
let mut joined = Vec::with_capacity(proposal.len());
let mut kicked = Vec::with_capacity(proposal.len());
for node in proposal {
if let Some(Join { uuid, meta }) = state.cd_joiners.remove(&node) {
joined.push(self.resolve_member_meta(meta.clone(), &node).unwrap());
state.join_node(node, Join { uuid, meta });
} else {
let meta = state.kick_node(&node);
kicked.push(self.resolve_member_meta(meta, &node).unwrap());
}
}
state.clear_consensus();
let local_node = self.local_node();
joined.sort_by_key(|m| m.addr());
kicked.sort_by_key(|m| m.addr());
let mut members: Vec<_> = (state.nodes.iter())
.map(|node| self.resolve_member(state, node).unwrap())
.collect();
members.sort_by_key(|m| m.addr());
let cut = MultiNodeCut {
skipped: 0,
local_addr: self.addr,
degraded: !state.nodes.contains(&local_node),
conf_id: state.rehash_config(),
members: members.into(),
joined: joined.into(),
kicked: kicked.into(),
};
state.respond_to_joiners(&cut, local_node);
state.last_cut = Some(cut.clone());
self.propagate_cut(cut);
}
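/// Resolves a peer into a [`Member`], looking up its stored metadata.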
fn resolve_member(&self, state: &State, peer: &Endpoint) -> ResolvedMember {
let addr: SocketAddr = peer.try_into()?;
let meta = (state.metadata)
.get(peer)
.ok_or(MemberResolutionError::MissingMetadata)?
.clone();
let tls = self.get_client_tls(peer.tls);
Ok(Member::new(addr, tls, meta))
}
fn resolve_member_meta(&self, meta: Metadata, peer: &Endpoint) -> ResolvedMember {
let addr: SocketAddr = peer.try_into()?;
let tls = self.get_client_tls(peer.tls);
Ok(Member::new(addr, tls, meta))
}
fn get_client_tls(&self, enabled: bool) -> Option<Arc<ClientTlsConfig>> {
guard! { enabled };
let tls = (self.cfg.client_tls)
.clone()
.unwrap_or_else(|| Arc::new(ClientTlsConfig::new()));
Some(tls)
}
#[inline]
fn propagate_cut(&self, cut: MultiNodeCut) {
let _ = self.cuts.send(cut);
}
#[inline]
fn enqueue_edge(self: &Arc<Self>, state: &mut State, edge: Edge) {
self.enqueue_edges(state, std::iter::once(edge));
}
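/// Adds edges to the pending alert batch. The first edge queued against
/// the current configuration spawns a task that flushes the batch after a
/// short aggregation window.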
fn enqueue_edges<I: IntoIterator<Item = Edge>>(self: &Arc<Self>, state: &mut State, iter: I) {
#[rustfmt::skip]
let AlertBatch { started, conf_id, edges } = &mut state.cd_batch;
if mem::replace(conf_id, state.conf_id) != state.conf_id {
edges.clear();
}
let n = edges.len();
edges.extend(iter);
if edges.len() != n && !mem::replace(started, true) {
task::spawn(Arc::clone(self).send_batch());
}
}
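/// Flushes the pending alert batch (after a 100ms aggregation window) as a
/// single broadcast [`BatchedAlertReq`].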
async fn send_batch(self: Arc<Self>) {
sleep(Duration::from_millis(100)).await;
let sender = self.local_node();
let AlertBatch { conf_id, edges, .. } = {
let mut state = self.state.write().await;
mem::take(&mut state.cd_batch)
};
if edges.is_empty() {
return;
}
self.do_broadcast(BatchedAlert(BatchedAlertReq {
sender,
conf_id,
edges,
}))
.await;
}
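/// Wraps a message in a [`BroadcastReq`] with a fresh event id and feeds
/// it through this node's own `broadcast` handler.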
async fn do_broadcast(self: Arc<Self>, msg: Broadcasted) {
let id = EventId::generate();
let req = Request::new(BroadcastReq {
unix: id.timestamp(),
uniq: id.unique(),
broadcasted: Some(msg),
});
if let Err(e) = self.broadcast(req).await {
warn!("broadcast failed: {}", e);
}
}
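/// Fallback classic paxos round. After a randomized delay (to avoid
/// dueling coordinators), if the configuration is unchanged and no higher
/// round has begun, become coordinator of round 2 and broadcast a prepare.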
async fn begin_px_round(self: Arc<Self>, px: PaxosRound) {
fn fnv_hash<T: Hash>(val: T) -> u64 {
let mut h = FnvHasher::default();
val.hash(&mut h);
h.finish()
}
#[rustfmt::skip]
let PaxosRound { sender, conf_id, .. } = px.init_delay().await;
let rank = {
let mut state = self.state.write().await;
if state.conf_id != conf_id {
return;
}
if state.px_crnd.round > 2 {
return;
}
state.px_crnd.round = 2;
state.px_crnd.node_idx = fnv_hash(&sender);
state.px_crnd.clone()
};
info!("starting slow round: conf_id={}", conf_id);
self.do_broadcast(Prepare(PrepareReq {
sender,
conf_id,
rank,
}))
.await;
}
}
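/// Parameters for a fallback classic paxos round.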
struct PaxosRound {
sender: Endpoint,
conf_id: u64,
members: usize,
}
impl PaxosRound {
async fn init_delay(self) -> Self {
// randomized back-off scaled by cluster size, so competing coordinators
// rarely start their slow rounds simultaneously. the upper bound is
// clamped so the range stays non-empty even for very small clusters.
let exp = (((self.members + 1) as f64).log2() * 4000.0) as u64;
let ms = thread_rng().gen_range(1000..exp.max(1001));
sleep(Duration::from_millis(ms)).await;
self
}
}
#[derive(Copy, Clone, Debug, Error)]
enum MemberResolutionError {
#[error("invalid socketaddr: {}", .0)]
InvalidSocketAddr(#[from] SocketAddrError),
#[error("missing metadata")]
MissingMetadata,
}
type ResolvedMember = Result<Member, MemberResolutionError>;
struct JoinTask {
rx: oneshot::Receiver<JoinResp>,
#[allow(dead_code)]
tx: Weak<oneshot::Sender<JoinResp>>,
}
impl Future for JoinTask {
type Output = Result<JoinResp, oneshot::error::RecvError>;
#[inline]
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(ctx)
}
}
struct PendingJoin {
tx: Arc<oneshot::Sender<JoinResp>>,
}
impl PendingJoin {
#[inline]
fn task_is_waiting(&self) -> bool {
Arc::weak_count(&self.tx) > 0
}
fn complete(self, join: JoinResp) {
if !self.task_is_waiting() {
return;
}
let _ = Arc::try_unwrap(self.tx)
.expect("no other references to exist")
.send(join);
}
}
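/// Creates a linked ([`PendingJoin`], [`JoinTask`]) pair. The task holds a
/// weak reference to the response channel, so a [`PendingJoin`] can tell
/// whether its joiner is still waiting (i.e. the task was not dropped).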
fn join_task() -> (PendingJoin, JoinTask) {
let (tx, rx) = oneshot::channel();
let tx = Arc::new(tx);
let pending = PendingJoin { tx };
let tx = Arc::downgrade(&pending.tx);
let join = JoinTask { rx, tx };
(pending, join)
}
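/// Edge alerts accumulated for the current configuration, flushed as one
/// [`BatchedAlertReq`].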
#[derive(Default)]
struct AlertBatch {
started: bool,
conf_id: u64,
edges: Vec<Edge>,
}
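/// An observer's alert about a subject, keyed by the observer and the ring
/// on which it observes the subject.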
#[derive(PartialEq, Eq, Hash)]
struct Vote {
node: Endpoint,
ring: u64,
}
/// Protocol state shared behind the cluster's [`RwLock`].
pub(crate) struct State {
/// This node's unique identifier.
uuid: NodeId,
/// Identifier of the current configuration (see `rehash_config`).
conf_id: u64,
/// Current members, arranged on `k` rings.
nodes: Tumbler<Endpoint>,
/// Unique identifiers of all current members.
uuids: BTreeSet<NodeId>,
/// Metadata associated with each member.
metadata: HashMap<Endpoint, Metadata>,
/// The most recently applied view-change, if any.
last_cut: Option<MultiNodeCut>,
/// Dedupes broadcast messages that were already delivered.
bcast_filter: EventFilter,
/// Joiners parked until the next view-change admits them.
join_requests: HashMap<Endpoint, PendingJoin>,
/// Edge alerts queued for the next batched broadcast.
cd_batch: AlertBatch,
/// Join metadata for nodes proposed to be added.
cd_joiners: HashMap<Endpoint, Join>,
/// Cut detector: votes collected per subject node.
cd_reports: BTreeMap<Endpoint, HashSet<Vote>>,
/// Whether a fast paxos round was announced for this configuration.
fpx_announced: bool,
/// Nodes that have already cast a fast-round ballot.
fpx_voters: HashSet<Endpoint>,
/// Tally of fast-round ballots per proposed configuration.
fpx_ballots: FreqSet<Vec<Endpoint>>,
/// Paxos: highest rank promised (rnd).
px_rnd: Rank,
/// Paxos: rank in which a value was last accepted (vrnd).
px_vrnd: Rank,
/// Paxos: rank of the round this node coordinates (crnd).
px_crnd: Rank,
/// Paxos: value accepted in vrnd (vval).
px_vval: Vec<Endpoint>,
/// Paxos: value this coordinator proposed (cval).
px_cval: Vec<Endpoint>,
/// Paxos: accepted voters and values, keyed by rank.
px_accepted: HashMap<Rank, (HashSet<Endpoint>, Vec<Endpoint>)>,
/// Paxos: promises received as coordinator.
px_promised: Vec<PromiseReq>,
}
#[inline]
fn err_when<F: FnOnce() -> Status>(cond: bool, err: F) -> Grpc<()> {
if cond {
Err(err())
} else {
Ok(())
}
}
impl State {
fn verify_unused_host(&self, host: &Endpoint) -> Grpc<()> {
err_when(self.nodes.contains(host), || {
Status::already_exists("host already exists")
})
}
fn verify_unused_uuid(&self, uuid: &NodeId) -> Grpc<()> {
err_when(self.uuids.contains(uuid), || {
Status::already_exists("uuid already exists")
})
}
fn verify_ring(&self, src: &Endpoint, dst: &Endpoint, ring: u64) -> Grpc<()> {
// reject out-of-range ring numbers instead of panicking on conversion.
let ring = usize::try_from(ring)
.map_err(|_| Status::invalid_argument("invalid ring number"))?;
err_when(
(self.nodes.predecessors(dst))
.nth(ring)
.filter(|e| *e == src)
.is_none(),
|| Status::invalid_argument("invalid ring number"),
)
}
fn verify_sender(&self, sender: &Endpoint) -> Grpc<()> {
err_when(!self.nodes.contains(sender), || {
Status::aborted(format!("{} not in configuration", sender))
})
}
fn verify_config(&self, conf_id: u64) -> Grpc<()> {
err_when(self.conf_id != conf_id, || {
Status::aborted("mismatched configuration id")
})
}
fn verify_edge(&self, sender: &Endpoint, edge: &Edge) -> Grpc<()> {
if edge.join.is_some() {
edge.node.validate()?;
}
if edge.join.is_none() != self.nodes.contains(&edge.node) {
Err(Status::aborted("edge cannot apply to configuration"))
} else {
self.verify_ring(sender, &edge.node, edge.ring)
}
}
fn merge_cd_alert(&mut self, node: Endpoint, join: Option<Join>, src: Vote) {
if let Some(join) = join {
self.cd_joiners.insert(node.clone(), join);
}
self.cd_reports.entry(node).or_default().insert(src);
}
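/// Adds implicit alerts: for each subject whose report count lies in the
/// unstable region, any observer that is itself an unstable subject is
/// counted as implicitly voting for the subject. This nudges report counts
/// out of the unstable region so a proposal can form.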
fn merge_implicit_cd_alerts(&mut self, (l, h): (usize, usize)) {
let mut implied = vec![];
for (subject, r) in self.cd_reports.iter() {
if r.len() <= l || r.len() >= h {
continue;
}
for (ring, observer) in self.nodes.predecessors(subject).enumerate() {
let n = (self.cd_reports)
.get(observer)
.map(|r| r.len())
.unwrap_or_default();
if n <= l || n >= h {
continue;
}
implied.push((subject.clone(), Vote {
node: observer.clone(),
ring: ring as u64,
}));
}
}
for (subject, vote) in implied {
if let Some(r) = self.cd_reports.get_mut(&subject) {
r.insert(vote);
}
}
}
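/// Yields a view-change proposal once the cut detector stabilizes: every
/// subject's report count must be out of the unstable middle region, at
/// least one must have reached the high watermark `h`, and no proposal may
/// have been announced yet for this configuration. Subjects with at least
/// `h` reports form the proposal.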
fn generate_cd_proposal(&mut self, (l, h): (usize, usize)) -> Option<Vec<Endpoint>> {
guard! { self.cd_report_counts().all(|n| n >= h || n < l) }
guard! { self.cd_report_counts().any(|n| n >= h) }
guard! { !mem::replace(&mut self.fpx_announced, true) }
Some(self.drain_cd_reports_gte(h).collect())
}
#[inline]
fn cd_report_counts(&self) -> impl Iterator<Item = usize> + '_ {
self.cd_reports.values().map(HashSet::len)
}
fn drain_cd_reports_gte(&mut self, h: usize) -> impl Iterator<Item = Endpoint> {
mem::take(&mut self.cd_reports)
.into_iter()
.filter_map(move |(e, r)| guard!(r.len() >= h, e))
}
fn register_fpx_round(&mut self, proposal: &[Endpoint]) {
if self.px_rnd.round >= 2 {
return;
}
self.px_rnd = Rank::fast_round();
self.px_vrnd = Rank::fast_round();
self.px_vval = proposal.into();
}
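/// Fast paxos quorum: `ceil(3N/4)` of the current membership.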
#[inline]
fn fast_quorum(&self) -> usize {
let sz = self.nodes.len() as f64;
(sz * 0.75).ceil() as usize
}
fn clear_consensus(&mut self) {
self.cd_joiners.clear();
self.cd_reports.clear();
self.fpx_announced = false;
self.fpx_voters.clear();
self.fpx_ballots.clear();
self.px_rnd = Rank::zero();
self.px_vrnd = Rank::zero();
self.px_crnd = Rank::zero();
self.px_vval.clear();
self.px_cval.clear();
self.px_accepted.clear();
self.px_promised.clear();
}
fn clear_membership(&mut self) {
self.nodes.clear();
self.uuids.clear();
self.metadata.clear();
self.last_cut = None;
}
fn join_node(&mut self, node: Endpoint, Join { uuid, meta }: Join) {
assert!(self.nodes.insert(node.clone()));
assert!(self.uuids.insert(uuid));
assert!(self.metadata.insert(node, meta).is_none());
}
fn kick_node(&mut self, node: &Endpoint) -> Metadata {
assert!(self.nodes.remove(node));
self.metadata.remove(node).unwrap()
}
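/// Recomputes the configuration id as a hash over the current members and
/// their uuids.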
fn rehash_config(&mut self) -> u64 {
let mut h = FnvHasher::default();
self.nodes.iter().for_each(|e| e.hash(&mut h));
self.uuids.iter().for_each(|i| i.hash(&mut h));
self.conf_id = h.finish();
self.conf_id
}
fn respond_to_joiners(&mut self, cut: &MultiNodeCut, sender: Endpoint) {
let resp = self.join_response(sender);
(cut.joined())
.iter()
.filter_map(|m| self.join_requests.remove(&m.into()))
.for_each(|p| p.complete(resp.clone()));
}
fn join_response(&self, sender: Endpoint) -> JoinResp {
JoinResp {
sender,
conf_id: self.conf_id,
nodes: (self.metadata.iter())
.map(|(node, meta)| NodeMetadata {
node: node.clone(),
meta: meta.clone(),
})
.collect(),
uuids: self.uuids.iter().cloned().collect(),
}
}
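/// Classic majority threshold: strictly more than `len / 2` votes are
/// required, so callers compare against this with `>`.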
#[inline]
fn slow_quorum(&self) -> usize {
self.nodes.len() / 2
}
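/// Picks a value for phase 2a following the fast paxos recovery rule:
/// among promises carrying the highest vrnd, a unique vval wins; failing
/// that, any vval with more than `N/4` votes wins; otherwise fall back to
/// the first non-empty vval among all promises.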
fn choose_px_proposal(&self) -> Option<Vec<Endpoint>> {
assert!(!self.px_promised.is_empty());
let max_vrnd = self.px_promised.iter().map(|p| &p.vrnd).max().unwrap();
let vvals: FreqSet<&[Endpoint]> = self
.px_promised
.iter()
.filter(|p| &p.vrnd == max_vrnd)
.filter(|p| !p.vval.is_empty())
.map(|p| p.vval.as_slice())
.collect();
if vvals.len() == 1 {
return vvals.into_iter().next().map(|(vval, _)| vval.into());
}
if vvals.total() > 1 {
if let Some(vval) = vvals
.iter()
.find(|(_, &votes)| votes > self.nodes.len() / 4)
.map(|(&vval, _)| vval.into())
{
return Some(vval);
}
}
self.px_promised
.iter()
.filter(|p| !p.vval.is_empty())
.map(|p| p.vval.clone())
.next()
}
}