1#![doc = include_str!("../README.md")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/al8n/memberlist/main/art/logo_72x72.png")]
3#![forbid(unsafe_code)]
4#![deny(warnings, missing_docs)]
5#![allow(clippy::type_complexity, unexpected_cfgs)]
6#![cfg_attr(docsrs, feature(doc_cfg))]
7#![cfg_attr(docsrs, allow(unused_attributes))]
8
9mod api;
10mod awareness;
11mod base;
12mod broadcast;
13mod network;
14mod options;
15mod state;
16mod suspicion;
17
18pub mod delegate;
20pub mod error;
22#[cfg(feature = "encryption")]
24#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
25pub mod keyring;
26pub mod proto;
28pub mod queue;
30pub mod transport;
32pub mod util;
34
35pub use agnostic_lite;
36pub use base::*;
37pub use broadcast::*;
38pub use bytes;
39pub use futures;
40#[cfg(feature = "metrics")]
41#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
42pub use metrics;
43pub use network::META_MAX_SIZE;
44pub use nodecraft::CheapClone;
45pub use options::Options;
46pub use tracing;
47
48use std::time::Duration;
49
50#[cfg(windows)]
51type Epoch = system_epoch::SystemTimeEpoch;
52
53#[cfg(not(windows))]
54type Epoch = instant_epoch::InstantEpoch;
55
// Wall-clock (`SystemTime`) backed epoch type, compiled only on Windows.
#[cfg(windows)]
mod system_epoch {
  use super::*;
  use std::time::SystemTime;

  type SystemTimeEpochInner = SystemTime;

  /// A point in time backed by [`SystemTime`].
  ///
  /// `SystemTime` is not monotonic, so a later reading may compare as earlier
  /// than a previous one. All duration-producing operations on this type
  /// therefore saturate to [`Duration::ZERO`] instead of panicking, matching the
  /// behaviour of subtracting two `std::time::Instant`s.
  #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
  pub(crate) struct SystemTimeEpoch(SystemTimeEpochInner);

  impl core::fmt::Debug for SystemTimeEpoch {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
      // Fully qualified so the impl does not rely on `Debug` being imported.
      core::fmt::Debug::fmt(&self.0, f)
    }
  }

  impl core::ops::Sub for SystemTimeEpoch {
    type Output = Duration;

    /// Returns the time elapsed from `rhs` to `self`, or zero when `rhs` is
    /// later than `self` (e.g. after the system clock was adjusted backwards).
    fn sub(self, rhs: Self) -> Duration {
      // `duration_since` reports `Err` when `rhs` is later; treat that as zero
      // rather than unwrapping and aborting the caller.
      self.0.duration_since(rhs.0).unwrap_or_default()
    }
  }

  impl core::ops::Sub<Duration> for SystemTimeEpoch {
    type Output = Self;

    /// Moves this point in time backwards by `rhs`.
    fn sub(self, rhs: Duration) -> Self {
      Self(self.0 - rhs)
    }
  }

  impl core::ops::SubAssign<Duration> for SystemTimeEpoch {
    fn sub_assign(&mut self, rhs: Duration) {
      self.0 -= rhs;
    }
  }

  impl core::ops::Add<Duration> for SystemTimeEpoch {
    type Output = Self;

    /// Moves this point in time forwards by `rhs`.
    fn add(self, rhs: Duration) -> Self {
      Self(self.0 + rhs)
    }
  }

  impl core::ops::AddAssign<Duration> for SystemTimeEpoch {
    fn add_assign(&mut self, rhs: Duration) {
      self.0 += rhs;
    }
  }

  impl SystemTimeEpoch {
    /// Captures the current system time.
    pub(crate) fn now() -> Self {
      Self(SystemTimeEpochInner::now())
    }

    /// Returns the time elapsed since this point, or zero when this point lies
    /// in the future relative to the current system time.
    pub(crate) fn elapsed(&self) -> Duration {
      // `SystemTime::elapsed` fails if the clock moved backwards past `self`;
      // saturate instead of panicking.
      self.0.elapsed().unwrap_or_default()
    }

    /// Returns the time elapsed from `earlier` to `self`, or `None` when
    /// `earlier` is actually later than `self`.
    #[cfg(any(feature = "test", test))]
    pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
      self.0.duration_since(earlier.0).ok()
    }
  }
}
123
// Monotonic (`Instant`) backed epoch type, compiled on every target except Windows.
#[cfg(not(windows))]
mod instant_epoch {
  use super::*;
  use std::time::Instant;

  type InstantEpochInner = Instant;

  /// A point in time backed by [`Instant`].
  #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
  pub(crate) struct InstantEpoch(InstantEpochInner);

  impl core::fmt::Debug for InstantEpoch {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
      // Forward to the wrapped `Instant` so the wrapper is invisible in logs.
      core::fmt::Debug::fmt(&self.0, f)
    }
  }

  impl core::ops::Sub for InstantEpoch {
    type Output = Duration;

    /// Returns the time elapsed from `rhs` to `self`.
    fn sub(self, rhs: Self) -> Duration {
      let Self(earlier) = rhs;
      // Same operation `Instant - Instant` performs.
      self.0.duration_since(earlier)
    }
  }

  impl core::ops::SubAssign<Duration> for InstantEpoch {
    fn sub_assign(&mut self, rhs: Duration) {
      self.0 -= rhs;
    }
  }

  impl core::ops::Sub<Duration> for InstantEpoch {
    type Output = Self;

    /// Moves this point in time backwards by `rhs`.
    fn sub(self, rhs: Duration) -> Self {
      let mut shifted = self;
      shifted -= rhs;
      shifted
    }
  }

  impl core::ops::AddAssign<Duration> for InstantEpoch {
    fn add_assign(&mut self, rhs: Duration) {
      self.0 += rhs;
    }
  }

  impl core::ops::Add<Duration> for InstantEpoch {
    type Output = Self;

    /// Moves this point in time forwards by `rhs`.
    fn add(self, rhs: Duration) -> Self {
      let mut shifted = self;
      shifted += rhs;
      shifted
    }
  }

  impl InstantEpoch {
    /// Captures the current instant.
    pub(crate) fn now() -> Self {
      let current = InstantEpochInner::now();
      Self(current)
    }

    /// Returns the time elapsed since this point was captured.
    pub(crate) fn elapsed(&self) -> Duration {
      let Self(inner) = self;
      inner.elapsed()
    }

    /// Returns the time elapsed from `earlier` to `self`, or `None` when
    /// `earlier` is later than `self`.
    #[cfg(any(feature = "test", test))]
    pub(crate) fn checked_duration_since(&self, earlier: Self) -> Option<Duration> {
      let Self(start) = earlier;
      self.0.checked_duration_since(start)
    }
  }
}
191
/// Helpers, re-exports and macros for writing unit tests against this crate.
///
/// Only compiled when the `test` feature is enabled.
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub mod tests {
  use std::net::SocketAddr;

  #[cfg(not(windows))]
  use parking_lot::Mutex;
  pub use paste;

  use self::{delegate::Delegate, error::Error, transport::Transport};
  use super::*;

  /// Re-exports everything from `crate::state::tests`.
  pub mod state {
    pub use crate::state::tests::*;
  }

  /// Re-exports everything from `crate::base::tests`.
  pub mod memberlist {
    pub use crate::base::tests::*;
  }

  /// Generates one `#[test]` function per listed function name.
  ///
  /// For every `name` in the list a test called `test_<name>` is emitted whose
  /// body is `$run(name::<$runtime>())`.
  #[cfg(any(feature = "test", test))]
  #[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
  #[macro_export]
  #[doc(hidden)]
  macro_rules! unit_tests {
    ($runtime:ty => $run:ident($($fn:ident), +$(,)?)) => {
      $(
        ::memberlist_core::tests::paste::paste! {
          #[test]
          fn [< test_ $fn >] () {
            $run($fn::<$runtime>());
          }
        }
      )*
    };
  }

  /// Generates one `#[test]` function per `name(expr)` entry.
  ///
  /// For every entry a test called `test_<name>` is emitted whose body is
  /// `$run(async move { expr })`. Attributes written in front of an entry are
  /// forwarded onto the generated test function.
  #[cfg(any(feature = "test", test))]
  #[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
  #[macro_export]
  #[doc(hidden)]
  macro_rules! unit_tests_with_expr {
    ($run:ident($(
      $(#[$outer:meta])*
      $fn:ident( $expr:expr )
    ), +$(,)?)) => {
      $(
        ::memberlist_core::tests::paste::paste! {
          #[test]
          $(#[$outer])*
          fn [< test_ $fn >] () {
            $run(async move {
              $expr
            });
          }
        }
      )*
    };
  }

  /// A boxed error that is `Send + Sync + 'static`.
  pub type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;

  /// The IP family a test address should be generated for.
  #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
  pub enum AddressKind {
    /// IPv4.
    V4,
    /// IPv6.
    V6,
  }

  impl core::fmt::Display for AddressKind {
    /// Writes `v4` or `v6`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
      match self {
        Self::V4 => write!(f, "v4"),
        Self::V6 => write!(f, "v6"),
      }
    }
  }

  impl AddressKind {
    /// Returns the next test socket address of this kind.
    ///
    /// `network` is forwarded to [`next_socket_addr_v4`] and is ignored for
    /// [`AddressKind::V6`].
    pub fn next(&self, network: u8) -> SocketAddr {
      match self {
        Self::V4 => next_socket_addr_v4(network),
        Self::V6 => next_socket_addr_v6(),
      }
    }
  }

  // Per-family counters used to hand out distinct addresses; both start at 10.
  #[cfg(not(windows))]
  static IPV4_BIND_NUM: Mutex<usize> = Mutex::new(10);
  #[cfg(not(windows))]
  static IPV6_BIND_NUM: Mutex<usize> = Mutex::new(10);

  /// Returns the current value of `counter` and advances it, wrapping back to
  /// 10 once 255 has been handed out.
  #[cfg(not(windows))]
  fn next_bind_num(counter: &Mutex<usize>) -> usize {
    let mut guard = counter.lock();
    let current = *guard;
    *guard = if current >= 255 { 10 } else { current + 1 };
    current
  }

  /// Returns the next IPv4 socket address (port `0`) to bind to in tests.
  ///
  /// On non-Windows targets this is `127.0.<network>.<n>:0`, where `n` cycles
  /// through `10..=255`. On Windows it is always `127.0.0.1:0` and `_network`
  /// is ignored.
  pub fn next_socket_addr_v4(_network: u8) -> SocketAddr {
    #[cfg(not(windows))]
    {
      let host = next_bind_num(&IPV4_BIND_NUM);
      format!("127.0.{}.{}:0", _network, host)
        .parse::<SocketAddr>()
        .unwrap()
    }

    #[cfg(windows)]
    "127.0.0.1:0".parse().unwrap()
  }

  /// Returns the next IPv6 socket address (port `0`) to bind to in tests.
  ///
  /// On non-Windows targets this is `[fc00::1:<n>]:0`, where `n` cycles through
  /// `10..=255` and its decimal digits are used verbatim as the last address
  /// group. On Windows it is always `[::1]:0`.
  pub fn next_socket_addr_v6() -> SocketAddr {
    #[cfg(not(windows))]
    {
      let host = next_bind_num(&IPV6_BIND_NUM);
      format!("[fc00::1:{}]:0", host)
        .parse::<SocketAddr>()
        .unwrap()
    }

    #[cfg(windows)]
    "[::1]:0".parse().unwrap()
  }

  /// Drives `fut` to completion by handing it to `block_on`.
  pub fn run<B, F>(block_on: B, fut: F)
  where
    B: FnOnce(F) -> F::Output,
    F: std::future::Future<Output = ()>,
  {
    block_on(fut);
  }

  /// Installs a global `tracing` subscriber for tests, at most once per process.
  ///
  /// The filter is read from the `MEMBERLIST_TESTING_LOG` environment variable
  /// and falls back to `info` when the variable cannot be read.
  ///
  /// # Panics
  ///
  /// Panics if `tracing::subscriber::set_global_default` returns an error.
  pub fn initialize_tests_tracing() {
    use std::sync::Once;
    static TRACE: Once = Once::new();
    TRACE.call_once(|| {
      let filter = std::env::var("MEMBERLIST_TESTING_LOG").unwrap_or_else(|_| "info".to_owned());
      tracing::subscriber::set_global_default(
        tracing_subscriber::fmt::fmt()
          .without_time()
          .with_line_number(true)
          .with_env_filter(filter)
          .with_file(false)
          .with_target(true)
          .with_ansi(true)
          .finish(),
      )
      .unwrap();
    });
  }

  /// Builds a [`Memberlist`] from transport `t`, delegate `d` and `opts`.
  ///
  /// Calls `Memberlist::new_in(t, Some(d), opts)` and returns only the last
  /// element of the tuple it yields, dropping the first two.
  pub async fn get_memberlist<T, D>(
    t: T,
    d: D,
    opts: Options,
  ) -> Result<Memberlist<T, D>, Error<T, D>>
  where
    D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
    T: Transport,
  {
    crate::Memberlist::new_in(t, Some(d), opts)
      .await
      .map(|(_, _, this)| this)
  }
}