use std::{
sync::{
atomic::{AtomicIsize, AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant},
};
use crate::{suspicion::Suspicioner, types::Epoch};
use super::{
base::Memberlist,
delegate::Delegate,
error::Error,
suspicion::Suspicion,
transport::Transport,
types::{Alive, Dead, IndirectPing, NodeState, Ping, PushNodeState, SmallVec, State, Suspect},
Member, Members,
};
use agnostic_lite::{AsyncSpawner, RuntimeLite};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use nodecraft::{resolver::AddressResolver, CheapClone, Node};
use rand::{seq::SliceRandom, Rng};
#[cfg(any(test, feature = "test"))]
pub mod tests;
mod ack_manager;
pub(crate) use ack_manager::*;
// A node's state as tracked by the local member list: the shared
// `NodeState` snapshot plus failure-detector bookkeeping.
#[viewit::viewit]
#[derive(Debug)]
pub(crate) struct LocalNodeState<I, A> {
  // Shared, immutable snapshot of the node's identity/metadata; replaced
  // wholesale when an alive message updates the node (see `alive_node`).
  server: Arc<NodeState<I, A>>,
  // Last known incarnation number; shared via Arc so suspicion logic and
  // clones observe updates.
  incarnation: Arc<AtomicU32>,
  // When `state` last changed; drives dead-node reclaim and gossip windows.
  state_change: Epoch,
  // Current membership state (alive / suspect / dead / left).
  state: State,
}
impl<I, A> Clone for LocalNodeState<I, A> {
fn clone(&self) -> Self {
Self {
server: self.server.clone(),
incarnation: self.incarnation.clone(),
state_change: self.state_change,
state: self.state,
}
}
}
// Deref to the shared `NodeState` so its accessors are usable directly on
// a `LocalNodeState` (smart-pointer-like wrapper, so Deref is appropriate).
impl<I, A> core::ops::Deref for LocalNodeState<I, A> {
  type Target = NodeState<I, A>;

  fn deref(&self) -> &Self::Target {
    self.server.as_ref()
  }
}
impl<I, A> LocalNodeState<I, A> {
  /// Returns `true` when this node is no longer a live member, i.e. it is
  /// either dead or has gracefully left the cluster.
  #[inline]
  pub(crate) fn dead_or_left(&self) -> bool {
    matches!(self.state, State::Dead | State::Left)
  }
}
impl<T, D> Memberlist<T, D>
where
T: Transport,
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
{
#[inline]
pub(crate) fn next_sequence_number(&self) -> u32 {
self
.inner
.hot
.sequence_num
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1
}
#[inline]
pub(crate) fn next_incarnation(&self) -> u32 {
self
.inner
.hot
.incarnation
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1
}
#[inline]
pub(crate) fn skip_incarnation(&self, offset: u32) -> u32 {
self
.inner
.hot
.incarnation
.fetch_add(offset, std::sync::atomic::Ordering::SeqCst)
+ offset
}
#[inline]
pub(crate) fn estimate_num_nodes(&self) -> u32 {
self
.inner
.hot
.num_nodes
.load(std::sync::atomic::Ordering::SeqCst)
}
#[inline]
pub(crate) fn has_shutdown(&self) -> bool {
self.inner.shutdown_tx.is_closed()
}
#[inline]
pub(crate) fn has_left(&self) -> bool {
self
.inner
.hot
.leave
.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Performs a complete state exchange (push/pull) with `id` and merges the
  /// remote state into the local member list.
  pub(crate) async fn push_pull_node(
    &self,
    id: Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    join: bool,
  ) -> Result<(), Error<T, D>> {
    #[cfg(feature = "metrics")]
    let now = Instant::now();
    #[cfg(feature = "metrics")]
    scopeguard::defer!(
      metrics::histogram!("memberlist.push_pull_node", self.inner.opts.metric_labels.iter()).record(now.elapsed().as_millis() as f64);
    );
    let data = self.send_and_receive_state(&id, join).await?;
    self.merge_remote_state(data).await
  }

  /// Handles a dead message for `d.node()`. The caller must already hold the
  /// members write lock and pass its contents in as `memberlist`.
  pub(crate) async fn dead_node(
    &self,
    memberlist: &mut Members<T, D>,
    d: Dead<T::Id>,
  ) -> Result<(), Error<T, D>> {
    let idx = match memberlist.node_map.get(d.node()) {
      Some(idx) => *idx,
      None => return Ok(()),
    };
    let state = &mut memberlist.nodes[idx];
    // Ignore stale messages about an older incarnation.
    if d.incarnation() < state.state.incarnation.load(Ordering::Acquire) {
      return Ok(());
    }
    // Any pending suspicion timer is now moot.
    state.suspicion = None;
    if state.state.dead_or_left() {
      return Ok(());
    }
    let incarnation = d.incarnation();
    // A node reporting *itself* dead means it is gracefully leaving.
    let is_dead_self = d.node() == d.from();
    let is_self = state.id().eq(self.local_id());
    if is_self {
      if !self.has_left() {
        // Someone claims we are dead but we have not left: refute it.
        self.refute(state, incarnation).await;
        tracing::warn!(
          "memberlist.state: refuting a dead message (from: {})",
          d.from()
        );
        return Ok(());
      }
      // We are leaving: broadcast the dead message and signal the leave
      // broadcast channel once it has been transmitted.
      self
        .broadcast_notify(
          d.node().cheap_clone(),
          d.into(),
          Some(self.inner.leave_broadcast_tx.clone()),
        )
        .await;
    } else {
      self.broadcast(d.node().cheap_clone(), d.into()).await;
    }
    #[cfg(feature = "metrics")]
    {
      metrics::counter!("memberlist.msg.dead", self.inner.opts.metric_labels.iter()).increment(1);
    }
    state
      .state
      .incarnation
      .store(incarnation, Ordering::Release);
    // Self-reported death is a graceful leave; otherwise a failure.
    if is_dead_self {
      state.state.state = State::Left;
    } else {
      state.state.state = State::Dead;
    }
    state.state.state_change = Epoch::now();
    if let Some(ref delegate) = self.delegate {
      delegate.notify_leave(state.state.server.clone()).await;
    }
    Ok(())
  }

  /// Handles a suspect message for `s.node()`, starting (or confirming) a
  /// suspicion timer that will declare the node dead unless it refutes in time.
  pub(crate) async fn suspect_node(&self, s: Suspect<T::Id>) -> Result<(), Error<T, D>> {
    let mut mu = self.inner.nodes.write().await;
    let Some(&idx) = mu.node_map.get(s.node()) else {
      return Ok(());
    };
    let state = &mut mu.nodes[idx];
    // Ignore stale suspicion about an older incarnation.
    if s.incarnation() < state.state.incarnation.load(Ordering::Relaxed) {
      return Ok(());
    }
    // A timer already exists: record this as an independent confirmation
    // (which may shorten the timeout); re-broadcast if it is a new confirmer.
    if let Some(timer) = &mut state.suspicion {
      if timer.confirm(s.from()).await {
        self.broadcast(s.node().cheap_clone(), s.into()).await;
      }
      return Ok(());
    }
    // Only alive nodes can transition to suspect.
    if state.state.state != State::Alive {
      return Ok(());
    }
    let snode = s.node().cheap_clone();
    let sfrom = s.from().cheap_clone();
    let sincarnation = s.incarnation();
    // If *we* are being suspected, refute instead of suspecting ourselves.
    if state.id().eq(self.local_id()) {
      self.refute(&state.state, s.incarnation()).await;
      tracing::warn!(
        "memberlist.state: refuting a suspect message (from: {})",
        s.from()
      );
      return Ok(());
    } else {
      self.broadcast(s.node().clone(), s.into()).await;
    }
    #[cfg(feature = "metrics")]
    {
      metrics::counter!(
        "memberlist.msg.suspect",
        self.inner.opts.metric_labels.iter()
      )
      .increment(1);
    }
    state
      .state
      .incarnation
      .store(sincarnation, Ordering::Relaxed);
    state.state.state = State::Suspect;
    let change_time = Epoch::now();
    state.state.state_change = change_time;
    // `k` = independent confirmations required to accelerate the timeout;
    // disabled (0) when the cluster is too small to supply them.
    let mut k = self.inner.opts.suspicion_mult as isize - 2;
    let n = self.estimate_num_nodes() as isize;
    if n - 2 < k {
      k = 0;
    }
    // Timeout window scales logarithmically with cluster size.
    let min = suspicion_timeout(
      self.inner.opts.suspicion_mult,
      n as usize,
      self.inner.opts.probe_interval,
    );
    let max = min * (self.inner.opts.suspicion_max_timeout_mult as u32);
    let this = self.clone();
    state.suspicion = Some(Suspicion::new(
      sfrom,
      k,
      min,
      max,
      Suspicioner::new(
        this,
        snode,
        change_time,
        sincarnation,
        #[cfg(feature = "metrics")]
        k,
      ),
    ));
    Ok(())
  }

  /// Handles an alive message, inserting or updating the node in the member
  /// list. `notify_tx` (if any) is signalled once the broadcast is queued;
  /// `bootstrap` marks the local node's initial self-announcement.
  pub(crate) async fn alive_node(
    &self,
    alive: Alive<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    notify_tx: Option<async_channel::Sender<()>>,
    bootstrap: bool,
  ) {
    let mut memberlist = self.inner.nodes.write().await;
    // While leaving, ignore alive messages about ourselves so we don't rejoin.
    if self.has_left() && alive.node().id().eq(&self.inner.id) {
      return;
    }
    let anode = alive.node();
    let server = Arc::new(
      NodeState::new(
        anode.id().cheap_clone(),
        anode.address().cheap_clone(),
        State::Alive,
      )
      .with_meta(alive.meta().cheap_clone())
      .with_protocol_version(alive.protocol_version())
      .with_delegate_version(alive.delegate_version()),
    );
    // Give the delegate a chance to veto this node.
    if let Some(delegate) = &self.delegate {
      if let Err(e) = delegate.notify_alive(server.clone()).await {
        tracing::warn!(target = "memberlist.state", local = %self.inner.id, peer = %anode, err=%e, "ignoring alive message");
        return;
      }
    }
    let mut updates_node = false;
    if let Some(idx) = memberlist.node_map.get(anode.id()) {
      let state = &memberlist.nodes[*idx].state;
      // Known node announcing a *different* address.
      if state.address() != alive.node().address() {
        if let Err(err) = self.inner.transport.blocked_address(anode.address()) {
          // NOTE(review): the "from {} to {}" placeholders are fed the node
          // id and the *old* address — argument order looks off; confirm
          // intended order is (old addr, new addr, id).
          tracing::warn!(target = "memberlist.state", local = %self.inner.id, remote = %anode, err=%err, "rejected IP update from {} to {} for node {}", alive.node().id(), state.address(), anode.address());
          return;
        };
        // Allow the address change only for nodes that gracefully left, or
        // that have been dead long enough to be reclaimed.
        let can_reclaim = self.inner.opts.dead_node_reclaim_time > Duration::ZERO
          && state.state_change.elapsed() > self.inner.opts.dead_node_reclaim_time;
        if state.state == State::Left || (state.state == State::Dead && can_reclaim) {
          tracing::info!(target = "memberlist.state", local = %self.inner.id, "updating address for left or failed node {} from {} to {}", state.id(), state.address(), alive.node().address());
          updates_node = true;
        } else {
          tracing::error!(target = "memberlist.state", local = %self.inner.id, "conflicting address for {}(mine: {}, theirs: {}, old state: {})", state.id(), state.address(), alive.node(), state.state);
          if let Some(delegate) = self.delegate.as_ref() {
            delegate
              .notify_conflict(state.server.cheap_clone(), Arc::new(NodeState::from(alive)))
              .await;
          }
          return;
        }
      }
    } else {
      if let Err(err) = self.inner.transport.blocked_address(anode.address()) {
        tracing::warn!(local = %self.inner.id, remote = %anode, err=%err, "memberlist.state: rejected node");
        return;
      };
      // Brand-new node: inserted as Dead with incarnation 0 so the code
      // below treats this alive message as a real transition and fires the
      // join notification.
      let state = LocalNodeState {
        server,
        incarnation: Arc::new(AtomicU32::new(0)),
        state_change: Epoch::now(),
        state: State::Dead,
      };
      // Insert at a random position (push to tail, swap with a random slot)
      // so probe order is not biased by join order.
      let n = memberlist.nodes.len();
      let offset = random_offset(n);
      memberlist.nodes.push(Member {
        state: state.clone(),
        suspicion: None,
      });
      memberlist.nodes.swap(n, offset);
      memberlist.node_map.insert(anode.id().cheap_clone(), offset);
      // Fix up the map entry of the node displaced into the tail slot.
      let id = memberlist.nodes[n].state.id().clone();
      *memberlist.node_map.get_mut(&id).unwrap() = n;
      self.inner.hot.num_nodes.fetch_add(1, Ordering::Relaxed);
    }
    let idx = memberlist.node_map.get(anode.id()).copied().unwrap();
    let member = &mut memberlist.nodes[idx];
    let local_incarnation = member.state.incarnation.load(Ordering::Relaxed);
    let is_local_node = anode.id().eq(&self.inner.id);
    // For other nodes, an equal incarnation is stale (unless we just allowed
    // an address update). For ourselves, equal incarnation still needs the
    // echo/refute check below.
    if !updates_node && !is_local_node && alive.incarnation() <= local_incarnation {
      return;
    }
    if is_local_node && alive.incarnation() < local_incarnation {
      return;
    }
    // A live announcement cancels any suspicion in progress.
    member.suspicion = None;
    let old_state = member.state.state;
    let old_meta = member.meta().cheap_clone();
    if !bootstrap && is_local_node {
      // Someone gossiped an alive message about us. If it exactly matches
      // our current incarnation/meta/versions it's just an echo of our own
      // broadcast; otherwise refute it with a higher incarnation.
      let (pv, dv) = (member.protocol_version(), member.delegate_version());
      if alive.incarnation() == local_incarnation
        && alive.meta() == member.meta()
        && alive.protocol_version() == pv
        && alive.delegate_version() == dv
      {
        return;
      }
      self.refute(&member.state, alive.incarnation()).await;
      tracing::warn!(target = "memberlist.state", local = %self.inner.id, peer = %alive.node(), local_meta = ?member.meta(), remote_meta = ?alive.meta(), "refuting an alive message");
    } else {
      self
        .broadcast_notify(
          alive.node().id().cheap_clone(),
          alive.cheap_clone().into(),
          notify_tx,
        )
        .await;
      member
        .state
        .incarnation
        .store(alive.incarnation(), Ordering::Release);
      // Replace the shared snapshot wholesale with the announced state.
      member.state.server = Arc::new(NodeState::from(alive));
      if member.state.state != State::Alive {
        member.state.state = State::Alive;
        member.state.state_change = Epoch::now();
      }
    }
    #[cfg(feature = "metrics")]
    {
      metrics::counter!("memberlist.msg.alive", self.inner.opts.metric_labels.iter()).increment(1);
    }
    if let Some(delegate) = &self.delegate {
      if old_state == State::Dead || old_state == State::Left {
        // Transition from dead/left to alive is a (re)join.
        delegate
          .notify_join(member.state.server.cheap_clone())
          .await
      } else if old_meta.ne(member.state.meta()) {
        delegate
          .notify_update(member.state.server.cheap_clone())
          .await
      }
    }
  }

  /// Merges remote push/pull state by replaying each entry as the
  /// corresponding gossip message (alive / suspect / dead-from-us).
  pub(crate) async fn merge_state<'a>(
    &'a self,
    remote: &'a [PushNodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>],
  ) {
    let mut futs = remote
      .iter()
      .map(|r| {
        let state = match r.state() {
          State::Alive => StateMessage::Alive(
            Alive::new(r.incarnation(), r.node())
              .with_meta(r.meta().cheap_clone())
              .with_protocol_version(r.protocol_version())
              .with_delegate_version(r.delegate_version()),
          ),
          // A remote "left" replays as a dead message sourced from us.
          State::Left => StateMessage::Left(Dead::new(
            r.incarnation(),
            r.id().cheap_clone(),
            self.local_id().cheap_clone(),
          )),
          // Dead replays as suspect so the node gets a refute window.
          State::Dead | State::Suspect => StateMessage::Suspect(Suspect::new(
            r.incarnation(),
            r.id().cheap_clone(),
            self.local_id().cheap_clone(),
          )),
          _ => unreachable!(),
        };
        state.run(self)
      })
      .collect::<FuturesUnordered<_>>();
    // Drive all replays to completion concurrently.
    while futs.next().await.is_some() {}
  }
}
/// Internal translation of a remote `PushNodeState` entry into the gossip
/// message that should be replayed locally during a state merge.
enum StateMessage<T: Transport> {
  // Node reported alive: replay as an alive message.
  Alive(Alive<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>),
  // Node gracefully left: replay as a dead message sourced from us.
  Left(Dead<T::Id>),
  // Node reported dead or suspect: replay as suspicion so the node can refute.
  Suspect(Suspect<T::Id>),
}
impl<T: Transport> StateMessage<T> {
async fn run<D>(self, s: &Memberlist<T, D>)
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
{
match self {
StateMessage::Alive(alive) => s.alive_node(alive, None, false).await,
StateMessage::Left(dead) => {
let id = dead.node().cheap_clone();
let mut memberlist = s.inner.nodes.write().await;
if let Err(e) = s.dead_node(&mut memberlist, dead).await {
tracing::error!(id=%id, err=%e, "memberlist.state: fail to dead node");
}
}
StateMessage::Suspect(suspect) => {
let id = suspect.node().cheap_clone();
if let Err(e) = s.suspect_node(suspect).await {
tracing::error!(id=%id, err=%e, "memberlist.state: fail to suspect node");
}
}
}
}
}
/// Partitions `nodes` so members that have been dead/left for longer than
/// `gossip_to_the_dead_time` are moved to the tail of the slice, returning
/// the number of nodes remaining in the live prefix.
#[inline]
fn move_dead_nodes<T, D>(nodes: &mut [Member<T, D>], gossip_to_the_dead_time: Duration) -> usize
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  let mut num_dead = 0;
  let n = nodes.len();
  let mut i = 0;
  // Invariant: nodes[n - num_dead..] holds the expired dead/left members.
  while i < n - num_dead {
    let node = &nodes[i];
    if !node.state.dead_or_left() {
      i += 1;
      continue;
    }
    // Recently dead nodes stay in the live region so we keep gossiping
    // about them for a while.
    if node.state.state_change.elapsed() <= gossip_to_the_dead_time {
      i += 1;
      continue;
    }
    nodes.swap(i, n - num_dead - 1);
    num_dead += 1;
    // NOTE: `i` is deliberately not advanced after a swap — the element
    // swapped into position `i` has not been examined yet.
  }
  n - num_dead
}
/// Generates a `trigger_<fn>` method that spawns a background task which,
/// after a random stagger delay (bounded by `stagger`), invokes
/// `self.<fn>()` on every `interval` tick until `stop_rx` fires.
macro_rules! bail_trigger {
  ($fn:ident) => {
    paste::paste! {
      async fn [<trigger _ $fn>](&self, stagger: Duration, interval: Duration, stop_rx: async_channel::Receiver<()>) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>
      {
        let this = self.clone();
        // Random stagger so all members don't fire in lock-step.
        let rand_stagger = random_stagger(stagger);
        <T::Runtime as RuntimeLite>::spawn(async move {
          let delay = <T::Runtime as RuntimeLite>::sleep(rand_stagger);
          futures::select! {
            _ = delay.fuse() => {},
            _ = stop_rx.recv().fuse() => return,
          }
          let mut timer = <T::Runtime as RuntimeLite>::interval(interval);
          loop {
            futures::select! {
              _ = futures::StreamExt::next(&mut timer).fuse() => {
                this.$fn().await;
              }
              _ = stop_rx.recv().fuse() => {
                return;
              }
            }
          }
        })
      }
    }
  };
}
impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Starts the background probe / push-pull / gossip loops (each only when
  /// its interval is enabled), storing the join handles for shutdown.
  pub(crate) async fn schedule(&self, shutdown_rx: async_channel::Receiver<()>) {
    let mut handles = self.inner.shutdown_lock.lock().await;
    if self.inner.opts.probe_interval > Duration::ZERO {
      handles.push(
        self
          .trigger_probe(
            self.inner.opts.probe_interval,
            self.inner.opts.probe_interval,
            shutdown_rx.clone(),
          )
          .await,
      );
    }
    if self.inner.opts.push_pull_interval > Duration::ZERO {
      handles.push(self.trigger_push_pull(shutdown_rx.clone()).await);
    }
    if self.inner.opts.gossip_interval > Duration::ZERO && self.inner.opts.gossip_nodes > 0 {
      handles.push(
        self
          .trigger_gossip(
            self.inner.opts.gossip_interval,
            self.inner.opts.gossip_interval,
            shutdown_rx.clone(),
          )
          .await,
      );
    }
  }

  // Generated background loops: `trigger_probe` and `trigger_gossip`.
  bail_trigger!(probe);

  bail_trigger!(gossip);

  /// Spawns the push/pull loop; the tick period is rescaled from the
  /// cluster-size estimate on every iteration to bound bandwidth usage in
  /// large clusters.
  async fn trigger_push_pull(
    &self,
    stop_rx: async_channel::Receiver<()>,
  ) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
    let interval = self.inner.opts.push_pull_interval;
    let this = self.clone();
    // Stagger startup so members don't synchronize their push/pull rounds.
    let mut rng = rand::thread_rng();
    let rand_stagger = Duration::from_millis(rng.gen_range(0..interval.as_millis() as u64));
    <T::Runtime as RuntimeLite>::spawn(async move {
      futures::select! {
        _ = <T::Runtime as RuntimeLite>::sleep(rand_stagger).fuse() => {},
        _ = stop_rx.recv().fuse() => return,
      }
      loop {
        let tick_time = push_pull_scale(interval, this.estimate_num_nodes() as usize);
        let mut timer = <T::Runtime as RuntimeLite>::interval(tick_time);
        futures::select! {
          _ = futures::StreamExt::next(&mut timer).fuse() => {
            this.push_pull().await;
          }
          _ = stop_rx.recv().fuse() => return,
        }
      }
    })
  }

  /// One failure-detection round: pick the next probeable node (round robin
  /// over the shuffled member list) and probe it.
  async fn probe(&self) {
    let mut num_check = 0;
    let mut probe_index = 0;
    loop {
      let memberlist = self.inner.nodes.read().await;
      // Checked (and skipped) every node; nothing to probe this round.
      if num_check >= memberlist.nodes.len() {
        return;
      }
      // Ran off the end: compact dead nodes, reshuffle, and start over.
      if probe_index >= memberlist.nodes.len() {
        drop(memberlist);
        self.reset_nodes().await;
        probe_index = 0;
        num_check += 1;
        continue;
      }
      let mut skip = false;
      let node = memberlist.nodes[probe_index].state.clone();
      // Never probe ourselves or nodes already dead/left.
      if node.dead_or_left() || node.id() == self.local_id() {
        skip = true;
      }
      // Release the read lock before awaiting the (slow) probe.
      drop(memberlist);
      probe_index += 1;
      if skip {
        num_check += 1;
        continue;
      }
      self.probe_node(&node).await;
      return;
    }
  }

  /// Probes a single node: direct unreliable ping first, then (on send
  /// failure or ack timeout) the indirect / reliable fallbacks in
  /// `handle_remote_failure`.
  async fn probe_node(
    &self,
    target: &LocalNodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
  ) {
    #[cfg(feature = "metrics")]
    let now = Instant::now();
    #[cfg(feature = "metrics")]
    scopeguard::defer! {
      metrics::histogram!("memberlist.probe_node", self.inner.opts.metric_labels.iter()).record(now.elapsed().as_millis() as f64);
    }
    // Health-aware timeout: a degraded local node probes more patiently.
    let probe_interval = self
      .inner
      .awareness
      .scale_timeout(self.inner.opts.probe_interval);
    #[cfg(feature = "metrics")]
    {
      if probe_interval > self.inner.opts.probe_interval {
        metrics::counter!(
          "memberlist.degraded.probe",
          self.inner.opts.metric_labels.iter()
        )
        .increment(1);
      }
    }
    let ping = Ping::new(
      self.next_sequence_number(),
      self.advertise_node(),
      target.node(),
    );
    // Room for the direct ack plus one per indirect witness.
    let (ack_tx, ack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
    let (nack_tx, nack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
    let sent = Instant::now();
    let deadline = sent + probe_interval;
    self.inner.ack_manager.set_probe_channels(
      ping.sequence_number(),
      ack_tx.clone(),
      Some(nack_tx),
      sent,
      probe_interval,
    );
    // Net health adjustment is decided later; the guard applies it on
    // every exit path.
    let awareness_delta = AtomicIsize::new(0);
    scopeguard::defer!(self
      .inner
      .awareness
      .apply_delta(awareness_delta.load(Ordering::Acquire)));
    if target.state == State::Alive {
      match self
        .send_msg(target.address(), ping.cheap_clone().into())
        .await
      {
        Ok(_) => {}
        Err(e) => {
          tracing::error!(local = %self.inner.id, remote = %target.id(), err=%e, "memberlist.state: failed to send ping by unreliable connection");
          if e.is_remote_failure() {
            return self
              .handle_remote_failure(
                target,
                ping.sequence_number(),
                &ack_rx,
                &nack_rx,
                deadline,
                &awareness_delta,
              )
              .await;
          }
          return;
        }
      }
    } else {
      // Target is already suspect: piggyback a suspect message on the ping.
      let suspect = Suspect::new(
        target.incarnation.load(Ordering::SeqCst),
        target.id().cheap_clone(),
        self.local_id().cheap_clone(),
      );
      match self
        .transport_send_packets(
          target.address(),
          [ping.cheap_clone().into(), suspect.into()].into(),
        )
        .await
      {
        Ok(_) => {}
        Err(e) => {
          tracing::error!(target = "memberlist.state", local = %self.inner.id, remote = %target.id(), err=%e, "failed to send compound ping and suspect message by unreliable connection");
          if e.is_remote_failure() {
            return self
              .handle_remote_failure(
                target,
                ping.sequence_number(),
                &ack_rx,
                &nack_rx,
                deadline,
                &awareness_delta,
              )
              .await;
          }
          return;
        }
      }
    }
    // Ping is in flight; tentatively improve health unless we fail below.
    awareness_delta.store(-1, Ordering::Release);
    let delegate = self.delegate.as_ref();
    futures::select! {
      v = ack_rx.recv().fuse() => {
        match v {
          Ok(v) => {
            if v.complete {
              if let Some(delegate) = delegate {
                let rtt = v.timestamp.elapsed();
                tracing::trace!(local = %self.inner.id, remote = %target.id(), "memberlist.state: notify ping complete ack");
                delegate.notify_ping_complete(target.server.cheap_clone(), rtt, v.payload).await;
              }
              return;
            }
            // Partial ack: push it back so handle_remote_failure can see it.
            if !v.complete {
              if let Err(e) = ack_tx.send(v).await {
                tracing::error!(local = %self.inner.id, remote = %target.id(), err=%e, "memberlist.state: failed to re-enqueue UDP ping ack");
              }
            }
          }
          Err(e) => {
            tracing::debug!(local = %self.inner.id, remote = %target.id(), err = %e, "memberlist.state: failed unreliable connection ping (ack channel closed)");
          }
        }
      },
      _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.probe_timeout).fuse() => {
        tracing::debug!(target = "memberlist.state", local = %self.inner.id, remote = %target.id(), "failed unreliable connection ping (timeout reached)");
      }
    }
    self
      .handle_remote_failure(
        target,
        ping.sequence_number(),
        &ack_rx,
        &nack_rx,
        deadline,
        &awareness_delta,
      )
      .await
  }

  /// Escalation path after a failed direct probe: indirect pings through
  /// random peers, an optional reliable-connection fallback ping, and —
  /// if still unacknowledged — marking the target suspect.
  async fn handle_remote_failure(
    &self,
    target: &LocalNodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    ping_sequence_number: u32,
    ack_rx: &async_channel::Receiver<AckMessage>,
    nack_rx: &async_channel::Receiver<()>,
    deadline: Instant,
    awareness_delta: &AtomicIsize,
  ) {
    // Choose random alive peers (excluding us and the target) as witnesses.
    let nodes = {
      let nodes = self
        .inner
        .nodes
        .read()
        .await
        .nodes
        .iter()
        .filter_map(|m| {
          if m.id() == &self.inner.id || m.id() == target.id() || m.state.state != State::Alive {
            None
          } else {
            Some(m.state.server.cheap_clone())
          }
        })
        .collect::<SmallVec<_>>();
      random_nodes(self.inner.opts.indirect_checks, nodes)
    };
    let expected_nacks = nodes.len() as isize;
    let ind = IndirectPing::new(ping_sequence_number, self.advertise_node(), target.node());
    let mut futs = nodes.into_iter().map(|peer| {
      let ind = ind.cheap_clone();
      async move {
        match self
          .send_msg(peer.address(), ind.into())
          .await {
          Ok(_) => {},
          Err(e) => {
            tracing::error!(target = "memberlist.state", local = %self.inner.id, remote = %peer, err=%e, "failed to send indirect unreliable ping");
          }
        }
      }
    }).collect::<futures::stream::FuturesUnordered<_>>();
    while futs.next().await.is_some() {}
    let (fallback_tx, fallback_rx) = async_channel::bounded(1);
    let mut disable_reliable_pings = self.inner.opts.disable_promised_pings;
    if let Some(delegate) = self.delegate.as_ref() {
      disable_reliable_pings |= delegate.disable_promised_pings(target.id());
    }
    if !disable_reliable_pings {
      // Also try a ping over the reliable (promised) connection in the
      // background; its result arrives through `fallback_rx`.
      let target_addr = target.address().cheap_clone();
      let this = self.clone();
      <T::Runtime as RuntimeLite>::spawn_detach(async move {
        scopeguard::defer!(fallback_tx.close(););
        match this
          .send_ping_and_wait_for_ack(&target_addr, ind.into(), deadline)
          .await
        {
          Ok(did_contact) => {
            if let Err(e) = fallback_tx.send(did_contact).await {
              tracing::error!(local = %this.inner.id, remote_addr = %target_addr, err=%e, "memberlist.state: failed to send fallback");
            }
          }
          Err(e) => {
            tracing::error!(local = %this.inner.id, remote_addr = %target_addr, err=%e, "memberlist.state: failed to send ping by reliable connection");
            if let Err(e) = fallback_tx.send(false).await {
              tracing::error!(local = %this.inner.id, remote_addr = %target_addr, err=%e, "memberlist.state: failed to send fallback");
            }
          }
        }
      });
    }
    // Wait for an indirect ack (single-arm select: equivalent to awaiting
    // `ack_rx.recv()` — the ack manager closes the channel at the deadline).
    futures::select! {
      v = ack_rx.recv().fuse() => {
        if let Ok(v) = v {
          if v.complete {
            return;
          }
        }
      }
    }
    if !disable_reliable_pings {
      futures::pin_mut!(fallback_rx);
      while let Some(did_contact) = fallback_rx.next().await {
        if did_contact {
          tracing::warn!(target = "memberlist.state", local = %self.inner.id, remote = %target.id(), "was able to connect to target over reliable connection but unreliable probes failed, network may be misconfigured");
          return;
        }
      }
    }
    // Probe failed: cancel the tentative health improvement, then degrade
    // our own health by the number of missing nacks (or by 1 when no
    // indirect probes were possible).
    awareness_delta.store(0, Ordering::Release);
    if expected_nacks > 0 {
      let nack_count = nack_rx.len() as isize;
      if nack_count < expected_nacks {
        awareness_delta.fetch_add(expected_nacks - nack_count, Ordering::AcqRel);
      }
    } else {
      awareness_delta.fetch_add(1, Ordering::AcqRel);
    }
    tracing::info!(target = "memberlist.state", local = %self.inner.id, remote = %target.id(), "suspecting has failed, no acks received");
    let s = Suspect::new(
      target.incarnation.load(Ordering::SeqCst),
      target.id().cheap_clone(),
      self.local_id().cheap_clone(),
    );
    if let Err(e) = self.suspect_node(s).await {
      tracing::error!(target = "memberlist.state", local = %self.inner.id, remote = %target.id(), err=%e, "failed to suspect node");
    }
  }

  /// Evicts members that have been dead/left beyond the gossip window and
  /// reshuffles the list to randomize the probe order.
  async fn reset_nodes(&self) {
    let mut memberlist = self.inner.nodes.write().await;
    // Move expired dead nodes to the tail; `dead_idx` is the live count.
    let dead_idx = move_dead_nodes(
      &mut memberlist.nodes,
      self.inner.opts.gossip_to_the_dead_time,
    );
    let mut i = 0;
    let num_remove = memberlist.node_map.len() - dead_idx;
    while i < num_remove {
      let node = memberlist.nodes.pop().unwrap();
      memberlist.node_map.remove(node.state.id());
      i += 1;
    }
    self
      .inner
      .hot
      .num_nodes
      .store(dead_idx as u32, Ordering::Relaxed);
    memberlist.shuffle(&mut rand::thread_rng());
  }

  /// One gossip round: send queued broadcasts to up to `gossip_nodes`
  /// randomly-selected members (alive, suspect, or recently dead).
  async fn gossip(&self) {
    #[cfg(feature = "metrics")]
    let now = Instant::now();
    #[cfg(feature = "metrics")]
    scopeguard::defer!(
      metrics::histogram!("memberlist.gossip", self.inner.opts.metric_labels.iter()).record(now.elapsed().as_millis() as f64);
    );
    let nodes = {
      let nodes = self
        .inner
        .nodes
        .read()
        .await
        .nodes
        .iter()
        .filter_map(|m| {
          if m.state.id() == &self.inner.id {
            return None;
          }
          match m.state.state {
            State::Alive | State::Suspect => Some(m.state.server.clone()),
            // Recently dead nodes still receive gossip so they can learn of
            // their own death and refute if they are actually alive.
            State::Dead => {
              if m.state.state_change.elapsed() > self.inner.opts.gossip_to_the_dead_time {
                None
              } else {
                Some(m.state.server.clone())
              }
            }
            _ => None,
          }
        })
        .collect::<SmallVec<_>>();
      random_nodes(self.inner.opts.gossip_nodes, nodes)
    };
    // Payload budget after transport framing overhead.
    let bytes_avail =
      self.inner.transport.max_payload_size() - self.inner.transport.packets_header_overhead();
    let futs = nodes.into_iter().map(|server| async move {
      let msgs = match self
        .get_broadcast_with_prepend(
          Default::default(),
          self.inner.transport.packet_overhead(),
          bytes_avail,
        )
        .await
      {
        Ok(msgs) => msgs,
        Err(e) => {
          tracing::error!(err = %e, "memberlist.state: failed to get broadcast messages from {}", server);
          return None;
        }
      };
      if msgs.is_empty() {
        return None;
      }
      Some((server.address().cheap_clone(), msgs))
    }).collect::<FuturesUnordered<_>>();
    futs
      .filter_map(|batch| async { batch })
      .for_each_concurrent(None, |(addr, mut msgs)| async move {
        // A single message goes out alone; several are sent as a compound packet.
        let fut = if msgs.len() == 1 {
          futures::future::Either::Left(async {
            if let Err(e) = self.transport_send_packet(&addr, msgs.pop().unwrap()).await {
              tracing::error!(err = %e, "memberlist.state: failed to send gossip to {}", addr);
            }
          })
        } else {
          futures::future::Either::Right(async {
            if let Err(e) = self.transport_send_packets(&addr, msgs).await {
              tracing::error!(err = %e, "memberlist.state: failed to send gossip to {}", addr);
            }
          })
        };
        fut.await
      })
      .await;
  }

  /// One push/pull round: a full state exchange with a single random alive peer.
  async fn push_pull(&self) {
    let nodes = {
      let nodes = self
        .inner
        .nodes
        .read()
        .await
        .nodes
        .iter()
        .filter_map(|n| {
          if n.state.id() == &self.inner.id || n.state.state != State::Alive {
            None
          } else {
            Some(n.state.server.clone())
          }
        })
        .collect::<SmallVec<_>>();
      random_nodes(1, nodes)
    };
    if nodes.is_empty() {
      return;
    }
    let server = &nodes[0];
    if let Err(e) = self.push_pull_node(server.node(), false).await {
      tracing::error!(target = "memberlist.state", err = %e, "push/pull with {} failed", server.id());
    }
  }

  /// Refutes a suspect/dead claim about the local node by broadcasting a
  /// fresh alive message with an incarnation above the accuser's.
  async fn refute(
    &self,
    state: &LocalNodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    accused_inc: u32,
  ) {
    let mut inc = self.next_incarnation();
    if accused_inc >= inc {
      // Jump past the accused incarnation so our refutation wins.
      inc = self.skip_incarnation(accused_inc - inc + 1);
    }
    state.incarnation.store(inc, Ordering::Relaxed);
    // Needing to refute suggests degraded local health (we may be slow).
    self.inner.awareness.apply_delta(1);
    let anode = Node::new(state.id().cheap_clone(), state.address().cheap_clone());
    let a = Alive::new(inc, anode)
      .with_meta(state.meta().cheap_clone())
      .with_protocol_version(state.protocol_version())
      .with_delegate_version(state.delegate_version());
    self.broadcast(a.node().id().cheap_clone(), a.into()).await;
  }
}
/// Computes the base suspicion timeout: `suspicion_mult * interval`, scaled
/// up by `log10` of the cluster size (clamped to at least 1). Arithmetic is
/// done in whole milliseconds, matching the original truncation behavior.
#[inline]
fn suspicion_timeout(suspicion_mult: usize, n: usize, interval: Duration) -> Duration {
  // Logarithmic scale factor for cluster size, never below 1.
  let scale = (n as f64).max(1.0).log10().max(1.0);
  let base = interval * (suspicion_mult as u32);
  // Multiply by 1000 before truncating to u64 to keep sub-millisecond
  // precision from the float product, then divide it back out.
  let scaled = (base.as_millis() as f64 * scale * 1000.0) as u64;
  Duration::from_millis(scaled / 1000)
}
/// Scales the push/pull interval with cluster size so total bandwidth stays
/// bounded: unchanged up to 32 nodes, then +1 interval per doubling beyond
/// the threshold.
#[inline]
fn push_pull_scale(interval: Duration, n: usize) -> Duration {
  const PUSH_PULL_SCALE_THRESHOLD: usize = 32;
  if n <= PUSH_PULL_SCALE_THRESHOLD {
    interval
  } else {
    // Number of doublings past the threshold, plus one base interval.
    let factor = (f64::log2(n as f64) - f64::log2(PUSH_PULL_SCALE_THRESHOLD as f64)).ceil() + 1.0;
    interval * factor as u32
  }
}
/// Returns a uniformly(-ish) random index in `[0, n)`; `0` when `n == 0`.
#[inline]
fn random_offset(n: usize) -> usize {
  if n == 0 {
    0
  } else {
    (rand::random::<u32>() % (n as u32)) as usize
  }
}
/// Randomly selects up to `k` nodes from `nodes` using a partial
/// Fisher-Yates shuffle, then truncates to `k`.
///
/// Only the first `min(k, n)` positions need to be shuffled for the kept
/// prefix to be a uniform sample without replacement. The previous loop's
/// early-exit condition (`i >= k && i >= rounds`) was unreachable because
/// the loop already required `i < rounds`, so it always shuffled all `n`
/// entries; the selected prefix's distribution is unchanged by stopping at
/// `min(k, n)`.
#[inline]
fn random_nodes<I, A>(
  k: usize,
  mut nodes: SmallVec<Arc<NodeState<I, A>>>,
) -> SmallVec<Arc<NodeState<I, A>>> {
  let n = nodes.len();
  if n == 0 {
    return SmallVec::new();
  }
  // Partial Fisher-Yates: after iteration i, `nodes[..=i]` is a uniform
  // random sample of the input.
  let shuffle_len = k.min(n);
  for i in 0..shuffle_len {
    // Pick uniformly from the not-yet-chosen suffix [i, n).
    let j = rand::random::<usize>() % (n - i) + i;
    nodes.swap(i, j);
  }
  nodes.truncate(k);
  nodes
}
/// Returns a uniformly random duration in `[0, duration)` used to stagger
/// periodic tasks across members.
///
/// Guards against a zero `duration`: the previous implementation computed
/// `random % duration.as_nanos()`, which panics with a division by zero
/// when the duration is zero (all current callers gate on a positive
/// interval, but the helper should be total). Sampling `gen_range(0..nanos)`
/// directly also avoids modulo bias.
#[inline]
fn random_stagger(duration: Duration) -> Duration {
  let nanos = duration.as_nanos() as u64;
  if nanos == 0 {
    return Duration::ZERO;
  }
  Duration::from_nanos(rand::thread_rng().gen_range(0..nanos))
}
#[test]
fn test_random_stagger() {
  // A stagger must never exceed the duration it was derived from.
  let input = Duration::from_millis(1);
  assert!(random_stagger(input) <= input, "bad stagger");
}
#[test]
fn test_push_pull_scale() {
  let sec = Duration::from_secs(1);
  // (cluster sizes, expected scaled interval): unchanged up to 32 nodes,
  // then one extra interval per doubling.
  let cases = [(0..=32, sec), (33..=64, sec * 2), (65..=128, sec * 3)];
  for (sizes, expected) in cases {
    for n in sizes {
      let s = push_pull_scale(sec, n);
      assert_eq!(s, expected, "Bad time scale {s:?}");
    }
  }
}
#[test]
fn test_suspicion_timeout() {
  // Expected per-interval timeouts for various cluster sizes; the division
  // by the multiplier (3) recovers the scaled single-interval value.
  let timeouts: &[(usize, Duration)] = &[
    (5, Duration::from_millis(1000)),
    (10, Duration::from_millis(1000)),
    (50, Duration::from_secs_f64(1.698666666)),
    (100, Duration::from_millis(2000)),
    (500, Duration::from_secs_f64(2.698666666)),
    (1000, Duration::from_millis(3000)),
  ];
  for (n, expected) in timeouts {
    let actual = suspicion_timeout(3, *n, Duration::from_secs(1)) / 3;
    assert_eq!(actual, *expected);
  }
}
#[test]
fn test_random_offset() {
  // With a 2^31-sized range, 100 draws colliding is astronomically
  // unlikely, so any repeat indicates a broken RNG or modulus.
  let mut vals = std::collections::HashSet::new();
  for _ in 0..100 {
    let offset = random_offset(2 << 30);
    // `insert` returns false when the value was already present.
    assert!(vals.insert(offset), "got collision");
  }
}
#[test]
fn test_random_offset_zero() {
  // An empty range must map to offset 0 rather than panicking.
  assert_eq!(random_offset(0), 0, "bad offset");
}