/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
//! Foca is a building block for your gossip-based cluster discovery. It's
//! a small library-first crate that implements the SWIM protocol along
//! with its useful extensions (`SWIM+Inf.+Susp.`).
//!
//! * It's a `no_std` + `alloc` crate by default. There's an optional
//! `std` feature that simply brings compatibility with some types
//! and the `std::error::Error` trait
//!
//! * Bring Your Own Everything: Foca doesn't care about anything that
//! isn't part of the cluster membership functionality:
//!
//! * Pluggable, renewable identities: Using a fixed port number?
//! No need to send it all the time. Want to attach extra crucial
//! information (shard id, deployment version, etc)? Easy.
//! Always have a lookup table mapping `u16` to hostnames? Use
//! that instead of a socket address! Bring your own type,
//! implement [`Identity`] and enjoy.
//!
//! * Write your own wire format by implementing [`Codec`]; Like
//! serde? There are `bincode-codec` and `postcard-codec` features,
//! or just use the `serde` feature and pick your favorite format.
//!
//! * Use any transport you want, it's up to you how messages
//! reach each member: Foca will tell you "Send these bytes to
//! member M", how that happens is not its business.
//!
//! * Custom Broadcasts: Foca can attach arbitrary data to its messages
//! and disseminate them the same way it distributes cluster updates.
//! Send CRDT operations, take a stab at implementing a metadata-heavy
//! service discovery system, anything really. Give it something
//! that implements [`BroadcastHandler`] and Foca will ship it.
//!
//! * No runtime crashes: Apart from `alloc`-related aborts, Foca should
//! only crash inside something you provided: a [`Codec`], [`Runtime`]
//! or a [`BroadcastHandler`]- so long as those are solid, Foca is too.
//!
//! * Doesn't force you to choose between `sync` and `async`. It's as
//! easy to plug it in an evented runtime as it is to go old-school.
//!
#![forbid(unsafe_code)]
#![no_std]
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
extern crate alloc;
use alloc::vec::Vec;
#[cfg(feature = "std")]
extern crate std;
use core::{cmp::Ordering, convert::TryFrom, fmt, mem};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use rand::Rng;
mod broadcast;
mod codec;
mod config;
mod error;
mod identity;
mod member;
mod payload;
mod probe;
mod runtime;
#[cfg(test)]
mod testing;
use crate::{
broadcast::Broadcasts,
member::{ApplySummary, Members},
probe::Probe,
};
pub use crate::{
broadcast::{BroadcastHandler, Invalidates},
codec::Codec,
config::Config,
error::Error,
identity::Identity,
member::{Incarnation, Member, State},
payload::{Header, Message, ProbeNumber},
runtime::{Notification, Runtime, Timer, TimerToken},
};
#[cfg(feature = "postcard-codec")]
pub use crate::codec::postcard_impl::PostcardCodec;
#[cfg(feature = "bincode-codec")]
pub use crate::codec::bincode_impl::BincodeCodec;
type Result<T> = core::result::Result<T, Error>;
/// Foca is the main interaction point of this crate.
///
/// It manages the cluster members and executes the SWIM protocol. It's
/// intended as a low-level guts-exposed safe view into the protocol
/// allowing any kind of Identity and transport to be used.
///
/// Most interactions with Foca require the caller to provide a
/// [`Runtime`] type, which is simply a way to turn the result of an
/// operation inside out (think callbacks, or an out parameter like
/// `void* out`). This allows Foca to avoid deciding anything related
/// to how it interacts with the operating system.
pub struct Foca<T, C, RNG, B: BroadcastHandler<T>> {
// Our own identity, as exposed by `Foca::identity`
identity: T,
// Decodes incoming headers/members and encodes outgoing members
codec: C,
// Source of randomness handed over to the member list whenever
// it needs to pick or place members
rng: RNG,
// Our own incarnation. Goes back to its default value whenever
// the instance is reset
incarnation: Incarnation,
config: Config,
// Whether we are Disconnected, Connected or Undead
connection_state: ConnectionState,
// Stamped on the timers we submit. Timer events carrying a
// different token are ignored, so bumping it invalidates
// everything submitted before
timer_token: TimerToken,
members: Members<T>,
// State of the current probe cycle
probe: Probe<T>,
// Used to buffer up members/updates when receiving and
// sending data
member_buf: Vec<Member<T>>,
// Since we emit data via `Runtime::send_to`, this could
// easily be a Vec, but `BytesMut::limit` is quite handy
send_buf: BytesMut,
// Holds (serialized) cluster updates, which may live for a
// while until they get disseminated `Config::max_transmissions`
// times or replaced by fresher updates.
updates_buf: BytesMut,
// Cluster updates waiting to be disseminated
updates: Broadcasts<ClusterUpdate<T>>,
// User-provided handler for custom broadcasts
broadcast_handler: B,
// Custom broadcasts waiting to be disseminated
custom_broadcasts: Broadcasts<B::Broadcast>,
}
impl<T: Identity, C: Codec<T>, RNG: Rng> Foca<T, C, RNG, NoCustomBroadcast> {
    /// Create a new Foca instance that never carries custom broadcasts.
    ///
    /// Equivalent to calling [`Foca::with_custom_broadcast`] with
    /// [`NoCustomBroadcast`] as the broadcast handler.
    pub fn new(identity: T, config: Config, rng: RNG, codec: C) -> Self {
        let broadcast_handler = NoCustomBroadcast;
        Self::with_custom_broadcast(identity, config, rng, codec, broadcast_handler)
    }
}
#[cfg(feature = "tracing")]
impl<T, C, RNG, B> fmt::Debug for Foca<T, C, RNG, B>
where
    T: Identity,
    B: BroadcastHandler<T>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // When tracing is in use we assume a uniform cluster: same
        // configuration, codec and broadcast handler everywhere.
        // That leaves the identity as the only interesting piece of
        // information to display.
        let mut repr = f.debug_tuple("Foca");
        repr.field(&self.identity);
        repr.finish()
    }
}
// XXX Does it make sense to have different associated type restrictions
// based on a feature flag? Say: when using `std` we would enforce
// that `Codec::Error` and `BroadcastHandler::Error` both implement
// `std::error::Error`, thus instead of wrapping these errors via
// `anyhow::Error::msg` we can use `anyhow::Error::new`.
impl<T, C, RNG, B> Foca<T, C, RNG, B>
where
T: Identity,
C: Codec<T>,
RNG: Rng,
B: BroadcastHandler<T>,
{
/// Initialize a new Foca instance that uses `broadcast_handler` to
/// deal with custom broadcasts.
///
/// The instance starts disconnected, with an empty member list and
/// no pending updates.
pub fn with_custom_broadcast(
    identity: T,
    config: Config,
    rng: RNG,
    codec: C,
    broadcast_handler: B,
) -> Self {
    // Read the sizing hints before `config` is moved into the instance
    let indirect_probe_slots = config.num_indirect_probes.get();
    let packet_capacity = config.max_packet_size.get();

    Self {
        identity,
        codec,
        rng,
        incarnation: Incarnation::default(),
        config,
        connection_state: ConnectionState::Disconnected,
        timer_token: TimerToken::default(),
        members: Members::new(Vec::new()),
        probe: Probe::new(Vec::with_capacity(indirect_probe_slots)),
        member_buf: Vec::new(),
        send_buf: BytesMut::with_capacity(packet_capacity),
        updates_buf: BytesMut::new(),
        updates: Broadcasts::new(),
        broadcast_handler,
        custom_broadcasts: Broadcasts::new(),
    }
}
/// Getter for the current identity.
pub fn identity(&self) -> &T {
&self.identity
}
/// Re-enable joining a cluster with the same identity after being
/// declared Down.
///
/// This is intended to be use by implementations that decide not to
/// opt-in on auto-rejoining: once Foca detects its Down you'll
/// only be able to receive messages (which will likely stop after
/// a short while since the cluster things you are down).
///
/// Whatever is controlling the running Foca will then have to wait
/// for at least [`Config::remove_down_after`] before attempting a
/// rejoin. Then you can call this method followed by a
/// [`Foca::announce(T)`] to go back to the cluster.
pub fn reuse_down_identity(&mut self) -> Result<()> {
if self.connection_state != ConnectionState::Undead {
Err(Error::NotUndead)
} else {
self.reset();
Ok(())
}
}
/// Change the current identity.
///
/// Foca will declare its previous identity as Down and immediatelly
/// notify the cluster about the changes.
///
/// Notice that changing your identity does not guarantee a
/// successful (re)join. After changing it and disseminating the updates
/// Foca will only know it's actually accepted after receiving a
/// message addressed to it.
///
/// Watch for [`Notification::Active`] if you want more confidence about
/// a successful (re)join.
///
/// Intended to be used when identities carry metadata that occasionally
/// changes.
pub fn change_identity(&mut self, new_id: T, runtime: impl Runtime<T>) -> Result<()> {
if self.identity == new_id {
Err(Error::SameIdentity)
} else {
let previous_is_down = self.connection_state == ConnectionState::Undead;
let previous_id = mem::replace(&mut self.identity, new_id);
self.reset();
#[cfg(feature = "tracing")]
tracing::info!(?self, ?previous_id, "changed identity");
// If our previous identity wasn't known as Down already,
// we'll declare it ourselves
if !previous_is_down {
let data = self.serialize_member(Member::down(previous_id.clone()))?;
self.updates.add_or_replace(
ClusterUpdate {
member_id: previous_id,
data,
},
self.config.max_transmissions.get().into(),
);
}
self.gossip(runtime)?;
Ok(())
}
}
/// Iterate over the currently active cluster members.
pub fn iter_members(&self) -> impl Iterator<Item = &Member<T>> {
self.members.iter_active()
}
/// Returns the number of active members in the cluster.
///
/// May only be used as a bound for [`Foca::iter_members`] if no
/// Foca method that takes `&mut self` is called in-between.
pub fn num_members(&self) -> usize {
self.members.num_active()
}
/// Applies cluster updates to this foca instance.
///
/// This is for advanced usage. It's intended as a way to unlock
/// more elaborate synchronization protocols: implementations may
/// choose to unify their cluster knowledge (say: a streaming
/// join protocol or a periodic sync) and use [`Foca::apply_many`]
/// as a way to feed Foca this new (external) knowledge.
///
/// Each update is handled according to whom it is about:
///
/// * Our own identity: treated as an update about ourselves;
/// * An identity sharing our prefix (but not equal to ours): the
///   update is replaced by one declaring that identity Down;
/// * Anything else: applied as-is.
///
/// Once every update has been processed, the connection state is
/// adjusted to reflect the resulting number of active members.
/// Processing stops at the first error, which is returned.
pub fn apply_many(
&mut self,
updates: impl Iterator<Item = Member<T>>,
mut runtime: impl Runtime<T>,
) -> Result<()> {
for update in updates {
if update.id() == &self.identity {
self.handle_self_update(update.incarnation(), update.state(), &mut runtime)?;
} else if self.identity.has_same_prefix(update.id()) {
// We received an update that's about an identity that *could*
// have been ours but definitely isn't (the branch right above,
// where we check equality)
//
// This can happen naturally: an instance rejoins the cluster
// while the cluster is actively talking about its previous
// identity going down.
//
// Any non-Down state, however, is questionable: maybe there are
// multiple instances using the same id; Maybe our own instance
// has been restarted many times at once as the cluster still
// hasn't figured out the correct state yet.
//
// So we assume that this is always delayed/stale information and
// declare this previous identity as Down.
//
// NOTE If there are multiple nodes claiming to have the same
// identity, this will lead to a looping scenario where
// node A declares B down, then B changes identity and
// declares A down; nonstop
#[cfg(feature = "tracing")]
if update.is_active() {
tracing::warn!(
?self,
?update,
"update about identity with same prefix as ours, declaring it down"
);
}
self.apply_update(Member::down(update.into_identity()), &mut runtime)?;
} else {
self.apply_update(update, &mut runtime)?;
}
}
// The set of active members may have changed, so we may have
// just gone online or offline
self.adjust_connection_state(runtime);
Ok(())
}
// Reconcile `connection_state` with the current number of active
// members: having at least one means we're connected, having none
// means we're disconnected.
fn adjust_connection_state(&mut self, runtime: impl Runtime<T>) {
    match self.connection_state {
        ConnectionState::Disconnected if self.members.num_active() > 0 => {
            self.become_connected(runtime);
        }
        ConnectionState::Connected if self.members.num_active() == 0 => {
            self.become_disconnected(runtime);
        }
        // Either the state already matches the member count or we're
        // undead. The only ways to recover from the latter are via an
        // id change or reuse_down_identity(), so there is nothing
        // else to do
        ConnectionState::Disconnected | ConnectionState::Connected | ConnectionState::Undead => {}
    }
}
/// Attempt to join the cluster `dst` belongs to.
///
/// A [`Message::Announce`] is sent to `dst`. If it gets accepted,
/// a [`Message::Feed`] will arrive as the reply.
pub fn announce(&mut self, dst: T, runtime: impl Runtime<T>) -> Result<()> {
    let join_request = Message::Announce;
    self.send_message(dst, join_request, runtime)
}
/// Disseminate updates/broadcasts to cluster members.
///
/// Foca picks [`Config::num_indirect_probes`] random active members
/// and sends each of them a [`Message::Gossip`] containing cluster
/// updates.
///
/// Intended for more complex scenarios where an implementation wants
/// to attempt reducing the time it takes for information to
/// propagate thoroughly.
pub fn gossip(&mut self, runtime: impl Runtime<T>) -> Result<()> {
    let fanout = self.config.num_indirect_probes.get();
    self.choose_and_send(fanout, Message::Gossip, runtime)
}
// Pick `num_members` random active members and send `msg` to them
fn choose_and_send(
&mut self,
num_members: usize,
msg: Message<T>,
mut runtime: impl Runtime<T>,
) -> Result<()> {
self.member_buf.clear();
self.members.choose_active_members(
num_members,
&mut self.member_buf,
&mut self.rng,
|_| true,
);
while let Some(chosen) = self.member_buf.pop() {
self.send_message(chosen.into_identity(), msg.clone(), &mut runtime)?;
}
Ok(())
}
/// Only disseminate custom broadcasts to cluster members
///
/// Foca picks [`Config::num_indirect_probes`] random active members
/// among those that *pass* the
/// [`BroadcastHandler::should_add_broadcast_data`] check. It
/// guarantees custom broadcast dissemination if there are
/// candidate members available.
///
/// No cluster update will be sent with these messages. Intended
/// to be used in tandem with a non-default
/// `should_add_broadcast_data`.
///
/// Does nothing when there are no custom broadcasts pending.
pub fn broadcast(&mut self, mut runtime: impl Runtime<T>) -> Result<()> {
    // Nothing to broadcast
    if self.custom_broadcast_backlog() == 0 {
        return Ok(());
    }

    let fanout = self.config.num_indirect_probes.get();
    self.member_buf.clear();
    self.members.choose_active_members(
        fanout,
        &mut self.member_buf,
        &mut self.rng,
        |member| self.broadcast_handler.should_add_broadcast_data(member),
    );

    while let Some(recipient) = self.member_buf.pop() {
        let destination = recipient.into_identity();
        self.send_message(destination, Message::Broadcast, &mut runtime)?;

        // If crafting the message above drained the backlog, any
        // further message would carry nothing: stop right away
        if self.custom_broadcast_backlog() == 0 {
            break;
        }
    }

    Ok(())
}
/// Leave the cluster by declaring our own identity as down.
///
/// If there are active members, a few of them are selected
/// and notified of our exit so that the cluster learns
/// about it quickly.
///
/// This is the cleanest way to terminate a running Foca.
pub fn leave_cluster(mut self, mut runtime: impl Runtime<T>) -> Result<()> {
    let own_id = self.identity.clone();
    let data = self.serialize_member(Member::down(own_id.clone()))?;
    self.updates.add_or_replace(
        ClusterUpdate {
            member_id: own_id,
            data,
        },
        self.config.max_transmissions.get().into(),
    );

    self.gossip(&mut runtime)?;

    // Going defunct only when there are active members would be the
    // smart thing to do, but consistent behaviour is preferable.
    self.become_undead(&mut runtime);

    Ok(())
}
/// Register some data to be broadcast along with Foca messages.
///
/// Calls into this instance's BroadcastHandler and reacts accordingly.
pub fn add_broadcast(&mut self, data: &[u8]) -> Result<()> {
// NOTE: Receiving B::Broadcast instead of a byte slice would make it
// look more convenient, however it gets in the way when
// implementing more ergonomic interfaces (say: an async driver)
// it forces everything to know the exact concrete type of
// the broadcast. So... maybe revisit this decision later?
// Not considering the whole header
if data.len() > self.config.max_packet_size.get() {
return Err(Error::DataTooBig);
}
self.handle_custom_broadcasts(data)
}
/// React to a previously scheduled timer event.
///
/// See [`Runtime::submit_after`].
///
/// Events that carry a timer token are silently ignored (returning
/// `Ok(())`) when the token doesn't match the current one, which
/// happens whenever the instance got reset, went offline or was
/// declared down after the event was submitted.
///
/// # Errors
///
/// * [`Error::NotConnected`] when a [`Timer::ProbeRandomMember`] with
///   a valid token arrives while this instance is not connected;
/// * Any error that happens while probing or sending messages as a
///   consequence of the event.
pub fn handle_timer(&mut self, event: Timer<T>, mut runtime: impl Runtime<T>) -> Result<()> {
#[cfg(feature = "tracing")]
let _span = tracing::error_span!("handle_timer", ?event).entered();
match event {
// The direct ping had its chance to be answered: unless an Ack
// arrived already, ask other members to ping the target for us
Timer::SendIndirectProbe { probed_id, token } => {
// Changing identities in the middle of the probe cycle may
// naturally lead to this.
if token != self.timer_token {
#[cfg(feature = "tracing")]
tracing::debug!("Invalid timer token");
return Ok(());
}
// Bookkeeping: This is how we verify that the probe code
// is running correctly. If we reach the end of the
// probe and this hasn't happened, we know something is
// wrong.
self.probe.mark_indirect_probe_stage_reached();
if !self.probe.is_probing(&probed_id) {
#[cfg(feature = "tracing")]
tracing::warn!(?probed_id, "SendIndirectProbe: Member not being probed");
return Ok(());
}
if self.probe.succeeded() {
// We received an Ack already, nothing else to do
#[cfg(feature = "tracing")]
tracing::trace!(?probed_id, "Probe succeeded, no need for indirect cycle");
return Ok(());
}
// Pick the helpers: any active member except the probed one
// (or anything sharing its prefix)
self.member_buf.clear();
self.members.choose_active_members(
self.config.num_indirect_probes.get(),
&mut self.member_buf,
&mut self.rng,
|candidate| candidate != &probed_id && !candidate.has_same_prefix(&probed_id),
);
#[cfg(feature = "tracing")]
tracing::debug!(
?probed_id,
"Member didn't respond to ping, starting indirect probe cycle"
);
while let Some(chosen) = self.member_buf.pop() {
let indirect = chosen.into_identity();
self.probe.expect_indirect_ack(indirect.clone());
self.send_message(
indirect,
Message::PingReq {
target: probed_id.clone(),
probe_number: self.probe.probe_number(),
},
&mut runtime,
)?;
}
Ok(())
}
// A suspected member ran out of time to refute the suspicion
Timer::ChangeSuspectToDown {
member_id,
incarnation,
token,
} => {
if self.timer_token == token {
let as_down = Member::new(member_id.clone(), incarnation, State::Down);
if let Some(summary) = self
.members
// Down is terminal, so before doing that we ensure the member
// is still under suspicion.
// Checking only incarnation is sufficient because to refute
// suspicion the member must increment its own incarnation
.apply_existing_if(as_down.clone(), |member| {
member.incarnation() == incarnation
})
{
self.handle_apply_summary(&summary, as_down, &mut runtime)?;
// Member went down we might need to adjust our internal state
self.adjust_connection_state(&mut runtime);
if self.config.notify_down_members {
// As a courtesy, we send a lightweight message to the member
// we're declaring down so that if it manages to receive it,
// it can react accordingly
self.send_message(member_id, Message::TurnUndead, runtime)?;
}
} else {
#[cfg(feature = "tracing")]
tracing::debug!(?member_id, "Member not found");
}
}
Ok(())
}
// Forget about a member that has been down for long enough.
// Notice that this event carries no timer token
Timer::RemoveDown(down) => {
#[cfg_attr(
not(feature = "tracing"),
allow(unused_variables, clippy::if_same_then_else)
)]
if let Some(_removed) = self.members.remove_if_down(&down) {
#[cfg(feature = "tracing")]
tracing::debug!(?down, "Member removed");
} else {
#[cfg(feature = "tracing")]
tracing::debug!(?down, "Member not found / not down");
}
Ok(())
}
// Time to start a new probe cycle
Timer::ProbeRandomMember(token) => {
if token == self.timer_token {
if self.connection_state != ConnectionState::Connected {
// Not expected to happen during normal operation, but
// may reach here via manually crafted Timer::
Err(Error::NotConnected)
} else {
self.probe_random_member(runtime)
}
} else {
// Invalid token, may happen whenever we go offline after
// being online
Ok(())
}
}
Timer::PeriodicAnnounce(token) => {
if token == self.timer_token && self.connection_state == ConnectionState::Connected
{
// The configuration may change during runtime, so we can't
// assume that this is Some() when the timer fires
if let Some(ref params) = self.config.periodic_announce {
// Re-schedule the event
runtime.submit_after(
Timer::PeriodicAnnounce(self.timer_token),
params.frequency,
);
// And send the messages
self.choose_and_send(params.num_members.get(), Message::Announce, runtime)?;
}
}
// else: invalid token and/or not-connected: may happen if the
// instance gets declared down by the cluster
Ok(())
}
Timer::PeriodicGossip(token) => {
// Exact same thing as PeriodicAnnounce, just using different settings / messages
if token == self.timer_token && self.connection_state == ConnectionState::Connected
{
if let Some(ref params) = self.config.periodic_gossip {
runtime.submit_after(
Timer::PeriodicGossip(self.timer_token),
params.frequency,
);
// Only actually gossip if there are updates to send
if !self.updates.is_empty() || !self.custom_broadcasts.is_empty() {
self.choose_and_send(
params.num_members.get(),
Message::Gossip,
runtime,
)?;
}
}
}
Ok(())
}
}
}
/// Reports the current length of the cluster updates queue.
///
/// Updates are transmitted [`Config::max_transmissions`] times
/// at most or until we learn new information about the same
/// member.
pub fn updates_backlog(&self) -> usize {
self.updates.len()
}
/// Repports the current length of the custom broadcast queue.
///
/// Custom broadcasts are transmitted [`Config::max_transmissions`]
/// times at most or until they get invalidated by another custom
/// broadcast.
pub fn custom_broadcast_backlog(&self) -> usize {
self.custom_broadcasts.len()
}
/// Replaces the current configuration with a new one.
///
/// Most of the time a static configuration is more than enough, but
/// for use-cases where the cluster size can drastically change during
/// normal operations, changing the configuration parameters is a
/// nicer alternative to recreating the Foca instance.
///
/// Presently, attempting to change [`Config::probe_period`] or
/// [`Config::probe_rtt`] results in [`Error::InvalidConfig`]; For
/// such cases it's recommended to recreate your Foca instance. When
/// an error occurrs, every configuration parameter remains
/// unchanged.
pub fn set_config(&mut self, config: Config) -> Result<()> {
if self.config.probe_period != config.probe_period
|| self.config.probe_rtt != config.probe_rtt
|| (self.config.periodic_announce.is_none() && config.periodic_announce.is_some())
|| (self.config.periodic_gossip.is_none() && config.periodic_gossip.is_some())
{
Err(Error::InvalidConfig)
} else {
#[cfg(feature = "tracing")]
tracing::debug!(?config, "Configuration changed");
self.config = config;
Ok(())
}
}
/// Handle data received from the network.
///
/// Data larger than the configured limit will be rejected. Errors are
/// expected if you're receiving arbitrary data (which is very likely if
/// you are listening to a socket address).
///
/// # Errors
///
/// * [`Error::DataTooBig`] when `data` is larger than
///   [`Config::max_packet_size`];
/// * [`Error::Decode`] when the codec fails to decode the header or
///   any of the members in the payload;
/// * [`Error::DataFromOurselves`] when the sender has our identity;
/// * [`Error::MalformedPacket`] when the payload has a single trailing
///   byte or is an announce message carrying any data;
/// * [`Error::IndirectForOurselves`] when an indirect probe message
///   refers to our own identity;
/// * Any error that happens while applying updates, handling custom
///   broadcasts or replying.
pub fn handle_data(&mut self, mut data: &[u8], mut runtime: impl Runtime<T>) -> Result<()> {
#[cfg(feature = "tracing")]
let span = tracing::error_span!(
"handle_data",
len = data.len(),
header = tracing::field::Empty,
num_updates = tracing::field::Empty,
)
.entered();
if data.remaining() > self.config.max_packet_size.get() {
return Err(Error::DataTooBig);
}
let header = self
.codec
.decode_header(&mut data)
.map_err(anyhow::Error::msg)
.map_err(Error::Decode)?;
#[cfg(feature = "tracing")]
span.record("header", tracing::field::debug(&header));
if header.src == self.identity {
#[cfg(feature = "tracing")]
tracing::warn!(?self, "sender has same identity as ours");
return Err(Error::DataFromOurselves);
}
let remaining = data.remaining();
// A single trailing byte or an Announce payload with _any_
// data is bad
if remaining == 1 || (header.message == Message::Announce && remaining > 0) {
#[cfg(feature = "tracing")]
tracing::warn!("rejected malformed payload");
return Err(Error::MalformedPacket);
}
if !self.accept_payload(&header) {
#[cfg(feature = "tracing")]
tracing::debug!("payload not accepted");
return Ok(());
}
// We can skip this buffering if we assume that reaching here
// means the packet is valid. But that doesn't seem like a very
// good idea...
self.member_buf.clear();
// Except for Broadcast messages, what follows the header is a
// u16 with the number of updates and then the updates themselves
if remaining >= 2 && header.message != Message::Broadcast {
let num_updates = data.get_u16();
#[cfg(feature = "tracing")]
span.record("num_updates", num_updates);
for _i in 0..num_updates {
self.member_buf.push(
self.codec
.decode_member(&mut data)
.map_err(anyhow::Error::msg)
.map_err(Error::Decode)?,
);
}
}
let Header {
src,
src_incarnation,
dst: _,
message,
} = header;
let sender_is_active = self
// It's a known member, so we ensure our knowledge about
// it is up-to-date (it is at _least_ alive, since it can
// talk)
.apply_update(
Member::new(src.clone(), src_incarnation, State::Alive),
&mut runtime,
)?;
// But dead members are ignored. At least until the member
// list gets reaped.
if !sender_is_active {
#[cfg(feature = "tracing")]
tracing::debug!("Discarded payload: Inactive sender");
if self.config.notify_down_members {
#[cfg(feature = "tracing")]
tracing::info!("Sender is considered down, sending notification message");
self.send_message(src, Message::TurnUndead, runtime)?;
}
return Ok(());
}
// Now that we know the member is active, we'll handle the
// updates, which may change our referential cluster
// representation and our own connection state.
//
// Here we take the Vec so we can drain it without upsetting
// the borrow checker. And then put it back in its place, so
// that we can keep reusing its already-allocated space.
let mut updates = mem::take(&mut self.member_buf);
self.apply_many(updates.drain(..), &mut runtime)?;
debug_assert!(
self.member_buf.is_empty(),
"member_buf modified while taken"
);
self.member_buf = updates;
// Right now there might still be some data left to read in the
// buffer (custom broadcasts).
// We choose to defer handling them until after we're done
// with the core of the protocol.
// If we're not connected (anymore), we can't react to a message
// So we just finish consuming the data
if self.connection_state != ConnectionState::Connected {
return self.handle_custom_broadcasts(data);
}
match message {
Message::Ping(probe_number) => {
self.send_message(src, Message::Ack(probe_number), runtime)?;
}
Message::Ack(probe_number) => {
#[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
if self.probe.receive_ack(&src, probe_number) {
#[cfg(feature = "tracing")]
tracing::debug!(probed_id=?src, "Probe success");
} else {
// May be triggered by a member that slows down (say, you ^Z
// the process and `fg` back after a while).
// Might be interesting to keep an eye on.
#[cfg(feature = "tracing")]
tracing::warn!(
current_probe_number = self.probe.probe_number(),
"Unexpected Ack"
);
}
}
// The sender wants us to ping `target` on its behalf
Message::PingReq {
target,
probe_number,
} => {
if target == self.identity {
return Err(Error::IndirectForOurselves);
} else {
self.send_message(
target,
Message::IndirectPing {
origin: src,
probe_number,
},
runtime,
)?;
}
}
// The sender is pinging us on behalf of `origin`
Message::IndirectPing {
origin,
probe_number,
} => {
if origin == self.identity {
return Err(Error::IndirectForOurselves);
} else {
self.send_message(
src,
Message::IndirectAck {
target: origin,
probe_number,
},
runtime,
)?;
}
}
// The sender replied to our indirect ping: let `target`, who
// asked for it, know
Message::IndirectAck {
target,
probe_number,
} => {
if target == self.identity {
return Err(Error::IndirectForOurselves);
} else {
self.send_message(
target,
Message::ForwardedAck {
origin: src,
probe_number,
},
runtime,
)?;
}
}
// The sender is relaying an ack from `origin`
Message::ForwardedAck {
origin,
probe_number,
} =>
{
#[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
if origin == self.identity {
return Err(Error::IndirectForOurselves);
} else if self.probe.receive_indirect_ack(&src, probe_number) {
#[cfg(feature = "tracing")]
tracing::debug!(
probed_id = ?self.probe.target(),
"Indirect probe success"
);
} else {
#[cfg(feature = "tracing")]
tracing::warn!("Unexpected ForwardedAck sender");
}
}
Message::Announce => self.send_message(src, Message::Feed, runtime)?,
Message::TurnUndead => {
#[cfg(feature = "tracing")]
tracing::warn!("The cluster thinks we're down");
self.handle_self_update(Incarnation::default(), State::Down, runtime)?
}
// Nothing to do. These messages do not expect any reply
Message::Gossip | Message::Feed | Message::Broadcast => {}
};
// Whatever is left in the buffer is custom broadcast data
self.handle_custom_broadcasts(data)
}
fn serialize_member(&mut self, member: Member<T>) -> Result<Bytes> {
let mut buf = self.updates_buf.split();
self.codec
.encode_member(&member, &mut buf)
.map_err(anyhow::Error::msg)
.map_err(Error::Encode)?;
Ok(buf.freeze())
}
// Bring the instance back to a pristine, disconnected state: timers
// submitted so far get invalidated (their token won't match anymore),
// the probe state is wiped and the incarnation starts over.
fn reset(&mut self) {
    self.timer_token = self.timer_token.wrapping_add(1);
    self.probe.clear();
    self.incarnation = Incarnation::default();
    self.connection_state = ConnectionState::Disconnected;
    // XXX Doing a `self.updates.clear()` might make sense if we've
    //     been down for a very long while, but instants aren't
    //     tracked internally... A public method for it, letting
    //     drivers decide when to call it, could be a way out. But
    //     recreating Foca is quite cheap, so revisit me maybe?
}
// Runs one step of the probe cycle:
//
// 1. Verifies the previous cycle completed correctly. If it didn't,
//    the probe state is cleared, a new `Timer::ProbeRandomMember` is
//    scheduled and `Error::IncompleteProbeCycle` is returned;
// 2. If the previously probed member failed to reply, marks it as
//    Suspect and schedules a `Timer::ChangeSuspectToDown` for it;
// 3. Picks the next member, pings it and schedules a
//    `Timer::SendIndirectProbe` to fire after `Config::probe_rtt`;
// 4. Schedules the next `Timer::ProbeRandomMember` to fire after
//    `Config::probe_period`.
//
// Must only be called while connected.
fn probe_random_member(&mut self, mut runtime: impl Runtime<T>) -> Result<()> {
debug_assert_eq!(self.connection_state, ConnectionState::Connected);
if !self.probe.validate() {
#[cfg(feature = "tracing")]
tracing::warn!(
probed_id = ?self.probe.target(),
"Recovering: Probe cycle didn't complete correctly"
);
// Probe has invalid state. We'll reset and submit another timer
// so that foca can recover from the issue gracefully
self.probe.clear();
runtime.submit_after(
Timer::ProbeRandomMember(self.timer_token),
self.config.probe_period,
);
return Err(Error::IncompleteProbeCycle);
}
if let Some(failed) = self.probe.take_failed() {
// Applying here can fail if:
//
// 1. The member increased its incarnation since the probe started
// (as a side effect of someone else probing and suspecting it)
//
// 2. The member was ALREADY suspect when we picked it for probing
//
// 3. The member is now Down, either by leaving voluntarily or by
// being declared down by another cluster member
//
// 4. The member doesn't exist anymore, which shouldn't actually
// happen...?
let as_suspect = Member::new(failed.id().clone(), failed.incarnation(), State::Suspect);
if let Some(summary) = self
.members
.apply_existing_if(as_suspect.clone(), |_member| true)
{
self.handle_apply_summary(&summary, as_suspect, &mut runtime)?;
// Now we ensure we change the member to Down if it
// isn't already inactive
if summary.is_active_now {
// We check for summary.apply_successful prior to logging
// because we may pick a member multiple times before the
// timer runs out.
// May lead to not logging at all if our knowledge of this
// member was already set as State::Suspect
#[cfg(feature = "tracing")]
if summary.apply_successful {
tracing::info!(
member_id = ?failed.id(),
timeout = ?self.config.suspect_to_down_after,
"Member failed probe, will declare it down if it doesn't react"
);
}
runtime.submit_after(
Timer::ChangeSuspectToDown {
member_id: failed.id().clone(),
incarnation: failed.incarnation(),
token: self.timer_token,
},
self.config.suspect_to_down_after,
);
}
} else {
#[cfg(feature = "tracing")]
tracing::error!(
member_id = ?failed.id(),
"Member failed probe but doesn't exist"
);
}
}
if let Some(member) = self.members.next(&mut self.rng) {
let member_id = member.id().clone();
let probe_number = self.probe.start(member.clone());
#[cfg(feature = "tracing")]
tracing::debug!(?member_id, "Probing member");
// NOTE(review): an error here returns before the next
// `Timer::ProbeRandomMember` (at the end of this function) gets
// submitted, unlike the recovery path at the top, which
// re-schedules before returning its error. Confirm that
// callers are expected to restart the probe cycle themselves
self.send_message(member_id.clone(), Message::Ping(probe_number), &mut runtime)?;
runtime.submit_after(
Timer::SendIndirectProbe {
probed_id: member_id,
token: self.timer_token,
},
self.config.probe_rtt,
);
} else {
// Should never happen... Reaching here is gated by being
// online, which requires having at least one active member
#[cfg(feature = "tracing")]
tracing::error!("Expected to find an active member to probe");
}
// Keep the probe cycle going
runtime.submit_after(
Timer::ProbeRandomMember(self.timer_token),
self.config.probe_period,
);
Ok(())
}
// Apply `update` to the member list and react to the outcome right
// away. Yields whether the member is active after the update.
// Must not be used for updates about our own identity.
fn apply_update(&mut self, update: Member<T>, runtime: impl Runtime<T>) -> Result<bool> {
    debug_assert_ne!(&self.identity, update.id());
    let outcome = self.members.apply(update.clone(), &mut self.rng);
    let is_active = outcome.is_active_now;
    self.handle_apply_summary(&outcome, update, runtime)
        .map(|()| is_active)
}
// React to the outcome (`summary`) of applying `update` to the member
// list:
//
// * If the update was applied, it's queued for dissemination and, in
//   case the member is now inactive, its removal gets scheduled;
// * If the set of active members changed, the runtime is notified
//   about the member going up or down.
fn handle_apply_summary(
    &mut self,
    summary: &ApplySummary,
    update: Member<T>,
    mut runtime: impl Runtime<T>,
) -> Result<()> {
    let id = update.id().clone();

    if summary.apply_successful {
        #[cfg(feature = "tracing")]
        tracing::debug!(?update, ?summary, "Update applied");

        // The cluster state changed: start telling others about it
        let data = self.serialize_member(update)?;
        self.updates.add_or_replace(
            ClusterUpdate {
                member_id: id.clone(),
                data,
            },
            self.config.max_transmissions.get().into(),
        );

        // Since Down is a terminal state, the member must be removed
        // at some point so that it's able to rejoin later
        if !summary.is_active_now {
            let removal = Timer::RemoveDown(id.clone());
            runtime.submit_after(removal, self.config.remove_down_after);
        }
    }

    if summary.changed_active_set {
        let notification = if summary.is_active_now {
            #[cfg(feature = "tracing")]
            tracing::info!(member_id=?id, "Member up");
            Notification::MemberUp(id)
        } else {
            #[cfg(feature = "tracing")]
            tracing::info!(member_id=?id, "Member down");
            Notification::MemberDown(id)
        };
        runtime.notify(notification);
    }

    Ok(())
}
fn handle_custom_broadcasts(&mut self, mut data: impl Buf) -> Result<()> {
#[cfg(feature = "tracing")]
if data.has_remaining() {
tracing::debug!(len = data.remaining(), "handle_custom_broadcasts");
}
while data.has_remaining() {
if let Some(broadcast) = self
.broadcast_handler
.receive_item(&mut data)
.map_err(anyhow::Error::msg)
.map_err(Error::CustomBroadcast)?
{
#[cfg(feature = "tracing")]
tracing::info!("received broadcast item");
self.custom_broadcasts
.add_or_replace(broadcast, self.config.max_transmissions.get().into());
}
}
Ok(())
}
// Transitions to the `Disconnected` state and tells the runtime that
// we went idle. Expected to be called once there are no active
// members left.
fn become_disconnected(&mut self, mut runtime: impl Runtime<T>) {
    debug_assert_eq!(0, self.num_members());
    self.connection_state = ConnectionState::Disconnected;

    // Drop any in-flight probe and bump the token so that every timer
    // event submitted so far gets ignored. This stops the probe cycle
    // and prevents members from being switched to the Down state:
    // we have little confidence about our own state right now.
    self.probe.clear();
    self.timer_token = self.timer_token.wrapping_add(1);

    runtime.notify(Notification::Idle);
}
// Transitions to the `Undead` state and tells the runtime that this
// instance is now defunct.
fn become_undead(&mut self, mut runtime: impl Runtime<T>) {
    self.connection_state = ConnectionState::Undead;

    // Same reasoning as in `become_disconnected`: events submitted
    // before this point aren't relevant anymore, so invalidate them
    self.timer_token = self.timer_token.wrapping_add(1);
    // Since we're down, anything learned through probing is unreliable
    self.probe.clear();

    runtime.notify(Notification::Defunct);
}
// Transitions to the `Connected` state: kicks off the probe cycle,
// schedules the optional periodic announce/gossip timers and tells
// the runtime that we're active.
fn become_connected(&mut self, mut runtime: impl Runtime<T>) {
    debug_assert_ne!(0, self.num_members());
    self.connection_state = ConnectionState::Connected;

    // There is at least one active member, so probing can begin
    let token = self.timer_token;
    runtime.submit_after(Timer::ProbeRandomMember(token), self.config.probe_period);

    // The periodic timers are only scheduled when configured
    if let Some(params) = self.config.periodic_announce.as_ref() {
        runtime.submit_after(Timer::PeriodicAnnounce(token), params.frequency);
    }
    if let Some(params) = self.config.periodic_gossip.as_ref() {
        runtime.submit_after(Timer::PeriodicGossip(token), params.frequency);
    }

    runtime.notify(Notification::Active);
}
// Assembles a packet addressed to `dst` and hands it to the runtime.
//
// The packet always starts with the encoded header. What follows
// depends on the message kind:
//
// * `Feed`: a u16 tally followed by that many active members
//   (excluding `dst`);
// * `Announce`, `TurnUndead` and `Broadcast`: no tally;
// * Anything else: a u16 tally followed by that many cluster updates.
//
// Remaining space may then be filled with custom broadcasts (never for
// `Announce`, and only if the broadcast handler agrees).
//
// The only error this function yields is `Error::Encode`, when the
// codec fails to encode the header; nothing is sent in that case.
fn send_message(
    &mut self,
    dst: T,
    message: Message<T>,
    mut runtime: impl Runtime<T>,
) -> Result<()> {
    let header = Header {
        src: self.identity.clone(),
        src_incarnation: self.incarnation,
        dst: dst.clone(),
        message,
    };
    #[cfg(feature = "tracing")]
    let span = tracing::error_span!(
        "send_message",
        ?header,
        num_updates = tracing::field::Empty,
        num_broadcasts = tracing::field::Empty,
        len = tracing::field::Empty,
    )
    .entered();
    // XXX this looks very backwards. it's done as such to be able to
    //     reuse the buffer without having to do significant changes
    //     to the Codec trait or the existing code. With some effort,
    //     send_buf could simply be a Vec<u8>
    // XXX We split_off() here and by the end we unsplit().
    //     This must be done for every return point in send_message
    self.send_buf.clear();
    let mut buf = self
        .send_buf
        .split_off(0)
        .limit(self.config.max_packet_size.get());
    debug_assert_eq!(
        buf.get_ref().capacity(),
        self.config.max_packet_size.get(),
        "send_buf lost capacity, would trigger unnecessary allocs"
    );
    if let Err(err) = self
        .codec
        .encode_header(&header, &mut buf)
        .map_err(anyhow::Error::msg)
        .map_err(Error::Encode)
    {
        // Give the buffer back before bailing out
        self.send_buf.unsplit(buf.into_inner());
        return Err(err);
    }
    let (needs_piggyback, only_active_members) = match header.message {
        // Announce/TurnUndead packets carry no tally and no members
        Message::Announce | Message::TurnUndead => (false, false),
        // Feed packets stuff active members at the tail
        Message::Feed => (true, true),
        // Broadcast packets stuffs only custom broadcasts
        Message::Broadcast => (false, false),
        // Every other message stuffs cluster updates
        _ => (true, false),
    };
    // If we're piggybacking data, we need at least 2 extra bytes
    // so that we can also encode the number of items we're stuffing
    // into this buffer
    if needs_piggyback && buf.remaining_mut() > 2 {
        // Where we'll write the total number of items
        let tally_position = buf.get_ref().len();
        // We leave a zero here so that the buffer advances, then
        // we'll come back to `tally_position` and overwrite this
        // with the actual total
        buf.put_u16(0);
        let mut num_items = 0;
        if only_active_members {
            // The tally is a u16, so never encode more members than
            // it can represent: without this bound `num_items` could
            // overflow when the packet is large enough to fit more
            // than u16::MAX members
            for member in self
                .members
                .iter_active()
                .filter(|member| member.id() != &dst)
                .take(usize::from(u16::MAX))
            {
                let pos = buf.get_ref().len();
                if let Err(_ignored) = self.codec.encode_member(member, &mut buf) {
                    // encoding the member might have advanced the cursor
                    // of the buffer before yielding the error
                    // this resets it to the last known valid position
                    buf.get_mut().truncate(pos);
                    break;
                }
                num_items += 1;
            }
        } else {
            // u16::MAX guarantees that num_updates can be safely
            // cast from usize to u16
            let num_updates = self.updates.fill(&mut buf, u16::MAX.into());
            num_items = u16::try_from(num_updates).expect("usize bound by u16::MAX");
        }
        // Seek back and write the correct number of items added
        buf.get_mut()[tally_position..].as_mut().put_u16(num_items);
        #[cfg(feature = "tracing")]
        span.record("num_updates", num_items);
    }
    let add_custom_broadcast = buf.has_remaining_mut()
        // Every message but Announce includes custom broadcasts by
        // default
        && !matches!(header.message, Message::Announce)
        // Unless the broadcast handler says no
        && self.broadcast_handler.should_add_broadcast_data(&dst);
    if add_custom_broadcast {
        // Fill the remaining space in the buffer with custom
        // broadcasts, if any
        #[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
        let num_broadcasts = self.custom_broadcasts.fill(&mut buf, usize::MAX);
        #[cfg(feature = "tracing")]
        span.record("num_broadcasts", num_broadcasts);
    }
    let data = buf.into_inner();
    #[cfg(feature = "tracing")]
    #[allow(clippy::needless_borrow)]
    span.record("len", &data.len());
    #[cfg(feature = "tracing")]
    tracing::debug!("Message sent");
    runtime.send_to(dst, &data);
    // absorb the buf into send_buf so we can reuse its capacity
    self.send_buf.unsplit(data);
    Ok(())
}
// Decides whether a packet with the given header should be processed
// by this instance.
fn accept_payload(&self, header: &Header<T>) -> bool {
    // Payloads addressed directly to us are always accepted
    if header.dst == self.identity {
        return true;
    }

    // Anything else is only accepted when it's an Announce message
    // whose destination is one of our _possible_ identities
    header.message == Message::Announce && self.identity.has_same_prefix(&header.dst)
}
// Handles a cluster update that talks about our own identity.
//
// * `Alive`: nothing to do;
// * `Down`: can't be refuted, so we try to rejoin with a renewed
//   identity and become undead if that isn't possible;
// * `Suspect`: refuted by bumping our incarnation, unless the suspicion
//   is about an older incarnation (ignored) or the incarnation can't
//   grow anymore (handled like `Down`).
fn handle_self_update(
    &mut self,
    incarnation: Incarnation,
    state: State,
    mut runtime: impl Runtime<T>,
) -> Result<()> {
    match state {
        State::Alive => {
            // The cluster is merely talking about our liveness
        }
        State::Down => {
            // There's no way to refute a Down state: the only way
            // forward is rejoining
            let rejoined = self.attempt_rejoin(&mut runtime)?;
            if !rejoined {
                self.become_undead(runtime);
            }
        }
        State::Suspect => {
            match self.incarnation.cmp(&incarnation) {
                // The common scenario: our current incarnation is the
                // one being suspected, so it needs to be bumped
                Ordering::Equal => {}
                // Unexpected: whoever suspects us believes we are at a
                // higher incarnation than we really are. Members
                // flapping may lead to this, but it may also indicate a
                // bad actor (multiple identical identities, clients
                // bumping identities from other members, corrupted
                // data, etc).
                // We emit a warning and refute the suspicion as usual
                Ordering::Less => {
                    #[cfg(feature = "tracing")]
                    tracing::warn!(
                        ?self.incarnation,
                        suspected = incarnation,
                        "Suspicion on incarnation higher than current",
                    );
                }
                // Happens when a member learned that someone suspects
                // us but our refutation hasn't reached it yet. It's
                // safe to ignore.
                // We may end up being declared down if our new
                // incarnation never reaches them, but bumping the
                // incarnation again would risk starting a game of
                // counting
                Ordering::Greater => {
                    #[cfg(feature = "tracing")]
                    tracing::debug!(
                        ?self.incarnation,
                        suspected = incarnation,
                        "Ignored suspicion about old incarnation",
                    );
                    return Ok(());
                }
            }

            let highest = incarnation.max(self.incarnation);

            // Once the incarnation can't grow anymore, refuting a
            // suspicion is impossible, so the cluster must be rejoined
            if highest == Incarnation::MAX {
                let rejoined = self.attempt_rejoin(&mut runtime)?;
                if !rejoined {
                    #[cfg(feature = "tracing")]
                    tracing::warn!("Inactive: reached Incarnation::MAX",);
                    self.become_undead(runtime);
                }
                return Ok(());
            }

            // XXX Overzealous checking
            self.incarnation = highest.saturating_add(1);
            // We deliberately do NOT add ourselves as Alive to the
            // updates buffer: with the incarnation bumped, *any*
            // message we send will be interpreted as a broadcast update
            // See: `tests::message_from_aware_suspect_refutes_suspicion`
        }
    }

    Ok(())
}
// Tries to rejoin the cluster under a renewed identity.
//
// Yields `Ok(true)` when the identity was changed (the runtime gets a
// `Notification::Rejoin` with the new identity) and `Ok(false)` when
// the identity can't be renewed or the renewal yields the very same
// identity. Errors from `change_identity` are propagated.
fn attempt_rejoin(&mut self, mut runtime: impl Runtime<T>) -> Result<bool> {
    match self.identity.renew() {
        None => Ok(false),
        Some(new_identity) if self.identity == new_identity => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Rejoin failure: Identity::renew() returned same id",);
            Ok(false)
        }
        Some(new_identity) => {
            self.change_identity(new_identity.clone(), &mut runtime)?;
            runtime.notify(Notification::Rejoin(new_identity));
            Ok(true)
        }
    }
}
}
// Tracks where this instance stands with regards to the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    // No active members are known
    Disconnected,
    // At least one active member is known; the probe cycle is running
    Connected,
    // This instance has been declared down
    Undead,
}
/// A Broadcast Handler that rejects any form of custom broadcast.
///
/// Used by Foca when constructed via [`Foca::new()`].
#[derive(Debug, Clone, Copy)]
pub struct NoCustomBroadcast;
/// Error emitted by [`NoCustomBroadcast`] when any trailing byte is
/// found. Will be wrapped by [`Error`]
#[derive(Debug, Clone, Copy)]
pub struct BroadcastsDisabledError;

impl fmt::Display for BroadcastsDisabledError {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Broadcasts disabled")
    }
}

// Only available with the `std` feature
#[cfg(feature = "std")]
impl std::error::Error for BroadcastsDisabledError {}
impl<T> BroadcastHandler<T> for NoCustomBroadcast {
    type Broadcast = &'static [u8];
    type Error = BroadcastsDisabledError;

    // Never produces an item: every call is answered with
    // `BroadcastsDisabledError`, without looking at the payload.
    fn receive_item(
        &mut self,
        _payload: impl Buf,
    ) -> core::result::Result<Option<Self::Broadcast>, Self::Error> {
        let rejection = BroadcastsDisabledError;
        Err(rejection)
    }
}
// A cluster update queued for dissemination: the serialized form of a
// member, tagged with the identity it refers to.
struct ClusterUpdate<T> {
// Identity the update is about. Used to decide whether a newer update
// replaces this one
member_id: T,
// The serialized member, exposed through `AsRef<[u8]>`
data: Bytes,
}
impl<T: PartialEq> Invalidates for ClusterUpdate<T> {
    // Member state lives elsewhere (in Members), so all that matters
    // here is the key: an update invalidates another when both refer
    // to the same member
    fn invalidates(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.member_id, &other.member_id);
        mine == theirs
    }
}
impl<T> AsRef<[u8]> for ClusterUpdate<T> {
    // Exposes the serialized member as a plain byte slice
    fn as_ref(&self) -> &[u8] {
        let bytes: &[u8] = self.data.as_ref();
        bytes
    }
}
// Accessors and shortcuts that exist only so that tests are able to
// inspect and drive internal state.
#[cfg(test)]
impl<T: Identity, C: Codec<T>, RNG: rand::Rng, B: BroadcastHandler<T>> Foca<T, C, RNG, B> {
    // Our current incarnation
    pub fn incarnation(&self) -> Incarnation {
        self.incarnation
    }

    // The state of the probe cycle
    pub fn probe(&self) -> &Probe<T> {
        &self.probe
    }

    // The token attached to the timer events we currently submit
    pub fn timer_token(&self) -> TimerToken {
        self.timer_token
    }

    // Current connection state
    pub(crate) fn connection_state(&self) -> ConnectionState {
        self.connection_state
    }

    // Applies a single update, as if `apply_many` was called with it
    pub(crate) fn apply(&mut self, member: Member<T>, mut runtime: impl Runtime<T>) -> Result<()> {
        let single_update = core::iter::once(member);
        self.apply_many(single_update, &mut runtime)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
use core::{
num::{NonZeroU8, NonZeroUsize},
time::Duration,
};
use bytes::{Buf, BufMut};
use rand::{rngs::SmallRng, SeedableRng};
use crate::testing::{BadCodec, InMemoryRuntime, ID};
// RNG with a fixed seed, so that every test run sees the same sequence
fn rng() -> SmallRng {
    const SEED: u64 = 0xF0CA;
    SmallRng::seed_from_u64(SEED)
}
// The codec used by the tests, both to construct Foca instances and
// to encode/decode packets by hand
fn codec() -> BadCodec {
BadCodec
}
// Baseline configuration for the tests: whatever `Config::simple()`
// yields
fn config() -> Config {
Config::simple()
}
// Builds a packet by hand: the encoded header followed, only when
// there are updates, by their count (u16) and each encoded member.
fn encode((header, updates): (Header<ID>, Vec<Member<ID>>)) -> Bytes {
    let mut codec = codec();
    let mut packet = BytesMut::new();

    codec
        .encode_header(&header, &mut packet)
        .expect("MAYBE FIXME?");

    if !updates.is_empty() {
        let tally = u16::try_from(updates.len()).unwrap();
        packet.put_u16(tally);
        for member in &updates {
            codec
                .encode_member(member, &mut packet)
                .expect("MAYBE FIXME?");
        }
    }

    packet.freeze()
}
// Counterpart of `encode`: reads the header and, if anything is left
// in the buffer, the u16 count followed by that many members.
// Panics when decoding fails.
fn decode(mut src: impl Buf) -> (Header<ID>, Vec<Member<ID>>) {
    let mut codec = codec();
    let header = codec.decode_header(&mut src).unwrap();

    let updates = if src.has_remaining() {
        let num_items = src.get_u16();
        (0..num_items)
            .map(|_| codec.decode_member(&mut src).unwrap())
            .collect()
    } else {
        Vec::new()
    };

    (header, updates)
}
#[test]
fn invariants() {
    let original_id = ID::new(42);
    let mut foca = Foca::new(original_id, config(), rng(), codec());

    // A fresh instance is disconnected and knows nobody
    assert_eq!(ConnectionState::Disconnected, foca.connection_state());
    assert_eq!(0, foca.num_members());
    assert_eq!(None, foca.iter_members().next());
    // And since it's not undead, there's no identity to reuse
    assert_eq!(Err(Error::NotUndead), foca.reuse_down_identity());

    let mut runtime = InMemoryRuntime::new();

    // Changing to the identity we already have is rejected
    let same_id_outcome = foca.change_identity(original_id, &mut runtime);
    assert_eq!(Err(Error::SameIdentity), same_id_outcome);

    // Whilst a different one is accepted and takes effect
    let renewed_id = ID::new(43);
    assert_eq!(Ok(()), foca.change_identity(renewed_id, &mut runtime));
    assert_eq!(&renewed_id, foca.identity());
}
#[test]
fn cant_change_config_probe_timers() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());

    let longer_rtt = {
        let mut cfg = config();
        cfg.probe_rtt += Duration::from_millis(1);
        cfg
    };
    assert_eq!(
        Err(Error::InvalidConfig),
        foca.set_config(longer_rtt),
        "must not be able to change probe_rtt"
    );

    let shorter_period = {
        let mut cfg = config();
        cfg.probe_period -= Duration::from_secs(1);
        cfg
    };
    assert_eq!(
        Err(Error::InvalidConfig),
        foca.set_config(shorter_period),
        "must not be able to change probe_period"
    );

    // A configuration with untouched probe timers is accepted
    assert_eq!(Ok(()), foca.set_config(config()));
}
#[test]
fn cant_probe_when_not_connected() {
    // A fresh instance has no members, so it isn't connected
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let probe_timer = Timer::ProbeRandomMember(foca.timer_token());

    let outcome = foca.handle_timer(probe_timer, InMemoryRuntime::new());

    assert_eq!(Err(Error::NotConnected), outcome);
}
// Verifies that failures coming from the codec surface wrapped in the
// matching error variant: `Error::Encode` when sending, `Error::Decode`
// when receiving.
#[test]
fn codec_errors_are_forwarded_correctly() {
// A codec that only produces errors
struct UnitErroringCodec;
// The error it yields. Displays as "no"
#[derive(Debug)]
struct UnitError;
impl fmt::Display for UnitError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("no")
}
}
impl Codec<ID> for UnitErroringCodec {
type Error = UnitError;
fn encode_header(
&mut self,
_header: &Header<ID>,
_buf: impl BufMut,
) -> core::result::Result<(), Self::Error> {
Err(UnitError)
}
fn decode_header(
&mut self,
_buf: impl Buf,
) -> core::result::Result<Header<ID>, Self::Error> {
Err(UnitError)
}
fn encode_member(
&mut self,
_member: &Member<ID>,
_buf: impl BufMut,
) -> core::result::Result<(), Self::Error> {
Err(UnitError)
}
fn decode_member(
&mut self,
_buf: impl Buf,
) -> core::result::Result<Member<ID>, Self::Error> {
Err(UnitError)
}
}
// And a runtime that does nothing to pair it with
struct NoopRuntime;
impl Runtime<ID> for NoopRuntime {
fn notify(&mut self, _notification: Notification<ID>) {}
fn send_to(&mut self, _to: ID, _data: &[u8]) {}
fn submit_after(&mut self, _event: Timer<ID>, _after: Duration) {}
}
let mut foca = Foca::new(ID::new(1), Config::simple(), rng(), UnitErroringCodec);
// Announcing requires encoding a header, which this codec can't do
assert_eq!(
Err(Error::Encode(anyhow::Error::msg(UnitError))),
foca.announce(ID::new(2), NoopRuntime)
);
// Handling data requires decoding a header, which also fails
assert_eq!(
Err(Error::Decode(anyhow::Error::msg(UnitError))),
foca.handle_data(b"hue", NoopRuntime)
);
}
// Takes the scheduling of `$timer` out of `$runtime` and asserts it
// was submitted with delay `$after`. Panics if the timer isn't found.
// NOTE: `$timer` is expanded more than once (lookup and messages)
macro_rules! expect_scheduling {
($runtime: expr, $timer: expr, $after: expr) => {
$runtime
.take_scheduling($timer)
.map(|after| assert_eq!(after, $after, "Incorrect scheduling for {:?}", $timer))
.unwrap_or_else(|| panic!("Timer {:?} not found", $timer));
};
}
// Takes `$notification` out of `$runtime`, panicking if it isn't there.
// NOTE: `$notification` is expanded twice (lookup and panic message)
macro_rules! expect_notification {
($runtime: expr, $notification: expr) => {
$runtime
.take_notification($notification)
.unwrap_or_else(|| panic!("Notification {:?} not found", $notification));
};
}
// The opposite of `expect_notification`: asserts that `$runtime` holds
// no `$notification`.
// NOTE: `$notification` is expanded twice (lookup and failure message)
macro_rules! reject_notification {
($runtime: expr, $notification: expr) => {
assert!(
$runtime.take_notification($notification).is_none(),
"Unwanted notification {:?} found",
$notification
);
};
}
// Takes the data destined to `$member` out of `$runtime`, decodes it
// and asserts that the message in its header equals `$message`.
// Panics if there's no data for `$member`.
// NOTE: `$member` and `$message` are expanded more than once
macro_rules! expect_message {
($runtime: expr, $member: expr, $message: expr) => {
let d = $runtime
.take_data($member)
.unwrap_or_else(|| panic!("Message to member {:?} not found", $member));
// Only the header matters here; piggybacked updates are discarded
let (header, _) = decode(d);
assert_eq!(
header.message, $message,
"Message to member {:?} is {:?}. Expected {:?}",
$member, header.message, $message
);
};
}
// Walks through the join subprotocol between two instances:
// `foca_one` announces to `foca_two`, which replies with a Feed that
// completes the join.
#[test]
fn can_join_with_another_client() {
let mut foca_one = Foca::new(ID::new(1), config(), rng(), codec());
let mut foca_two = Foca::new(ID::new(2), config(), rng(), codec());
// A single runtime is shared by both instances; it's cleared between
// steps so that each one is inspected in isolation
let mut runtime = InMemoryRuntime::new();
// Here foca_one will send an announce packet to foca_two
foca_one
.announce(*foca_two.identity(), &mut runtime)
.expect("no errors");
assert_eq!(
0,
foca_one.num_members(),
"announcing shouldn't change members"
);
// So the runtime should've been instructed to send a
// message to foca_two
let data = decode(
runtime
.take_data(ID::new(2))
.expect("No data for ID::new(2) found"),
);
assert_eq!(data.0.message, Message::Announce);
runtime.clear();
// Deliver the announce to foca_two
foca_two
.handle_data(&encode(data), &mut runtime)
.expect("no errors");
// Right now, foca_two should be aware of foca_one
assert_eq!(1, foca_two.num_members());
// Whilst foca_one is oblivious to the effect of its announce
assert_eq!(0, foca_one.num_members());
// So we should have gotten a notification about going online
expect_notification!(runtime, Notification::<ID>::Active);
expect_notification!(runtime, Notification::MemberUp(ID::new(1)));
// And an event to trigger a probe should've been
// scheduled
expect_scheduling!(
runtime,
Timer::<ID>::ProbeRandomMember(foca_one.timer_token()),
config().probe_period
);
// More importantly, the runtime should've been instructed to
// send a feed to foca_one, which will finally complete its
// join cycle
let data = decode(
runtime
.take_data(ID::new(1))
.expect("Feed for ID::new(1) not found"),
);
assert_eq!(data.0.message, Message::Feed);
runtime.clear();
// Deliver the feed to foca_one: it becomes active and learns
// about foca_two
assert_eq!(Ok(()), foca_one.handle_data(&encode(data), &mut runtime));
expect_notification!(runtime, Notification::<ID>::Active);
expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
assert_eq!(1, foca_one.num_members());
}
#[test]
fn feed_contains_only_active_members() {
    // We'll make `foca_one` send an Announce to `foca_two` and verify
    // that its reply is a Feed containing its known *active* members
    let one = ID::new(1);
    let two = ID::new(2);
    let mut foca_one = Foca::new(one, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();
    assert_eq!(Ok(()), foca_one.announce(two, &mut runtime));
    let data = runtime
        .take_data(two)
        .expect("Should have a message for foca_two");
    runtime.clear();
    // Members 3 and 4 are active, 5 is down and should not be
    // found in the feed
    let members = [
        Member::alive(ID::new(3)),
        Member::suspect(ID::new(4)),
        Member::down(ID::new(5)),
    ];
    let mut foca_two = Foca::new(two, config(), rng(), codec());
    // Let `foca_two` know about the members
    for member in members.iter() {
        assert_eq!(Ok(()), foca_two.apply(member.clone(), &mut runtime));
    }
    // Receive the packet from `foca_one`
    assert_eq!(Ok(()), foca_two.handle_data(&data, &mut runtime));
    let feed_data = runtime
        .take_data(one)
        .expect("Should have a message for foca_one");
    let (header, updates) = decode(feed_data);
    assert_eq!(header.message, Message::Feed);
    assert_eq!(2, updates.len());
    // Filter before cloning so that only the members we actually
    // compare against get cloned
    let expected = members
        .iter()
        .filter(|m| m.is_active())
        .cloned()
        .collect::<Vec<_>>();
    assert_eq!(expected, updates);
}
// Verifies how cluster updates are piggybacked on outgoing messages
// and that each update stops being disseminated once it has been sent
// `max_transmissions` times.
#[test]
fn piggyback_behaviour() {
let max_transmissions = NonZeroU8::new(10).unwrap();
let num_indirect_probes = NonZeroUsize::new(3).unwrap();
let config = Config {
max_transmissions,
num_indirect_probes,
..config()
};
let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec());
// A manually crafted Gossip packet from ID::new(2) addressed to
// our foca instance
let data = {
let header = Header {
src: ID::new(2),
src_incarnation: 0,
dst: ID::new(1),
message: Message::Gossip,
};
// Containing these cluster updates:
let updates = vec![
Member::new(ID::new(3), 3, State::Alive),
Member::new(ID::new(4), 1, State::Suspect),
Member::new(ID::new(5), 1, State::Down),
];
(header, updates)
};
let mut runtime = InMemoryRuntime::new();
assert_eq!(Ok(()), foca.handle_data(&encode(data), &mut runtime));
expect_notification!(runtime, Notification::<ID>::Active);
// We didn't know about any member mentioned in the packet
expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
expect_notification!(runtime, Notification::MemberUp(ID::new(3)));
expect_notification!(runtime, Notification::MemberUp(ID::new(4)));
// But an update about a Down member that we didn't know
// about is cluster metadata only and shouldn't trigger
// a notification
reject_notification!(runtime, Notification::MemberDown(ID::new(5)));
// It should, however, trigger a scheduling for forgetting
// the member, so that they may rejoin the cluster
expect_scheduling!(
runtime,
Timer::RemoveDown(ID::new(5)),
config.remove_down_after
);
// 2 active members from the updates + the member that sent
// the payload
assert_eq!(3, foca.num_members());
let mut members = foca
.iter_members()
.map(Member::id)
.cloned()
.collect::<Vec<_>>();
members.sort_unstable();
assert_eq!(vec![ID::new(2), ID::new(3), ID::new(4)], members);
// Now, whenever we send a message that isn't part of the
// join subprotocol (i.e.: not Feed nor Announce)
// we should be emitting updates regardless of who we're
// sending the message to.
runtime.clear();
assert_eq!(Ok(()), foca.gossip(&mut runtime));
// When we gossip, we pick random `num_indirect_probes`
// members to send them. And every update is disseminated
// at most `max_transmissions` times.
//
// Since our ids are tiny, we know that every update
// we have at the moment (the 3 that we received, plus
// the discovery of the sender) will fit in a single
// message.
//
// And since we just verified we have 3 active members,
// which is exactly our fan out parameter
// (`num_indirect_probes`), we expect that every
// call to `gossip()` will drain 3 from the transmission
// count of each update.
//
// So now we have `max_transmissions - 3` remaining
// transmissions for each update.
let mut remaining_tx = usize::from(max_transmissions.get()) - foca.num_members();
assert_eq!(
4,
foca.updates_backlog(),
"We should still have 4 updates in the backlog"
);
// let's gossip some more until we're in a more interesting
// scenario
while remaining_tx >= foca.num_members() {
assert_eq!(Ok(()), foca.gossip(&mut runtime));
remaining_tx -= foca.num_members();
// So long as we have remaining_tx, the backlog should
// remain the same
assert_eq!(4, foca.updates_backlog());
}
assert!(remaining_tx < foca.num_members() && remaining_tx > 0);
assert_eq!(4, foca.updates_backlog());
// Now we sent enough broadcasts that we'll finally see the updates
// backlog tank.
// Since max_transmissions is set to 10 and every gossip() call
// dropped 3:
assert_eq!(1, remaining_tx);
// Which means that the next gossip round should not only
// finally drain the backlog: only one of the three Gossip
// messages sent will contain our 4 updates. The remaining
// should have no update at all.
// (The value of an empty gossip message is questionable, but
// since a valid message counts as a valid update it
// essentially helps disseminate the knowledge of our
// existence)
runtime.clear();
assert_eq!(Ok(()), foca.gossip(&mut runtime));
let mut gossip_with_updates = 0;
let mut empty_gossip = 0;
// Classify every message sent in this round
for (_dst, data) in runtime.take_all_data() {
let (header, updates) = decode(data);
assert_eq!(Message::Gossip, header.message);
if updates.is_empty() {
empty_gossip += 1;
} else {
gossip_with_updates += 1;
}
}
assert_eq!(1, gossip_with_updates);
assert_eq!(2, empty_gossip);
assert_eq!(0, foca.updates_backlog());
}
// Verifies that `Timer::RemoveDown` is scheduled exactly when we learn
// that a member is down: not for active members and not again for
// repeated knowledge.
#[test]
fn new_down_member_triggers_remove_down_scheduling() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let mut runtime = InMemoryRuntime::new();
// ID::new(2) is new and down
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
expect_scheduling!(
runtime,
Timer::RemoveDown(ID::new(2)),
config().remove_down_after
);
// We already know about ID::new(2) being down. So we don't
// want to schedule anything
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
assert_eq!(
None,
runtime.take_scheduling(Timer::RemoveDown(ID::new(2))),
"Must not duplicate removal scheduling"
);
// A new _active_ member must not trigger the scheduling
foca.apply(Member::alive(ID::new(3)), &mut runtime)?;
assert_eq!(
None,
runtime.take_scheduling(Timer::RemoveDown(ID::new(3))),
"Must not schedule removal of active member ID=3"
);
// But it should trigger if we change it to down via an update
foca.apply(Member::down(ID::new(3)), &mut runtime)?;
expect_scheduling!(
runtime,
Timer::RemoveDown(ID::new(3)),
config().remove_down_after
);
Ok(())
}
// Walks an instance through a sequence of updates and verifies which
// notifications the runtime receives (and doesn't receive) at each
// step. The steps build on each other, so their order matters.
#[test]
fn notification_triggers() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let mut runtime = InMemoryRuntime::new();
// Brand new member. The first in our set, so we should
// also be notified about going active
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
expect_notification!(runtime, Notification::<ID>::Active);
// Updated/stale knowledge about an active member shouldn't
// trigger a notification so long as it doesn't go down
runtime.clear();
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
foca.apply(Member::suspect(ID::new(2)), &mut runtime)?;
foca.apply(Member::new(ID::new(2), 10, State::Alive), &mut runtime)?;
reject_notification!(runtime, Notification::MemberUp(ID::new(2)));
reject_notification!(runtime, Notification::MemberDown(ID::new(2)));
// Another new member
runtime.clear();
foca.apply(Member::suspect(ID::new(3)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberUp(ID::new(3)));
reject_notification!(runtime, Notification::<ID>::Active);
// Existing member going down
runtime.clear();
foca.apply(Member::down(ID::new(3)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberDown(ID::new(3)));
// A stale update should trigger no notification
runtime.clear();
foca.apply(Member::down(ID::new(3)), &mut runtime)?;
reject_notification!(runtime, Notification::MemberDown(ID::new(3)));
// A new member, but already down, so no notification
runtime.clear();
foca.apply(Member::down(ID::new(4)), &mut runtime)?;
reject_notification!(runtime, Notification::MemberDown(ID::new(4)));
// Last active member going down, we're going idle
runtime.clear();
assert_eq!(1, foca.num_members());
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberDown(ID::new(2)));
expect_notification!(runtime, Notification::<ID>::Idle);
// New active member, going back to active
runtime.clear();
foca.apply(Member::alive(ID::new(5)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberUp(ID::new(5)));
expect_notification!(runtime, Notification::<ID>::Active);
// Now someone declared us (ID=1) down, we should
// go defunct
runtime.clear();
foca.apply(Member::down(ID::new(1)), &mut runtime)?;
expect_notification!(runtime, Notification::<ID>::Defunct);
// But since we're not part of the member list, there shouldn't
// be a notification about our id going down
reject_notification!(runtime, Notification::MemberDown(ID::new(1)));
// While defunct, we can still maintain members,
runtime.clear();
foca.apply(Member::down(ID::new(5)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberDown(ID::new(5)));
foca.apply(Member::alive(ID::new(6)), &mut runtime)?;
expect_notification!(runtime, Notification::MemberUp(ID::new(6)));
// But until manual intervention happens, we are not active
reject_notification!(runtime, Notification::<ID>::Active);
// The manual intervention: keep using the identity that was
// declared down
assert_eq!(Ok(()), foca.reuse_down_identity());
// Now since we are not defunct anymore, any message
// received, even if it's a stale update should
// notify that we're active again
runtime.clear();
assert_eq!(1, foca.num_members());
foca.apply(Member::alive(ID::new(6)), &mut runtime)?;
expect_notification!(runtime, Notification::<ID>::Active);
Ok(())
}
#[test]
fn not_submitting_indirect_probe_timer_causes_probe_error() -> Result<()> {
// The probe cycle requires two timer events:
//
// 1. Timer::ProbeRandomMember, which starts the probe
// 2. Timer::SendIndirectProbe, which sends indirect
// probes IFF we haven't received a direct reply
//
// This test verifies that not submitting the second
// timer event causes an error.
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let mut runtime = InMemoryRuntime::new();
// Add an active member so that the probing can start
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
// Becoming connected should have scheduled the first probe
let probe_random_member = Timer::ProbeRandomMember(foca.timer_token());
expect_scheduling!(runtime, probe_random_member.clone(), config().probe_period);
// Start the probe now, instead of after `probe_period`
runtime.clear();
assert_eq!(
Ok(()),
foca.handle_timer(probe_random_member.clone(), &mut runtime)
);
// Which should instruct the runtime to trigger the second stage of
// the probe after `probe_rtt`
expect_scheduling!(
runtime,
Timer::SendIndirectProbe {
probed_id: ID::new(2),
token: foca.timer_token(),
},
config().probe_rtt
);
// But instead of triggering send_indirect_probe as instructed
// we'll trigger probe_random_member again, simulating a
// broken runtime
assert_eq!(
Err(Error::IncompleteProbeCycle),
foca.handle_timer(probe_random_member, &mut runtime)
);
Ok(())
}
#[test]
fn receiving_indirect_for_ourselves_causes_error() {
    // The protocol lets a member ask another one to talk to a third
    // member on its behalf, which helps piercing holes/partitions in
    // the cluster.
    //
    // Here we verify that being asked, via this mechanism, to talk to
    // ourselves yields an error.
    let us = ID::new(1);
    let mut foca = Foca::new(us, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();
    let probe_number = foca.probe().probe_number();

    // Every indirect message kind, each pointing back at us
    let indirect_messages = vec![
        Message::PingReq {
            target: us,
            probe_number,
        },
        Message::IndirectPing {
            origin: us,
            probe_number,
        },
        Message::IndirectAck {
            target: us,
            probe_number,
        },
        Message::ForwardedAck {
            origin: us,
            probe_number,
        },
    ];

    for message in indirect_messages {
        let bad_header = Header {
            src: ID::new(2),
            src_incarnation: 0,
            dst: us,
            message,
        };
        let packet = encode((bad_header, Vec::new()));
        assert_eq!(
            Err(Error::IndirectForOurselves),
            foca.handle_data(&packet, &mut runtime)
        );
    }
}
#[test]
fn cant_receive_data_from_same_identity() {
    let us = ID::new(1);
    let mut foca = Foca::new(us, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // A packet claiming to come from our very own identity
    let header = Header {
        src: us,
        src_incarnation: 0,
        dst: us,
        message: Message::Announce,
    };
    let packet = encode((header, Vec::new()));

    assert_eq!(
        Err(Error::DataFromOurselves),
        foca.handle_data(&packet, &mut runtime)
    );
}
#[test]
fn cant_receive_announce_with_extra_data() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // An Announce that carries a member after its header
    let header = Header {
        src: ID::new(2),
        src_incarnation: 0,
        dst: ID::new(1),
        message: Message::Announce,
    };
    let extra = vec![Member::alive(ID::new(3))];
    let packet = encode((header, extra));

    assert_eq!(
        Err(Error::MalformedPacket),
        foca.handle_data(&packet, &mut runtime)
    );
}
#[test]
fn announce_to_wrong_id_is_accepted_if_same_prefix() {
// Joining a cluster involves sending an Announce message to a
// member we know about, that means that we need to know the
// exact identity of a member.
//
// Re-joining a cluster involves either waiting until the
// cluster forgets you went down or simply changing your
// identity.
//
// That's when things may get confusing: if we want to be
// able to rejoin a cluster fast, we need to be able to change
// identities; But if everyone can change identities, how
// can we send a valid Announce message?
//
// To facilitate this, we provide a mechanism to relax the
// check on Announce messages: if the packet was not addressed
// directly to us, but to an identity that "has the same prefix"
// we accept it.
//
// This mechanism is disabled by default. To enable it an
// identity must specialize the default implementation of
// the `has_same_prefix` method to yield `true` when they
// want.
//
// This test verifies that this mechanism actually works.
// This is our running Foca instance, with `target_id`. Nobody
// in the cluster knows that our bump is 255, but everyone
// knows about the ID::new(1) part.
let target_id = ID::new_with_bump(1, 255);
let codec = codec();
let mut foca = Foca::new(target_id, config(), rng(), codec);
let mut runtime = InMemoryRuntime::new();
// Our goal is getting `src` to join `target_id`'s cluster.
let src_id = ID::new(2);
// We'll send a packet destined to the wrong id, not
// passing the "has same prefix" check to verify the join
// doesn't happen
let wrong_dst = ID::new(3);
assert!(!target_id.has_same_prefix(&wrong_dst));
let data = (
Header {
src: src_id,
src_incarnation: 0,
dst: wrong_dst,
message: Message::Announce,
},
Vec::new(),
);
// Whilst it won't cause any errors
assert_eq!(Ok(()), foca.handle_data(&encode(data), &mut runtime));
// The packet was simply ignored:
assert_eq!(0, foca.num_members());
// Now we'll send it to an identity that matches the same
// prefix check
let dst = ID::new_with_bump(1, 42);
assert_ne!(target_id, dst);
assert!(target_id.has_same_prefix(&dst));
let data = (
Header {
src: src_id,
src_incarnation: 0,
dst,
message: Message::Announce,
},
Vec::new(),
);
assert_eq!(Ok(()), foca.handle_data(&encode(data), &mut runtime));
// So `src_id` should've successfully joined
assert_eq!(1, foca.num_members());
assert!(foca.iter_members().any(|member| member.id() == &src_id));
}
// Verifies how an instance reacts to updates claiming it is suspect,
// all the way up to the point where the incarnation can't grow anymore.
#[test]
fn suspicion_refutal() -> Result<()> {
    let own_id = ID::new(1);
    let mut foca = Foca::new(own_id, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let starting_incarnation = foca.incarnation();

    // Learning that we're suspected is refuted simply by bumping
    // our own incarnation
    foca.apply(Member::suspect(own_id), &mut runtime)?;
    assert!(starting_incarnation < foca.incarnation());

    // The incarnation is allowed to grow up to a maximum level
    let almost_maxed = Member::new(own_id, Incarnation::MAX - 1, State::Suspect);
    foca.apply(almost_maxed, &mut runtime)?;
    assert_eq!(Incarnation::MAX, foca.incarnation());

    // Past that level the suspicion can no longer be refuted:
    // manual intervention is required when this happens
    let maxed_out = Member::new(own_id, Incarnation::MAX, State::Suspect);
    foca.apply(maxed_out, &mut runtime)?;
    assert_eq!(ConnectionState::Undead, foca.connection_state());

    Ok(())
}
// Changing identity must immediately send a Gossip message carrying
// updates to a known member.
#[test]
fn change_identity_gossips_immediately() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // We need at least one known member to have someone to gossip to
    let peer = ID::new(2);
    let joined = foca.apply(Member::alive(peer), &mut runtime);
    assert_eq!(Ok(()), joined);

    let changed = foca.change_identity(ID::new(99), &mut runtime);
    assert_eq!(Ok(()), changed);
    assert!(foca.updates_backlog() > 0);

    let sent = runtime
        .take_data(peer)
        .expect("Should have sent a message to ID=2");
    let (header, updates) = decode(sent);
    assert_eq!(Message::Gossip, header.message);
    assert!(!updates.is_empty());
}
// The timer token must not survive an identity change.
#[test]
fn changing_identity_resets_timer_token() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let token_before = foca.timer_token();
    let changed = foca.change_identity(ID::new(2), &mut runtime);

    assert_eq!(Ok(()), changed);
    assert_ne!(token_before, foca.timer_token());
}
// Regression test: a `ProbeRandomMember` timer scheduled before an
// identity change must be quietly discarded when it fires afterwards.
#[test]
fn renew_during_probe_shouldnt_cause_errors() {
    let id = ID::new(1).rejoinable();
    let mut foca = Foca::new(id, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // Get the instance to know about 3 peers
    let peers = [
        Member::alive(ID::new(2)),
        Member::alive(ID::new(3)),
        Member::alive(ID::new(4)),
    ];
    let applied = foca.apply_many(peers.iter().cloned(), &mut runtime);
    assert_eq!(Ok(()), applied);

    // That should've led to a request to schedule probing
    let expected_timer = Timer::ProbeRandomMember(foca.timer_token());
    expect_scheduling!(runtime, expected_timer.clone(), config().probe_period);

    // Normally the timer fires after Config::probe_period; here we
    // submit it right away, which starts the probe cycle
    let started = foca.handle_timer(expected_timer, &mut runtime);
    assert_eq!(Ok(()), started);

    // Starting the cycle schedules the next random member probe
    let probe_random_member_timer = runtime
        .find_scheduling(|e| matches!(e, Timer::ProbeRandomMember(_)))
        .expect("Probe cycle should have started")
        .clone();

    // We're now in the middle of a probe cycle. What if we're forced
    // to change identities at this point (either because we were
    // declared down or because ids were changed manually)?
    let new_id = id.renew().unwrap();
    let changed = foca.change_identity(new_id, &mut runtime);
    assert_eq!(Ok(()), changed);

    // How the original bug manifested:
    //
    // A member received our new identity and sent us a message, so
    // foca became connected:
    //
    //  assert_eq!(Ok(()), foca.apply(Member::alive(new_id), &mut runtime));
    //
    // and only THEN the ProbeRandomMember event fired. It was
    // accepted and went all the way to `probe_random_member`, which
    // correctly detected that a member was still being probed when
    // it shouldn't.
    //
    // The issue existed even without the message that makes foca go
    // online, though: before the fix the timer token wasn't being
    // updated, so the ProbeRandomMember event was accepted and hit
    // a different trip-wire: `Error::NotConnected` (raised right
    // before the call to `probe_random_member` that would lead to
    // the original scenario).
    //
    // So here we verify that Foca now discards the event instead of
    // throwing an error:
    let discarded = foca.handle_timer(probe_random_member_timer, &mut runtime);
    assert_eq!(Ok(()), discarded);
}
// Simple helper to ease testing of the probe cycle.
//
// Takes:
//  - `num_members`: how many active members the instance should
//    know about. Must be within `1..=254`: member ids are handed
//    out sequentially starting at 2 and have to fit in a `u8`
//  - `config`: the configuration the instance runs with
//
// Yields:
//  .0: a foca instance, ID=1, with `num_members` active members
//  .1: the member being probed
//  .2: the event (SendIndirectProbe) to submit in order
//      to continue the probe cycle
fn craft_probing_foca(
    num_members: u8,
    config: Config,
) -> (
    Foca<ID, BadCodec, SmallRng, NoCustomBroadcast>,
    ID,
    Timer<ID>,
) {
    assert!(num_members > 0);
    // The last id handed out is `num_members + 1`; refuse the one
    // value for which that computation would overflow
    assert!(
        num_members < u8::MAX,
        "num_members must be at most 254 so that every member id fits in a u8"
    );

    let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // Assume some members exist. The range is inclusive so that the
    // upper bound itself never exceeds `u8::MAX`
    for smallid in 2..=(num_members + 1) {
        foca.apply(Member::alive(ID::new(smallid)), &mut runtime)
            .expect("infallible");
    }

    // The runtime should've been instructed to schedule a
    // probe for later on
    let expected_timer = Timer::ProbeRandomMember(foca.timer_token());
    expect_scheduling!(runtime, expected_timer.clone(), config.probe_period);

    // We'll trigger it right now instead
    runtime.clear();
    assert_eq!(Ok(()), foca.handle_timer(expected_timer, &mut runtime));

    let probed = *foca.probe().target().expect("Probe should have started");

    // Now we know which member is being probed. So we can verify
    // that a ping message was sent to it:
    let (header, _updates) = decode(
        runtime
            .take_data(probed)
            .expect("Should have initiated a probe"),
    );
    assert!(matches!(header.message, Message::Ping(_)));

    // We should also have received a scheduling request
    // for when we should trigger the second stage of the
    // probe
    let send_indirect_probe = Timer::SendIndirectProbe {
        probed_id: probed,
        token: foca.timer_token(),
    };
    expect_scheduling!(runtime, send_indirect_probe.clone(), config.probe_rtt);

    (foca, probed, send_indirect_probe)
}
#[test]
fn going_idle_clears_probe_state() {
    // Scenario: an instance is in the middle of a probe cycle when,
    // for whatever reason, it learns that the cluster has no active
    // members left (and therefore goes Idle)

    // An instance that is currently probing
    let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
    let mut runtime = InMemoryRuntime::new();

    // Clippy gets it wrong here: the updates must be gathered into
    // an owned collection first, otherwise foca remains borrowed
    // by the iterator while we try to apply them
    #[allow(clippy::needless_collect)]
    let everyone_down: Vec<_> = foca
        .iter_members()
        .map(|member| Member::down(*member.id()))
        .collect();

    // Somehow every member "disappears"
    let applied = foca.apply_many(everyone_down.into_iter(), &mut runtime);
    assert_eq!(Ok(()), applied);

    // Which makes the instance go idle
    expect_notification!(runtime, Notification::<ID>::Idle);

    // By now the probe state should've been cleared, so that things
    // are actually functional when the instance resumes operation
    assert!(foca.probe().validate(), "invalid probe state")
}
// A direct Ack received before the indirect probe timer fires
// completes the probe.
#[test]
fn probe_ping_ack_cycle() {
    let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
    let mut runtime = InMemoryRuntime::new();

    // `probed` replies before the timer fires: the probe should
    // complete and the indirect probe cycle shouldn't start
    let ack = Header {
        src: probed,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::Ack(foca.probe().probe_number()),
    };
    let packet = encode((ack, Vec::new()));

    let received = foca.handle_data(&packet, &mut runtime);
    assert_eq!(Ok(()), received);

    let fired = foca.handle_timer(send_indirect_probe, &mut runtime);
    assert_eq!(Ok(()), fired);

    assert!(
        foca.probe().succeeded(),
        "probe should have succeeded after Ack"
    );
}
// An Ack carrying the wrong probe number must not complete the probe.
#[test]
fn probe_cycle_requires_correct_probe_number() {
    let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
    let mut runtime = InMemoryRuntime::new();

    let incorrect_probe_number = foca.probe().probe_number() + 1;
    assert_ne!(incorrect_probe_number, foca.probe().probe_number());

    // Same kind of payload as in `tests::probe_ping_ack_cycle`,
    // except that the probe number doesn't match
    let bogus_ack = Header {
        src: probed,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::Ack(incorrect_probe_number),
    };
    let packet = encode((bogus_ack, Vec::new()));

    let received = foca.handle_data(&packet, &mut runtime);
    assert_eq!(Ok(()), received);

    let fired = foca.handle_timer(send_indirect_probe, &mut runtime);
    assert_eq!(Ok(()), fired);

    assert!(
        !foca.probe().succeeded(),
        "Ack with incorrect probe number should be discarded"
    );
}
#[test]
fn probe_valid_indirect_ack_completes_succesfully() {
    // Similar to `probe_ping_ack_cycle`, but instead of the probe
    // succeeding through a direct Ack we stress the indirect
    // mechanism that kicks off once SendIndirectProbe is accepted
    let num_indirect_probes = config().num_indirect_probes.get();

    // The cluster has _more_ active members than
    // `num_indirect_probes + 1`, which lets us verify that no more
    // requests than the configured value are sent.
    let (mut foca, probed, send_indirect_probe) =
        craft_probing_foca((num_indirect_probes + 2) as u8, config());
    let mut runtime = InMemoryRuntime::new();

    // The timer fires without `probed` having replied with an Ack
    let fired = foca.handle_timer(send_indirect_probe, &mut runtime);
    assert_eq!(Ok(()), fired);

    // Gather the destination of every PingReq that was sent out
    let ping_req_dsts: Vec<_> = runtime
        .take_all_data()
        .into_iter()
        .filter_map(|(to, data)| {
            let (header, _updates) = decode(data);
            if matches!(header.message, Message::PingReq { .. }) {
                Some(to)
            } else {
                None
            }
        })
        .collect();

    for to in ping_req_dsts.iter() {
        assert_ne!(
            to, &probed,
            "Must not request a ping to the member being probed"
        );
        assert_ne!(
            to,
            foca.identity(),
            "Must not request a ping to ourselves"
        );
    }

    assert_eq!(num_indirect_probes, ping_req_dsts.len());
    runtime.clear();

    // From here on the probe can succeed via:
    //
    //  1. A direct ack coming from `probed`
    //  2. A forwarded ack coming from ANY of the members we sent
    //     a ping request to
    //
    // For the indirect scenario we verify that:
    //
    //  1. A ForwardedAck from a member we did NOT send a ping
    //     request to gets ignored
    //
    //  2. A well-formed ForwardedAck makes the probe succeed
    let outsider = ID::new(42);
    assert!(ping_req_dsts.iter().all(|id| id != &outsider));

    let forwarded_ack = Message::ForwardedAck {
        origin: probed,
        probe_number: foca.probe().probe_number(),
    };

    // Encodes the forwarded ack as if it had been sent by `src`
    let forwarded_ack_from = |src: ID| {
        let header = Header {
            src,
            src_incarnation: Incarnation::default(),
            dst: ID::new(1),
            message: forwarded_ack.clone(),
        };
        encode((header, Vec::new()))
    };

    let from_outsider = foca.handle_data(&forwarded_ack_from(outsider), &mut runtime);
    assert_eq!(Ok(()), from_outsider);
    assert!(
        !foca.probe().succeeded(),
        "Must not accept ForwardedAck from outsider"
    );

    for src in ping_req_dsts.into_iter() {
        let from_helper = foca.handle_data(&forwarded_ack_from(src), &mut runtime);
        assert_eq!(Ok(()), from_helper);

        // A single ack is all it takes for the probe to succeed
        assert!(
            foca.probe().succeeded(),
            "Probe should succeed with any expected ForwardedAck"
        );
    }
}
// Receiving a Ping must trigger an Ack, with the same probe number,
// back to the sender.
#[test]
fn probe_receiving_ping_replies_with_ack() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let pinger = ID::new(2);
    let probe_number = foca.probe().probe_number();
    let ping = Header {
        src: pinger,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::Ping(probe_number),
    };

    let received = foca.handle_data(&encode((ping, Vec::new())), &mut runtime);
    assert_eq!(Ok(()), received);

    let reply = runtime.take_data(pinger).unwrap();
    let (header, _updates) = decode(reply);
    assert_eq!(header.message, Message::Ack(probe_number));
}
// Receiving a PingReq must trigger an IndirectPing to the requested
// target, naming the requester as the origin.
#[test]
fn probe_receiving_ping_req_sends_indirect_ping() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let requester = ID::new(2);
    let target = ID::new(3);
    let probe_number = foca.probe().probe_number();
    let ping_req = Header {
        src: requester,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::PingReq {
            target,
            probe_number,
        },
    };

    let received = foca.handle_data(&encode((ping_req, Vec::new())), &mut runtime);
    assert_eq!(Ok(()), received);

    let sent = runtime.take_data(target).unwrap();
    let (header, _updates) = decode(sent);
    assert_eq!(
        header.message,
        Message::IndirectPing {
            origin: requester,
            probe_number
        }
    );
}
// Receiving an IndirectPing must trigger an IndirectAck back to the
// sender, naming the ping's origin as the target.
#[test]
fn probe_receiving_indirect_ping_sends_indirect_ack() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let sender = ID::new(2);
    let origin = ID::new(3);
    let probe_number = foca.probe().probe_number();
    let indirect_ping = Header {
        src: sender,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::IndirectPing {
            origin,
            probe_number,
        },
    };

    let received = foca.handle_data(&encode((indirect_ping, Vec::new())), &mut runtime);
    assert_eq!(Ok(()), received);

    let reply = runtime.take_data(sender).unwrap();
    let (header, _updates) = decode(reply);
    assert_eq!(
        header.message,
        Message::IndirectAck {
            target: origin,
            probe_number
        }
    );
}
// Receiving an IndirectAck must trigger a ForwardedAck to the ack's
// target, naming the sender of the ack as the origin.
#[test]
fn probe_receiving_indirect_ack_sends_forwarded_ack() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let sender = ID::new(2);
    let target = ID::new(3);
    let probe_number = foca.probe().probe_number();
    let indirect_ack = Header {
        src: sender,
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::IndirectAck {
            target,
            probe_number,
        },
    };

    let received = foca.handle_data(&encode((indirect_ack, Vec::new())), &mut runtime);
    assert_eq!(Ok(()), received);

    let forwarded = runtime.take_data(target).unwrap();
    let (header, _updates) = decode(forwarded);
    assert_eq!(
        header.message,
        Message::ForwardedAck {
            origin: sender,
            probe_number
        }
    );
}
#[test]
fn message_from_aware_suspect_refutes_suspicion() -> Result<()> {
    // Scenario: 3-member active cluster
    //  - One member starts suspecting the other two
    //  - Just one of the suspected members learns about the suspicion
    let members = [
        Member::alive(ID::new(1)),
        Member::alive(ID::new(2)),
        Member::alive(ID::new(3)),
    ];

    // One instance per member, each of them aware of every member.
    // They're created in reverse order so that popping from the
    // herd yields ID=1 first.
    let mut herd = Vec::new();
    for member in members.iter().rev() {
        let mut instance = Foca::new(*member.id(), config(), rng(), codec());
        instance.apply_many(members.iter().cloned(), InMemoryRuntime::new())?;
        herd.push(instance);
    }

    let mut foca_one = herd.pop().unwrap();
    let mut foca_two = herd.pop().unwrap();
    let mut foca_three = herd.pop().unwrap();

    let one = *foca_one.identity();
    let two = *foca_two.identity();
    let three = *foca_three.identity();

    // foca_one starts suspecting two and three
    let mut runtime = InMemoryRuntime::new();
    foca_one.apply(Member::suspect(two), &mut runtime)?;
    foca_one.apply(Member::suspect(three), &mut runtime)?;
    assert_eq!(2, foca_one.num_members());

    // But foca_three is the only one that learns it's being suspected
    // (It likely learned about ID=2 too, but that's irrelevant)
    foca_three.apply(Member::suspect(three), &mut runtime)?;

    // `foca_two` messages `foca_one`
    runtime.clear();
    let announced = foca_two.announce(one, &mut runtime);
    assert_eq!(Ok(()), announced);
    let data = runtime
        .take_data(one)
        .expect("foca_two sending data to ID::new(1)");
    let delivered = foca_one.handle_data(&data, &mut runtime);
    assert_eq!(Ok(()), delivered);

    // And so does `foca_three`
    runtime.clear();
    let announced = foca_three.announce(one, &mut runtime);
    assert_eq!(Ok(()), announced);
    let data = runtime
        .take_data(one)
        .expect("foca_three sending data to ID::new(1)");
    let delivered = foca_one.handle_data(&data, &mut runtime);
    assert_eq!(Ok(()), delivered);

    // At this point `foca_one` has received messages from both
    // members and our runtime fires the timers that change a
    // suspect member to down

    // Timer event related to `foca_two`
    runtime.clear();
    let two_times_out = Timer::ChangeSuspectToDown {
        member_id: two,
        incarnation: Incarnation::default(),
        token: foca_one.timer_token(),
    };
    let handled = foca_one.handle_timer(two_times_out, &mut runtime);
    assert_eq!(Ok(()), handled);

    // foca_two hasn't refuted the suspicion, so `foca_one` should
    // have marked it as down
    expect_notification!(runtime, Notification::MemberDown(two));
    assert_eq!(1, foca_one.num_members());
    assert!(
        foca_one.iter_members().all(|m| m.id() != &two),
        "foca_two shouldn't be in the member list anymore"
    );

    // `foca_three`, on the other hand, knew about the suspicion and
    // its message should've been enough for it to remain active
    let three_times_out = Timer::ChangeSuspectToDown {
        member_id: three,
        incarnation: Incarnation::default(),
        token: foca_one.timer_token(),
    };
    let handled = foca_one.handle_timer(three_times_out, &mut runtime);
    assert_eq!(Ok(()), handled);

    assert_eq!(1, foca_one.num_members());
    assert!(
        foca_one.iter_members().any(|m| m.id() == &three),
        "foca_three should have recovered"
    );

    Ok(())
}
// Leaving the cluster must gossip an update declaring ourselves down.
#[test]
fn leave_cluster_gossips_about_our_death() -> Result<()> {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let peer = ID::new(2);
    foca.apply(Member::alive(peer), &mut runtime)?;

    let left = foca.leave_cluster(&mut runtime);
    assert_eq!(Ok(()), left);

    // ID::new(2) is the only active member we know about, so
    // `leave_cluster` must have sent a message to it
    let sent = runtime
        .take_data(peer)
        .expect("No message for ID::new(2) found");
    let (header, updates) = decode(sent);
    assert_eq!(Message::Gossip, header.message);

    let announces_our_exit = updates
        .iter()
        .any(|update| update.state() == State::Down && update.id() == &ID::new(1));
    assert!(
        announces_our_exit,
        "Gossip message should contain an update about our exit"
    );

    Ok(())
}
// Leaving the cluster must never message the same member twice, even
// when fewer members are known than the instance would like to reach.
#[test]
fn leave_cluster_doesnt_gossip_to_duplicates() -> Result<()> {
    // The instance wants to gossip to 5 distinct members when leaving
    let mut cfg = Config::simple();
    cfg.num_indirect_probes = NonZeroUsize::new(5).unwrap();

    let mut foca = Foca::new(ID::new(1), cfg, rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // But it only knows about a single one
    let only_peer = ID::new(2);
    foca.apply(Member::alive(only_peer), &mut runtime)?;

    let left = foca.leave_cluster(&mut runtime);
    assert_eq!(Ok(()), left);

    let first = runtime.take_data(only_peer);
    assert!(first.is_some(), "Should have one message for ID::new(2)");

    let second = runtime.take_data(only_peer);
    assert!(
        second.is_none(),
        "But never more than one to the same member"
    );

    Ok(())
}
// A rejoinable identity that learns it has been declared down must
// switch to its renewed identity and gossip about it.
#[test]
fn auto_rejoin_behaviour() {
    let mut foca = Foca::new(ID::new(1).rejoinable(), config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    let updates = [
        // Members we get to know
        Member::alive(ID::new(2)),
        Member::alive(ID::new(3)),
        Member::alive(ID::new(4)),
        // And ourselves, declared down
        Member::down(ID::new(1)),
    ];
    let applied = foca.apply_many(updates.iter().cloned(), &mut runtime);
    assert_eq!(Ok(()), applied);

    // Our identity should've changed
    let expected_new_id = ID::new_with_bump(1, 1);
    assert_eq!(&expected_new_id, foca.identity());
    expect_notification!(runtime, Notification::Rejoin(expected_new_id));
    reject_notification!(runtime, Notification::<ID>::Defunct);

    // And the new identity should've been disseminated to K members
    let gossiped = runtime.take_all_data().into_iter().any(|(_dst, data)| {
        let (header, _updates) = decode(data);
        header.message == Message::Gossip
    });
    assert!(gossiped);
}
// TODO duplicate renew() identity no error test
// Anything larger than `Config::max_packet_size` must be rejected
// with `Error::DataTooBig`, be it incoming data or a broadcast.
#[test]
fn more_data_than_allowed_causes_error() {
    let cfg = config();
    // One byte past the limit
    let oversized = vec![42u8; cfg.max_packet_size.get() + 1];
    let mut foca = Foca::new(ID::new(1), cfg, rng(), codec());

    let handled = foca.handle_data(oversized.as_slice(), InMemoryRuntime::new());
    assert_eq!(Err(Error::DataTooBig), handled);

    let added = foca.add_broadcast(oversized.as_slice());
    assert_eq!(Err(Error::DataTooBig), added);
}
// An instance created via `Foca::new` must refuse custom broadcasts.
#[test]
fn cant_use_broadcasts_by_default() {
    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let outcome = foca.add_broadcast(b"foo");
    assert!(outcome.is_err());
}
// Bytes left over after an otherwise valid packet must cause an error.
#[test]
fn trailing_data_is_error() {
    // Start with a payload that is actually valid
    let header = Header {
        src: ID::new(2),
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::Ping(0),
    };
    let updates = vec![Member::alive(ID::new(3)), Member::down(ID::new(4))];
    let valid_data = encode((header, updates));

    let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
    let accepted = foca.handle_data(valid_data.as_ref(), InMemoryRuntime::new());
    assert_eq!(Ok(()), accepted, "valid_data should be valid :-)");

    // Then append some rubbish to it, so that everything is still
    // valid up to the trash. A single trailing byte should be
    // enough to trigger an error
    let mut bad_data = Vec::from(valid_data.as_ref());
    bad_data.push(0);

    // The error we expect is the one about broadcasts being disabled
    let rejected = foca.handle_data(bad_data.as_ref(), InMemoryRuntime::new());
    assert_eq!(
        Err(Error::CustomBroadcast(anyhow::Error::msg(
            BroadcastsDisabledError
        ))),
        rejected
    );
}
#[test]
fn custom_broadcast() {
// Here we'll do some basic testing of the custom broadcast
// functionality: adding broadcasts (and replacing outdated ones),
// disseminating them via gossip, withholding them from a specific
// member and sending them with `broadcast()`.
// This is the item that gets broadcast. It's pretty useless
// as it is: just an identifier and a version.
#[derive(Debug)]
struct VersionedKey {
// A realistic broadcast would contain actual data
// but we're not interested in the contents here,
// just how it all behaves.
// Holds the key (u64) followed by the version (u16).
data: [u8; 10],
}
impl VersionedKey {
// Packs `key` and then `version` into the 10-byte buffer
fn new(key: u64, version: u16) -> Self {
let mut data = [0u8; 10];
let mut buf = &mut data[..];
buf.put_u64(key);
buf.put_u16(version);
Self { data }
}
// Reads the key back from the start of the buffer
fn key(&self) -> u64 {
let mut buf = &self.data[..];
buf.get_u64()
}
// Reads the version back: it sits right after the 8 bytes
// taken by the key
fn version(&self) -> u16 {
let mut buf = &self.data[8..];
buf.get_u16()
}
// Decodes a `VersionedKey` from `src`, consuming a u64 and a u16
// from it. Yields an error, without consuming anything, if `src`
// holds less than 10 bytes.
fn from_bytes(mut src: impl Buf) -> core::result::Result<Self, &'static str> {
if src.remaining() < 10 {
Err("buffer too small")
} else {
let mut data = [0u8; 10];
let mut buf = &mut data[..];
buf.put_u64(src.get_u64());
buf.put_u16(src.get_u16());
Ok(Self { data })
}
}
}
// Invalidation based on same key => higher version
impl Invalidates for VersionedKey {
fn invalidates(&self, other: &Self) -> bool {
self.key() == other.key() && self.version() > other.version()
}
}
// The broadcast is exposed as bytes straight from its buffer
impl AsRef<[u8]> for VersionedKey {
fn as_ref(&self) -> &[u8] {
&self.data[..]
}
}
// Notice how we don't need to cache the full broadcast here:
// even if VersionedKey was very large it wouldn't matter, since
// all we care about here is whether the broadcast is new
// information or not.
// The map tracks the highest version seen for each key.
use alloc::collections::BTreeMap;
struct Handler(BTreeMap<u64, u16>);
impl BroadcastHandler<ID> for Handler {
type Broadcast = VersionedKey;
type Error = &'static str;
// Decodes one item from `data`. Yields `Some` (and remembers the
// version) when the item is news to us, `None` when we already
// know this version or a higher one, and an error when `data`
// can't be decoded.
fn receive_item(
&mut self,
data: impl Buf,
) -> core::result::Result<Option<Self::Broadcast>, Self::Error> {
let decoded = VersionedKey::from_bytes(data)?;
let is_new_information = self
.0
.get(&decoded.key())
// If the version we know about is smaller
.map(|&version| version < decoded.version())
// Or we've never seen the key before
.unwrap_or(true);
if is_new_information {
self.0.insert(decoded.key(), decoded.version());
Ok(Some(decoded))
} else {
Ok(None)
}
}
// Allows broadcast data to go to every member except ID=3
fn should_add_broadcast_data(&self, member: &ID) -> bool {
// never broadcast to member ID=3
let blacklisted = ID::new(3);
!blacklisted.eq(member)
}
}
// Now we can use our custom broadcasts
let mut foca = Foca::with_custom_broadcast(
ID::new(1),
config(),
rng(),
codec(),
Handler(BTreeMap::new()),
);
// Three bytes can't be decoded as a VersionedKey
assert!(
foca.add_broadcast(b"hue").is_err(),
"Adding garbage shouldn't work"
);
assert_eq!(
Ok(()),
foca.add_broadcast(VersionedKey::new(420, 0).as_ref()),
);
assert_eq!(
1,
foca.custom_broadcast_backlog(),
"Adding a new custom broadcast should increase the backlog"
);
// Same key as before, but with a higher version
assert_eq!(
Ok(()),
foca.add_broadcast(VersionedKey::new(420, 1).as_ref()),
);
assert_eq!(
1,
foca.custom_broadcast_backlog(),
"But receiving a new version should simply replace the existing one"
);
// Let's add one more custom broadcast because testing with N=1
// is pretty lousy :-)
assert_eq!(
Ok(()),
foca.add_broadcast(VersionedKey::new(710, 1).as_ref()),
);
// Now let's see if the custom broadcasts actually get
// disseminated.
let other_id = ID::new(2);
let mut other_foca = Foca::with_custom_broadcast(
other_id,
config(),
rng(),
codec(),
Handler(BTreeMap::new()),
);
// Teach the original foca about this new `other_foca`
let mut runtime = InMemoryRuntime::new();
assert_eq!(Ok(()), foca.apply(Member::alive(other_id), &mut runtime));
// Now foca will talk to other_foca. The encoded data
// should contain our custom broadcasts.
assert_eq!(Ok(()), foca.gossip(&mut runtime));
let data_for_another_foca = runtime
.take_data(other_id)
.expect("foca only knows about other_foca");
assert_eq!(
0,
other_foca.custom_broadcast_backlog(),
"other_foca custom broadcast backlog should start empty"
);
assert_eq!(
Ok(()),
other_foca.handle_data(&data_for_another_foca, &mut runtime)
);
// Two distinct keys (420 and 710) were pending on `foca`
assert_eq!(
2,
other_foca.custom_broadcast_backlog(),
"Should have received two new custom broadcasts"
);
drop(data_for_another_foca);
// Now we'll talk to member ID=3, but since our handler
// has a custom implementation for `should_add_broadcast_data`
// that yields false for this identity, we want
// no _broadcast_ data to be sent to them; everything else
// should flow normally.
let isolated_member = ID::new(3);
let mut isolated_foca = Foca::with_custom_broadcast(
isolated_member,
config(),
rng(),
codec(),
Handler(BTreeMap::new()),
);
runtime.clear();
// Add the isolated member to the cluster
assert_eq!(
Ok(()),
foca.apply(Member::alive(isolated_member), &mut runtime)
);
// Since there are just a few members, calling gossip
// will definitely choose this member
assert_eq!(Ok(()), foca.gossip(&mut runtime));
let data_for_isolated_member = runtime
.take_data(isolated_member)
.expect("config has num_indirect_probes > 1");
assert_eq!(
Ok(()),
isolated_foca.handle_data(&data_for_isolated_member, &mut runtime)
);
assert_eq!(
0,
isolated_foca.custom_broadcast_backlog(),
"Should not have received any custom broadcast"
);
assert_eq!(
2,
isolated_foca.num_members(),
"But cluster updates should have arrived normally"
);
runtime.clear();
// `foca` (ID=1) is presently seeing two members:
// - ID=2, allowed to receive broadcasts
// - ID=3, which never receives broadcasts
// So if we call broadcast()
assert_eq!(Ok(()), foca.broadcast(&mut runtime));
// We send NO message to the isolated member
assert!(runtime.take_data(isolated_member).is_none());
// And one Broadcast message to ID=2
let broadcast_message = runtime
.take_data(other_id)
.expect("shoud've sent a message to ID=2");
// Only the header is decoded here, to inspect the message kind
let header = codec()
.decode_header(&broadcast_message[..])
.expect("valid payload");
assert_eq!(
header.message,
Message::Broadcast,
"broadcast() should trigger Broadcast messages"
);
// And, of course, ID=2 should be able to handle
// such a message
assert_eq!(
Ok(()),
other_foca.handle_data(&broadcast_message, &mut runtime)
);
}
#[test]
fn can_recover_from_incomplete_probe_cycle() {
    // We get an instance that's in the middle of a probe cycle. The
    // correct next step would be submitting `_send_indirect_probe`
    let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
    let mut runtime = InMemoryRuntime::new();

    // ... but we manually craft a ProbeRandomMember event instead,
    // which triggers the validation failure
    let out_of_sequence = Timer::ProbeRandomMember(foca.timer_token());
    let outcome = foca.handle_timer(out_of_sequence, &mut runtime);
    assert_eq!(Err(Error::IncompleteProbeCycle), outcome);

    // Two things should happen as a consequence:
    //  1. the probe state becomes valid again
    assert!(foca.probe().validate(), "didn't recover probe state");

    //  2. a new probe gets scheduled
    let rescheduled = runtime
        .find_scheduling(|t| matches!(t, Timer::ProbeRandomMember(_)))
        .is_some();
    assert!(rescheduled, "didn't submit a new probe event");
}
// With `notify_down_members` enabled, a member that gets declared
// down must be sent a TurnUndead message.
#[test]
fn declaring_a_member_as_down_notifies_them() {
    let mut cfg = Config::simple();
    cfg.notify_down_members = true;

    let (mut foca, probed, send_indirect_probe) = craft_probing_foca(2, cfg);
    let mut runtime = InMemoryRuntime::new();

    // The timer fires without `probed` having replied with an Ack
    let fired = foca.handle_timer(send_indirect_probe, &mut runtime);
    assert_eq!(Ok(()), fired);

    // ... and the indirect cycle yields nothing either
    runtime.clear();

    // So when the ChangeSuspectToDown timer eventually fires
    let give_up = Timer::ChangeSuspectToDown {
        member_id: probed,
        incarnation: Incarnation::default(),
        token: foca.timer_token(),
    };
    let handled = foca.handle_timer(give_up, &mut runtime);
    assert_eq!(Ok(()), handled);

    // The runtime should be instructed to send a TurnUndead message to `probed`
    expect_message!(runtime, probed, Message::<ID>::TurnUndead);
}
// With `notify_down_members` enabled, a message arriving from a member
// known to be down must be answered with TurnUndead.
#[test]
fn message_from_down_member_is_replied_with_turn_undead() {
    let mut cfg = config();
    cfg.notify_down_members = true;

    let mut runtime = InMemoryRuntime::new();

    // A simple instance
    let mut foca = Foca::new(ID::new(1), cfg, rng(), codec());

    // Which learns that ID=2 is down
    let down_id = ID::new(2);
    let learned = foca.apply(Member::down(down_id), &mut runtime);
    assert_eq!(Ok(()), learned);

    // A message from member ID=2 addressed to ID=1
    let header = Header {
        src: down_id,
        src_incarnation: 1,
        dst: ID::new(1),
        message: Message::Announce,
    };
    let mut msg = Vec::new();
    codec()
        .encode_header(&header, &mut msg)
        .expect("codec works fine");

    // Gets delivered to the instance
    let received = foca.handle_data(&msg[..], &mut runtime);
    assert_eq!(Ok(()), received);

    // Which should then message ID=2 to notify it
    expect_message!(runtime, down_id, Message::<ID>::TurnUndead);
}
// There are multiple "do this thing periodically" settings and this
// helper tests any of them. Takes:
//  - `config_setter`: knows which configuration to set
//  - `event_maker`: knows which event should be sent
//  - `message`: the message that should be sent
fn check_periodic_behaviour<F, G>(config_setter: F, mut event_maker: G, message: Message<ID>)
where
    F: Fn(&mut Config, config::PeriodicParams),
    G: FnMut(TimerToken) -> Timer<ID>,
{
    let frequency = Duration::from_millis(500);

    // An instance running with the given periodic config
    let mut cfg = config();
    config_setter(
        &mut cfg,
        config::PeriodicParams {
            frequency,
            num_members: NonZeroUsize::new(2).unwrap(),
        },
    );
    let mut foca = Foca::new(ID::new(1), cfg, rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // Becomes active (i.e.: gets to know at least one active member)
    let first_join = foca.apply(Member::alive(ID::new(2)), &mut runtime);
    assert_eq!(Ok(()), first_join);
    let second_join = foca.apply(Member::alive(ID::new(3)), &mut runtime);
    assert_eq!(Ok(()), second_join);

    // Which should schedule the given event
    expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
    runtime.clear();

    // Once the event fires
    let fired = foca.handle_timer(event_maker(foca.timer_token()), &mut runtime);
    assert_eq!(Ok(()), fired);

    // The very same event should be scheduled again
    expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);

    // And the message sent to `num_members` random members
    // (with num_members=2 and only two known members, we know
    // exactly which ones should've been picked)
    expect_message!(runtime, ID::new(2), message);
    expect_message!(runtime, ID::new(3), message);
}
// Checks the periodic behaviour driven by `Config::periodic_gossip`.
#[test]
fn periodic_gossip_behaviour() {
    check_periodic_behaviour(
        |cfg: &mut Config, params: config::PeriodicParams| cfg.periodic_gossip = Some(params),
        Timer::PeriodicGossip,
        Message::Gossip,
    );
}
// Checks the periodic behaviour driven by `Config::periodic_announce`.
#[test]
fn periodic_announce_behaviour() {
    check_periodic_behaviour(
        |cfg: &mut Config, params: config::PeriodicParams| cfg.periodic_announce = Some(params),
        Timer::PeriodicAnnounce,
        Message::Announce,
    );
}
// `set_config` must refuse to turn periodic announce on, but must
// allow turning it off.
#[test]
fn periodic_announce_cannot_be_enabled_at_runtime() {
    let without_announce = config();
    assert!(without_announce.periodic_announce.is_none());

    let mut with_announce = without_announce.clone();
    with_announce.periodic_announce = Some(config::PeriodicParams {
        frequency: Duration::from_secs(5),
        num_members: NonZeroUsize::new(1).unwrap(),
    });

    // An instance that's running without periodic announce
    let mut foca = Foca::new(ID::new(1), without_announce, rng(), codec());
    // Must not be able to enable it during runtime
    let enabling = foca.set_config(with_announce.clone());
    assert_eq!(Err(Error::InvalidConfig), enabling);

    // Whereas an instance that starts with periodic announce enabled
    let mut foca = Foca::new(ID::new(1), with_announce, rng(), codec());
    // Is able to turn it off
    let disabling = foca.set_config(config());
    assert_eq!(Ok(()), disabling);
}
// `set_config` must reject a config that turns periodic gossip on,
// while still accepting one that turns it off.
#[test]
fn periodic_gossip_cannot_be_enabled_at_runtime() {
    // The baseline test config has periodic gossip switched off
    let disabled = config();
    assert!(disabled.periodic_gossip.is_none());

    // Same config, except periodic gossip is switched on
    let mut enabled = disabled.clone();
    enabled.periodic_gossip = Some(config::PeriodicParams {
        frequency: Duration::from_secs(5),
        num_members: NonZeroUsize::new(1).unwrap(),
    });

    // An instance started without the feature refuses to enable it
    let mut foca = Foca::new(ID::new(1), disabled, rng(), codec());
    assert_eq!(Err(Error::InvalidConfig), foca.set_config(enabled.clone()));

    // Whereas an instance started with the feature may disable it
    let mut foca = Foca::new(ID::new(1), enabled, rng(), codec());
    assert_eq!(Ok(()), foca.set_config(config()));
}
// An update about the identity this instance had before renewing must
// not be counted as a new cluster member.
#[test]
fn cannot_learn_about_own_previous_identity() {
    // Start from a rejoinable identity and derive its renewed form
    let previous = ID::new(1).rejoinable();
    let current = previous.renew().unwrap();

    // The two differ as identities, yet share the same prefix
    assert_ne!(previous, current);
    assert!(previous.has_same_prefix(&current));

    // The instance under test runs with the renewed identity
    let mut foca = Foca::new(current, config(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // Feeding it an "alive" update about the previous identity succeeds...
    let stale_update = core::iter::once(Member::alive(previous));
    assert_eq!(Ok(()), foca.apply_many(stale_update, &mut runtime));

    // ...but leaves the member count untouched
    assert_eq!(
        0,
        foca.num_members(),
        "shouldn't have considered a previous identity as a new member"
    );
}
// Number of bytes the test codec emits for a `Message::Feed` header.
//
// Assumes a fixed-length identity encoding, so the ids chosen for
// `src` and `dst` below do not influence the result.
fn encoded_feed_header_len() -> usize {
    let mut encoded = Vec::new();
    let feed_header = Header {
        src: ID::new(1),
        src_incarnation: 0,
        dst: ID::new(3),
        message: Message::Feed,
    };

    // Only the header is written, so the buffer length is the header length
    codec()
        .encode_header(&feed_header, &mut encoded)
        .expect("codec works fine");

    encoded.len()
}
// A feed reply whose encoding runs into `max_packet_size` must still be
// decodable by its recipient.
#[test]
fn feed_does_not_contain_trailing_jumk() {
    let mut cfg = config();

    // Packet budget: room for the feed header and the u16 holding
    // num_updates, plus a couple of spare bytes so encoding a Member is
    // attempted but cannot fit all of its metadata. This exercises what
    // happens when encoding a feed message goes above the max length.
    let num_updates_len = 2;
    let partial_member_len = 2;
    cfg.max_packet_size =
        NonZeroUsize::new(encoded_feed_header_len() + num_updates_len + partial_member_len)
            .expect("non-zero");

    // Scenario: one instance receives an announce from another, and the
    // announcer must be able to handle the reply without errors
    let mut foca_one = Foca::new(ID::new(1), cfg.clone(), rng(), codec());
    let mut runtime = InMemoryRuntime::new();

    // foca_one already knows about ID=3, so its feed reply has at least
    // one member to talk about
    let known_member = Member::alive(ID::new(3));
    assert_eq!(Ok(()), foca_one.apply(known_member, &mut runtime));

    // ID=2 announces itself to foca_one
    let announce_header = Header {
        src: ID::new(2),
        src_incarnation: Incarnation::default(),
        dst: ID::new(1),
        message: Message::Announce,
    };
    let announce = encode((announce_header, Vec::default()));
    assert_eq!(Ok(()), foca_one.handle_data(&announce, &mut runtime));

    // The runtime should now hold the feed that foca_one addressed to ID=2
    let feed = runtime
        .take_data(ID::new(2))
        .expect("foca_one reply for foca_two");

    // foca_two must be able to read it just fine
    // (originally would fail with BroadcastsDisabledError)
    let mut foca_two = Foca::new(ID::new(2), cfg, rng(), codec());
    assert_eq!(Ok(()), foca_two.handle_data(&feed, &mut runtime));
}
}