#![doc = include_str!("../README.md")]
#![doc(html_logo_url = "https://raw.githubusercontent.com/al8n/memberlist/main/art/logo_72x72.png")]
#![forbid(unsafe_code)]
#![deny(warnings, missing_docs)]
#![allow(clippy::type_complexity, unexpected_cfgs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, allow(unused_attributes))]
mod api;
mod awareness;
mod base;
mod broadcast;
mod network;
mod options;
mod state;
mod suspicion;
pub mod delegate;
pub mod error;
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
pub mod keyring;
pub mod proto;
pub mod queue;
pub mod transport;
pub mod util;
pub use agnostic_lite;
pub use base::*;
pub use broadcast::*;
pub use bytes;
pub use futures;
#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
pub use metrics;
pub use network::META_MAX_SIZE;
pub use nodecraft::CheapClone;
pub use options::Options;
pub use tracing;
use std::time::Duration;
#[cfg(windows)]
type Epoch = system_epoch::SystemTimeEpoch;
#[cfg(not(windows))]
type Epoch = instant_epoch::InstantEpoch;
#[cfg(windows)]
mod system_epoch {
use super::*;
use std::time::SystemTime;
type SystemTimeEpochInner = SystemTime;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct SystemTimeEpoch(SystemTimeEpochInner);
impl core::fmt::Debug for SystemTimeEpoch {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.0.fmt(f)
}
}
impl core::ops::Sub for SystemTimeEpoch {
type Output = Duration;
fn sub(self, rhs: Self) -> Duration {
self.0.duration_since(rhs.0).unwrap()
}
}
impl core::ops::Sub<Duration> for SystemTimeEpoch {
type Output = Self;
fn sub(self, rhs: Duration) -> Self {
Self(self.0 - rhs)
}
}
impl core::ops::SubAssign<Duration> for SystemTimeEpoch {
fn sub_assign(&mut self, rhs: Duration) {
self.0 -= rhs;
}
}
impl core::ops::Add<Duration> for SystemTimeEpoch {
type Output = Self;
fn add(self, rhs: Duration) -> Self {
SystemTimeEpoch(self.0 + rhs)
}
}
impl core::ops::AddAssign<Duration> for SystemTimeEpoch {
fn add_assign(&mut self, rhs: Duration) {
self.0 += rhs;
}
}
impl SystemTimeEpoch {
pub(crate) fn now() -> Self {
Self(SystemTimeEpochInner::now())
}
pub(crate) fn elapsed(&self) -> Duration {
self.0.elapsed().unwrap()
}
#[cfg(any(feature = "test", test))]
pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
self.0.duration_since(earlier.0).ok()
}
}
}
#[cfg(not(windows))]
mod instant_epoch {
use super::*;
use std::time::Instant;
type InstantEpochInner = Instant;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct InstantEpoch(InstantEpochInner);
impl core::fmt::Debug for InstantEpoch {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.0.fmt(f)
}
}
impl core::ops::Sub for InstantEpoch {
type Output = Duration;
fn sub(self, rhs: Self) -> Duration {
self.0 - rhs.0
}
}
impl core::ops::Sub<Duration> for InstantEpoch {
type Output = Self;
fn sub(self, rhs: Duration) -> Self {
Self(self.0 - rhs)
}
}
impl core::ops::SubAssign<Duration> for InstantEpoch {
fn sub_assign(&mut self, rhs: Duration) {
self.0 -= rhs;
}
}
impl core::ops::Add<Duration> for InstantEpoch {
type Output = Self;
fn add(self, rhs: Duration) -> Self {
InstantEpoch(self.0 + rhs)
}
}
impl core::ops::AddAssign<Duration> for InstantEpoch {
fn add_assign(&mut self, rhs: Duration) {
self.0 += rhs;
}
}
impl InstantEpoch {
pub(crate) fn now() -> Self {
Self(InstantEpochInner::now())
}
pub(crate) fn elapsed(&self) -> Duration {
self.0.elapsed()
}
#[cfg(any(feature = "test", test))]
pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
self.0.checked_duration_since(earlier.0)
}
}
}
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub mod tests {
use std::net::SocketAddr;
#[cfg(not(windows))]
use parking_lot::Mutex;
pub use paste;
use self::{delegate::Delegate, error::Error, transport::Transport};
use super::*;
pub mod state {
pub use crate::state::tests::*;
}
pub mod memberlist {
pub use crate::base::tests::*;
}
#[cfg(any(feature = "test", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
#[macro_export]
#[doc(hidden)]
macro_rules! unit_tests {
($runtime:ty => $run:ident($($fn:ident), +$(,)?)) => {
$(
::memberlist_core::tests::paste::paste! {
#[test]
fn [< test_ $fn >] () {
$run($fn::<$runtime>());
}
}
)*
};
}
#[cfg(any(feature = "test", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
#[macro_export]
#[doc(hidden)]
macro_rules! unit_tests_with_expr {
($run:ident($(
$(#[$outer:meta])*
$fn:ident( $expr:expr )
), +$(,)?)) => {
$(
::memberlist_core::tests::paste::paste! {
#[test]
$(#[$outer])*
fn [< test_ $fn >] () {
$run(async move {
$expr
});
}
}
)*
};
}
pub type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub enum AddressKind {
V4,
V6,
}
impl core::fmt::Display for AddressKind {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::V4 => write!(f, "v4"),
Self::V6 => write!(f, "v6"),
}
}
}
impl AddressKind {
pub fn next(&self, network: u8) -> SocketAddr {
match self {
Self::V4 => next_socket_addr_v4(network),
Self::V6 => next_socket_addr_v6(),
}
}
}
#[cfg(not(windows))]
static IPV4_BIND_NUM: Mutex<usize> = Mutex::new(10);
#[cfg(not(windows))]
static IPV6_BIND_NUM: Mutex<usize> = Mutex::new(10);
pub fn next_socket_addr_v4(_network: u8) -> SocketAddr {
#[cfg(not(windows))]
{
let mut mu = IPV4_BIND_NUM.lock();
let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap();
*mu += 1;
if *mu > 255 {
*mu = 10;
}
addr
}
#[cfg(windows)]
"127.0.0.1:0".parse().unwrap()
}
pub fn next_socket_addr_v6() -> SocketAddr {
#[cfg(not(windows))]
{
let mut mu = IPV6_BIND_NUM.lock();
let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap();
*mu += 1;
if *mu > 255 {
*mu = 10;
}
addr
}
#[cfg(windows)]
"[::1]:0".parse().unwrap()
}
pub fn run<B, F>(block_on: B, fut: F)
where
B: FnOnce(F) -> F::Output,
F: std::future::Future<Output = ()>,
{
block_on(fut);
}
pub fn initialize_tests_tracing() {
use std::sync::Once;
static TRACE: Once = Once::new();
TRACE.call_once(|| {
let filter = std::env::var("MEMBERLIST_TESTING_LOG").unwrap_or_else(|_| "info".to_owned());
tracing::subscriber::set_global_default(
tracing_subscriber::fmt::fmt()
.without_time()
.with_line_number(true)
.with_env_filter(filter)
.with_file(false)
.with_target(true)
.with_ansi(true)
.finish(),
)
.unwrap();
});
}
pub async fn get_memberlist<T, D>(
t: T,
d: D,
opts: Options,
) -> Result<Memberlist<T, D>, Error<T, D>>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
crate::Memberlist::new_in(t, Some(d), opts)
.await
.map(|(_, _, this)| this)
}
}