memberlist_core/
base.rs

use std::{
  collections::{HashMap, VecDeque},
  sync::{
    Arc,
    atomic::{AtomicBool, AtomicU32, AtomicUsize},
  },
};

use agnostic_lite::{AsyncSpawner, RuntimeLite};
use async_channel::{Receiver, Sender};
use async_lock::{Mutex, RwLock};

use atomic_refcell::AtomicRefCell;
use futures::stream::FuturesUnordered;
use nodecraft::{CheapClone, Node, resolver::AddressResolver};

use super::{
  Options,
  awareness::Awareness,
  broadcast::MemberlistBroadcast,
  delegate::{Delegate, VoidDelegate},
  error::Error,
  proto::{Message, PushNodeState, TinyVec},
  queue::TransmitLimitedQueue,
  state::{AckManager, LocalNodeState},
  suspicion::Suspicion,
  transport::Transport,
};

#[cfg(feature = "encryption")]
use super::keyring::Keyring;

#[cfg(any(test, feature = "test"))]
pub(crate) mod tests;

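/// Hot-path gossip counters. Every field is an atomic so concurrent tasks can
/// read and update them without taking a lock.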
#[viewit::viewit]
pub(crate) struct HotData {
  sequence_num: AtomicU32,
  incarnation: AtomicU32,
  push_pull_req: AtomicU32,
  leave: AtomicBool,
  num_nodes: Arc<AtomicU32>,
}

impl HotData {
  fn new() -> Self {
    Self {
      sequence_num: AtomicU32::new(0),
      incarnation: AtomicU32::new(0),
      num_nodes: Arc::new(AtomicU32::new(0)),
      push_pull_req: AtomicU32::new(0),
      leave: AtomicBool::new(false),
    }
  }
}

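/// A received gossip message paired with the address it arrived from, parked
/// in a [`MessageQueue`] until it is handed off for processing.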
#[viewit::viewit]
pub(crate) struct MessageHandoff<I, A> {
  msg: Message<I, A>,
  from: A,
}

#[viewit::viewit]
pub(crate) struct MessageQueue<I, A> {
  /// high priority messages queue
  high: VecDeque<MessageHandoff<I, A>>,
  /// low priority messages queue
  low: VecDeque<MessageHandoff<I, A>>,
}

impl<I, A> MessageQueue<I, A> {
  const fn new() -> Self {
    Self {
      high: VecDeque::new(),
      low: VecDeque::new(),
    }
  }
}

// #[viewit::viewit]
pub(crate) struct Member<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  pub(crate) state: LocalNodeState<T::Id, T::ResolvedAddress>,
  pub(crate) suspicion: Option<Suspicion<T, D>>,
}

impl<T, D> core::fmt::Debug for Member<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("Member")
      .field("state", &self.state)
      .finish()
  }
}

impl<T, D> core::ops::Deref for Member<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  type Target = LocalNodeState<T::Id, T::ResolvedAddress>;

  fn deref(&self) -> &Self::Target {
    &self.state
  }
}

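/// The locally known membership: a dense `nodes` vector for ordered and random
/// access plus a `node_map` from node id to its index in `nodes`.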
pub(crate) struct Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  pub(crate) local: Node<T::Id, T::ResolvedAddress>,
  pub(crate) nodes: TinyVec<Member<T, D>>,
  pub(crate) node_map: HashMap<T::Id, usize>,
}

impl<T, D> core::ops::Index<usize> for Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  type Output = Member<T, D>;

  fn index(&self, index: usize) -> &Self::Output {
    &self.nodes[index]
  }
}

impl<T, D> core::ops::IndexMut<usize> for Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn index_mut(&mut self, index: usize) -> &mut Self::Output {
    &mut self.nodes[index]
  }
}

impl<T, D> rand::seq::IndexedRandom for Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn len(&self) -> usize {
    self.nodes.len()
  }
}

impl<T, D> rand::seq::SliceRandom for Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn shuffle<R>(&mut self, rng: &mut R)
  where
    R: rand::Rng + ?Sized,
  {
    // Sample a number uniformly between 0 and `ubound`. Uses 32-bit sampling where
    // possible, primarily in order to produce the same output on 32-bit and 64-bit
    // platforms.
    #[inline]
    fn gen_index<R: rand::Rng + ?Sized>(rng: &mut R, ubound: usize) -> usize {
      if ubound <= (u32::MAX as usize) {
        rng.random_range(0..ubound as u32) as usize
      } else {
        rng.random_range(0..ubound)
      }
    }

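    // Fisher-Yates shuffle walking from the back of `nodes`: pick a random
    // index in `0..=i`, swap it into slot `i`, and update `node_map` so the
    // id -> index entries stay consistent with `nodes` after each swap.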
    for i in (1..self.nodes.len()).rev() {
      // invariant: elements with index > i have been locked in place.
      let ridx = gen_index(rng, i + 1);
      let curr = self.node_map.get_mut(self.nodes[i].state.id()).unwrap();
      *curr = ridx;
      let target = self.node_map.get_mut(self.nodes[ridx].state.id()).unwrap();
      *target = i;
      self.nodes.swap(i, ridx);
    }
  }

  fn partial_shuffle<R>(
    &mut self,
    _rng: &mut R,
    _amount: usize,
  ) -> (&mut [Self::Output], &mut [Self::Output])
  where
    Self::Output: Sized,
    R: rand::Rng + ?Sized,
  {
    unreachable!()
  }
}

impl<T, D> Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn new(local: Node<T::Id, T::ResolvedAddress>) -> Self {
    Self {
      nodes: TinyVec::new(),
      node_map: HashMap::new(),
      local,
    }
  }
}

impl<T, D> Members<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
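  /// Returns `true` if at least one node other than the local node is still
  /// alive (not dead and not left).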
  pub(crate) fn any_alive(&self) -> bool {
    for m in self.nodes.iter() {
      if !m.dead_or_left() && m.id().ne(self.local.id()) {
        return true;
      }
    }

    false
  }
}

pub(crate) struct MemberlistCore<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  pub(crate) id: T::Id,
  pub(crate) hot: HotData,
  pub(crate) awareness: Awareness,
  pub(crate) broadcast:
    TransmitLimitedQueue<MemberlistBroadcast<T::Id, T::ResolvedAddress>, Arc<AtomicU32>>,
  pub(crate) leave_broadcast_tx: Sender<()>,
  pub(crate) leave_broadcast_rx: Receiver<()>,
  pub(crate) handles: AtomicRefCell<
    FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
  >,
  pub(crate) probe_index: AtomicUsize,
  pub(crate) handoff_tx: Sender<()>,
  pub(crate) handoff_rx: Receiver<()>,
  pub(crate) queue: Mutex<MessageQueue<T::Id, T::ResolvedAddress>>,
  pub(crate) nodes: Arc<RwLock<Members<T, D>>>,
  pub(crate) ack_manager: AckManager<T::Runtime>,
  pub(crate) transport: Arc<T>,
  /// We never call send on this channel; it is only closed or dropped to signal shutdown.
  pub(crate) shutdown_tx: Sender<()>,
  pub(crate) advertise: T::ResolvedAddress,
  pub(crate) opts: Arc<Options>,
  #[cfg(feature = "encryption")]
  pub(crate) keyring: Option<Keyring>,
}

impl<T, D> MemberlistCore<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
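  /// Closes the shutdown channel (returning early if it was already closed)
  /// and then shuts down the transport.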
  pub(crate) async fn shutdown(&self) -> Result<(), T::Error> {
    if !self.shutdown_tx.close() {
      return Ok(());
    }

    // Shut down the transport first, which should block until it's
    // completely torn down. If we kill the memberlist-side handlers
    // those I/O handlers might get stuck.
    if let Err(e) = self.transport.shutdown().await {
      tracing::error!(err=%e, "memberlist: failed to shutdown transport");
      return Err(e);
    }

    Ok(())
  }
}

impl<T, D> Drop for MemberlistCore<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn drop(&mut self) {
    self.shutdown_tx.close();
  }
}

/// A cluster membership and member failure detection library built on a
/// gossip-based protocol.
///
/// The use cases for such a library are far-reaching: all distributed systems
/// require membership, and memberlist is a reusable solution for managing
/// cluster membership and node failure detection.
///
/// memberlist is eventually consistent but converges quickly on average.
/// The speed at which it converges can be heavily tuned via various knobs
/// on the protocol. Node failures are detected, and network partitions are
/// partially tolerated by attempting to communicate with potentially dead
/// nodes through multiple routes.
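///
/// # Example (sketch)
///
/// A minimal, illustrative flow. The constructor and method names below are
/// assumptions about the public API (this module only defines the
/// crate-internal `Memberlist::new_in`), so treat this as a sketch rather
/// than a verified snippet:
///
/// ```ignore
/// // Hypothetical entry points: create a memberlist over some transport,
/// // join an existing cluster through a known peer, and shut down when done.
/// let memberlist = Memberlist::new(transport, Options::default()).await?;
/// memberlist.join(known_peer).await?;
/// // ... run until it is time to leave the cluster ...
/// memberlist.shutdown().await?;
/// ```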
pub struct Memberlist<
  T,
  D = VoidDelegate<
    <T as Transport>::Id,
    <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
  >,
> where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  pub(crate) inner: Arc<MemberlistCore<T, D>>,
  pub(crate) delegate: Option<Arc<D>>,
}

impl<T, D> Clone for Memberlist<T, D>
where
  T: Transport,
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
{
  fn clone(&self) -> Self {
    Self {
      inner: self.inner.clone(),
      delegate: self.delegate.clone(),
    }
  }
}

impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  pub(crate) async fn new_in(
    transport: T,
    delegate: Option<D>,
    opts: Options,
  ) -> Result<(Receiver<()>, T::ResolvedAddress, Self), Error<T, D>> {
    let (handoff_tx, handoff_rx) = async_channel::bounded(1);
    let (leave_broadcast_tx, leave_broadcast_rx) = async_channel::bounded(1);

    // Get the final advertise address from the transport, which may need
    // to see which address we bound to. We'll refresh this each time we
    // send out an alive message.
    let advertise = transport.advertise_address();
    let id = transport.local_id();
    let node = Node::new(id.clone(), advertise.clone());
    let awareness = Awareness::new(
      opts.awareness_max_multiplier as isize,
      #[cfg(feature = "metrics")]
      Arc::new(vec![]),
    );
    let hot = HotData::new();
    let num_nodes = hot.num_nodes.clone();
    let broadcast = TransmitLimitedQueue::new(opts.retransmit_mult, num_nodes);

    let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);

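    // Derive the keyring from the configured keys: with only `secret_keys`
    // set, the first key is promoted to primary; with only a primary key, the
    // keyring holds just that key; with neither, no keyring is installed.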
    #[cfg(feature = "encryption")]
    let keyring = match (opts.primary_key, opts.secret_keys.is_empty()) {
      (None, false) => {
        tracing::warn!("memberlist: using first key in keyring as primary key");
        let mut iter = opts.secret_keys.iter().copied();
        let pk = iter.next().unwrap();
        let keyring = Keyring::with_keys(pk, iter);
        Some(keyring)
      }
      (Some(pk), true) => Some(Keyring::new(pk)),
      (Some(pk), false) => Some(Keyring::with_keys(pk, opts.secret_keys.iter().copied())),
      (None, true) => None,
    };

    let this = Memberlist {
      inner: Arc::new(MemberlistCore {
        id: id.cheap_clone(),
        hot,
        awareness,
        broadcast,
        leave_broadcast_tx,
        leave_broadcast_rx,
        probe_index: AtomicUsize::new(0),
        handles: AtomicRefCell::new(FuturesUnordered::new()),
        handoff_tx,
        handoff_rx,
        queue: Mutex::new(MessageQueue::new()),
        nodes: Arc::new(RwLock::new(Members::new(node))),
        ack_manager: AckManager::new(),
        shutdown_tx,
        advertise: advertise.cheap_clone(),
        transport: Arc::new(transport),
        opts: Arc::new(opts),
        #[cfg(feature = "encryption")]
        keyring,
      }),
      delegate: delegate.map(Arc::new),
    };

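    // Spawn the long-running background tasks (stream listener, packet
    // handler, packet listener, and the optional broadcast-queue sampler) and
    // park their join handles in `handles`.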
    {
      let handles = this.inner.handles.borrow();
      handles.push(this.stream_listener(shutdown_rx.clone()));
      handles.push(this.packet_handler(shutdown_rx.clone()));
      handles.push(this.packet_listener(shutdown_rx.clone()));
      #[cfg(feature = "metrics")]
      handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone()));
    }

    Ok((shutdown_rx, this.inner.advertise.cheap_clone(), this))
  }
}

// private implementation
impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  #[inline]
  pub(crate) fn get_advertise(&self) -> &T::ResolvedAddress {
    &self.inner.advertise
  }

  /// Check for any other alive node.
  #[inline]
  pub(crate) async fn any_alive(&self) -> bool {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .any(|n| !n.state.dead_or_left() && n.state.server.id().ne(&self.inner.id))
  }

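  /// Spawns a background task that samples the broadcast queue depth on every
  /// `queue_check_interval` tick and records it as a histogram, until the
  /// shutdown channel fires.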
  #[cfg(feature = "metrics")]
  fn check_broadcast_queue_depth(
    &self,
    shutdown_rx: Receiver<()>,
  ) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
    use futures::{FutureExt, StreamExt};

    let queue_check_interval = self.inner.opts.queue_check_interval;
    let this = self.clone();

    <T::Runtime as RuntimeLite>::spawn(async move {
      let tick = <T::Runtime as RuntimeLite>::interval(queue_check_interval);
      futures::pin_mut!(tick);
      loop {
        futures::select! {
          _ = shutdown_rx.recv().fuse() => {
            tracing::debug!("memberlist: broadcast queue checker exits");
            return;
          },
          _ = tick.next().fuse() => {
            let numq = this.inner.broadcast.num_queued().await;
            metrics::histogram!("memberlist.queue.broadcasts").record(numq as f64);
          }
        }
      }
    })
  }

  pub(crate) async fn verify_protocol(
    &self,
    _remote: &[PushNodeState<T::Id, T::ResolvedAddress>],
  ) -> Result<(), Error<T, D>> {
    // TODO: nothing to verify for now, because there has been no protocol
    // version update to check against.
    Ok(())
  }
}