memberlist_core/
lib.rs

1#![doc = include_str!("../README.md")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/al8n/memberlist/main/art/logo_72x72.png")]
3#![forbid(unsafe_code)]
4#![deny(warnings, missing_docs)]
5#![allow(clippy::type_complexity, unexpected_cfgs)]
6#![cfg_attr(docsrs, feature(doc_cfg))]
7#![cfg_attr(docsrs, allow(unused_attributes))]
8
9mod api;
10mod awareness;
11mod base;
12mod broadcast;
13mod network;
14mod options;
15mod state;
16mod suspicion;
17
/// Traits that can be implemented to hook into the memberlist lifecycle.
19pub mod delegate;
20/// Error related to memberlist
21pub mod error;
22/// The keyring implementation.
23#[cfg(feature = "encryption")]
24#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
25pub mod keyring;
26/// The types used in memberlist
27pub mod proto;
/// The transmit queue implementation.
29pub mod queue;
30/// The transport layer for memberlist
31pub mod transport;
32/// The utils used in memberlist
33pub mod util;
34
35pub use agnostic_lite;
36pub use base::*;
37pub use broadcast::*;
38pub use bytes;
39pub use futures;
40#[cfg(feature = "metrics")]
41#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
42pub use metrics;
43pub use network::META_MAX_SIZE;
44pub use nodecraft::CheapClone;
45pub use options::Options;
46pub use tracing;
47
48use std::time::Duration;
49
50#[cfg(windows)]
51type Epoch = system_epoch::SystemTimeEpoch;
52
53#[cfg(not(windows))]
54type Epoch = instant_epoch::InstantEpoch;
55
#[cfg(windows)]
mod system_epoch {
  use super::*;
  use std::time::SystemTime;

  type SystemTimeEpochInner = SystemTime;

  /// A point in time backed by [`SystemTime`], compiled only on Windows.
  ///
  /// `SystemTime` follows the wall clock, which can be adjusted backwards.
  /// Every operation here that measures a span between two points therefore
  /// saturates to [`Duration::ZERO`] instead of panicking when the "earlier"
  /// point turns out to be later.
  #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
  pub(crate) struct SystemTimeEpoch(SystemTimeEpochInner);

  impl core::fmt::Debug for SystemTimeEpoch {
    /// Formats exactly like the wrapped [`SystemTime`].
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
      // Fully-qualified call so this does not depend on `Debug` being in scope.
      core::fmt::Debug::fmt(&self.0, f)
    }
  }

  impl core::ops::Sub for SystemTimeEpoch {
    type Output = Duration;

    /// Returns the time elapsed from `rhs` to `self`, or zero when `rhs` is
    /// later than `self` (e.g. after the system clock was set back).
    fn sub(self, rhs: Self) -> Duration {
      self.0.duration_since(rhs.0).unwrap_or(Duration::ZERO)
    }
  }

  impl core::ops::Sub<Duration> for SystemTimeEpoch {
    type Output = Self;

    /// Moves the timestamp back by `rhs`.
    fn sub(self, rhs: Duration) -> Self {
      Self(self.0 - rhs)
    }
  }

  impl core::ops::SubAssign<Duration> for SystemTimeEpoch {
    /// Moves the timestamp back by `rhs` in place.
    fn sub_assign(&mut self, rhs: Duration) {
      self.0 -= rhs;
    }
  }

  impl core::ops::Add<Duration> for SystemTimeEpoch {
    type Output = Self;

    /// Moves the timestamp forward by `rhs`.
    fn add(self, rhs: Duration) -> Self {
      Self(self.0 + rhs)
    }
  }

  impl core::ops::AddAssign<Duration> for SystemTimeEpoch {
    /// Moves the timestamp forward by `rhs` in place.
    fn add_assign(&mut self, rhs: Duration) {
      self.0 += rhs;
    }
  }

  impl SystemTimeEpoch {
    /// Captures the current system time.
    pub(crate) fn now() -> Self {
      Self(SystemTimeEpochInner::now())
    }

    /// Returns the time elapsed since this timestamp, or zero when the
    /// timestamp lies in the future relative to the current system time.
    pub(crate) fn elapsed(&self) -> Duration {
      self.0.elapsed().unwrap_or(Duration::ZERO)
    }

    /// Returns the time elapsed from `earlier` to `self`, or `None` when
    /// `earlier` is actually later than `self`.
    #[cfg(any(feature = "test", test))]
    pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
      self.0.duration_since(earlier.0).ok()
    }
  }
}
123
#[cfg(not(windows))]
mod instant_epoch {
  use super::*;
  use std::time::Instant;

  /// A point in time backed by [`Instant`], compiled on every platform
  /// except Windows.
  #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
  pub(crate) struct InstantEpoch(Instant);

  impl InstantEpoch {
    /// Captures the current instant.
    pub(crate) fn now() -> Self {
      Self(Instant::now())
    }

    /// Returns the time elapsed since this instant was captured.
    pub(crate) fn elapsed(&self) -> Duration {
      self.0.elapsed()
    }

    /// Returns the time elapsed from `earlier` to `self`, or `None` when
    /// `earlier` is later than `self`.
    #[cfg(any(feature = "test", test))]
    pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
      self.0.checked_duration_since(earlier.0)
    }
  }

  impl core::fmt::Debug for InstantEpoch {
    /// Formats exactly like the wrapped [`Instant`].
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
      core::fmt::Debug::fmt(&self.0, f)
    }
  }

  impl core::ops::Add<Duration> for InstantEpoch {
    type Output = Self;

    /// Moves the instant forward by `rhs`.
    fn add(self, rhs: Duration) -> Self {
      Self(self.0 + rhs)
    }
  }

  impl core::ops::AddAssign<Duration> for InstantEpoch {
    /// Moves the instant forward by `rhs` in place.
    fn add_assign(&mut self, rhs: Duration) {
      self.0 += rhs;
    }
  }

  impl core::ops::Sub<Duration> for InstantEpoch {
    type Output = Self;

    /// Moves the instant back by `rhs`.
    fn sub(self, rhs: Duration) -> Self {
      Self(self.0 - rhs)
    }
  }

  impl core::ops::SubAssign<Duration> for InstantEpoch {
    /// Moves the instant back by `rhs` in place.
    fn sub_assign(&mut self, rhs: Duration) {
      self.0 -= rhs;
    }
  }

  impl core::ops::Sub for InstantEpoch {
    type Output = Duration;

    /// Returns the time elapsed from `rhs` to `self`.
    fn sub(self, rhs: Self) -> Duration {
      // Same operation `Instant`'s own `Sub<Instant>` performs.
      self.0.duration_since(rhs.0)
    }
  }
}
191
/// All unit test fns are exported in the `tests` module.
/// This module is for users who want to run memberlist on another async runtime
/// and use these tests to check that memberlist also works with that runtime.
195#[cfg(feature = "test")]
196#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
197pub mod tests {
198  use std::net::SocketAddr;
199
200  #[cfg(not(windows))]
201  use parking_lot::Mutex;
202  pub use paste;
203
204  use self::{delegate::Delegate, error::Error, transport::Transport};
205  use super::*;
206
  /// Re-exports all the unit test cases for state
208  pub mod state {
209    pub use crate::state::tests::*;
210  }
211
  /// Re-exports all the unit test cases for memberlist
213  pub mod memberlist {
214    pub use crate::base::tests::*;
215  }
216
217  #[cfg(any(feature = "test", test))]
218  #[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
219  #[macro_export]
220  #[doc(hidden)]
221  macro_rules! unit_tests {
222    ($runtime:ty => $run:ident($($fn:ident), +$(,)?)) => {
223      $(
224        ::memberlist_core::tests::paste::paste! {
225          #[test]
226          fn [< test_ $fn >] () {
227            $run($fn::<$runtime>());
228          }
229        }
230      )*
231    };
232  }
233
234  #[cfg(any(feature = "test", test))]
235  #[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
236  #[macro_export]
237  #[doc(hidden)]
238  macro_rules! unit_tests_with_expr {
239    ($run:ident($(
240      $(#[$outer:meta])*
241      $fn:ident( $expr:expr )
242    ), +$(,)?)) => {
243      $(
244        ::memberlist_core::tests::paste::paste! {
245          #[test]
246          $(#[$outer])*
247          fn [< test_ $fn >] () {
248            $run(async move {
249              $expr
250            });
251          }
252        }
253      )*
254    };
255  }
256
257  /// Any error type used for testing.
258  pub type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
259
260  /// The kind of address
261  pub enum AddressKind {
262    /// V4
263    V4,
264    /// V6
265    V6,
266  }
267
268  impl core::fmt::Display for AddressKind {
269    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
270      match self {
271        Self::V4 => write!(f, "v4"),
272        Self::V6 => write!(f, "v6"),
273      }
274    }
275  }
276
277  impl AddressKind {
278    /// Get the next address
279    pub fn next(&self, network: u8) -> SocketAddr {
280      match self {
281        Self::V4 => next_socket_addr_v4(network),
282        Self::V6 => next_socket_addr_v6(),
283      }
284    }
285  }
286
287  #[cfg(not(windows))]
288  static IPV4_BIND_NUM: Mutex<usize> = Mutex::new(10);
289  #[cfg(not(windows))]
290  static IPV6_BIND_NUM: Mutex<usize> = Mutex::new(10);
291
292  /// Returns the next socket addr v4
293  pub fn next_socket_addr_v4(_network: u8) -> SocketAddr {
294    #[cfg(not(windows))]
295    {
296      let mut mu = IPV4_BIND_NUM.lock();
297      let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap();
298      *mu += 1;
299      if *mu > 255 {
300        *mu = 10;
301      }
302
303      addr
304    }
305
306    #[cfg(windows)]
307    "127.0.0.1:0".parse().unwrap()
308  }
309
310  /// Returns the next socket addr v6
311  pub fn next_socket_addr_v6() -> SocketAddr {
312    #[cfg(not(windows))]
313    {
314      let mut mu = IPV6_BIND_NUM.lock();
315      let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap();
316      *mu += 1;
317      if *mu > 255 {
318        *mu = 10;
319      }
320
321      addr
322    }
323
324    #[cfg(windows)]
325    "[::1]:0".parse().unwrap()
326  }
327
328  /// Run the unit test with a given async runtime sequentially.
329  pub fn run<B, F>(block_on: B, fut: F)
330  where
331    B: FnOnce(F) -> F::Output,
332    F: std::future::Future<Output = ()>,
333  {
334    // initialize_tests_tracing();
335    block_on(fut);
336  }
337
338  /// Initialize the tracing for the unit tests.
339  pub fn initialize_tests_tracing() {
340    use std::sync::Once;
341    static TRACE: Once = Once::new();
342    TRACE.call_once(|| {
343      let filter = std::env::var("MEMBERLIST_TESTING_LOG").unwrap_or_else(|_| "info".to_owned());
344      tracing::subscriber::set_global_default(
345        tracing_subscriber::fmt::fmt()
346          .without_time()
347          .with_line_number(true)
348          .with_env_filter(filter)
349          .with_file(false)
350          .with_target(true)
351          .with_ansi(true)
352          .finish(),
353      )
354      .unwrap();
355    });
356  }
357
358  /// Returns a [`Memberlist`] but not alive self for testing purposes.
359  pub async fn get_memberlist<T, D>(
360    t: T,
361    d: D,
362    opts: Options,
363  ) -> Result<Memberlist<T, D>, Error<T, D>>
364  where
365    D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
366    T: Transport,
367  {
368    crate::Memberlist::new_in(t, Some(d), opts)
369      .await
370      .map(|(_, _, this)| this)
371  }
372}