memberlist_core/api.rs

use std::{
  sync::{Arc, atomic::Ordering},
  time::Duration,
};

use agnostic_lite::{RuntimeLite, time::Instant};
use bytes::Bytes;
use futures::{FutureExt, StreamExt};
use smallvec_wrapper::OneOrMore;

use super::{
  Options,
  base::Memberlist,
  delegate::{Delegate, VoidDelegate},
  error::Error,
  network::META_MAX_SIZE,
  proto::{Alive, Dead, MaybeResolvedAddress, Message, Meta, NodeState, Ping, SmallVec},
  state::AckMessage,
  transport::{AddressResolver, CheapClone, Node, Transport},
};

impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Returns the local node ID.
  #[inline]
  pub fn local_id(&self) -> &T::Id {
    &self.inner.id
  }

  /// Returns the local node address.
  #[inline]
  pub fn local_address(&self) -> &<T::Resolver as AddressResolver>::Address {
    self.inner.transport.local_address()
  }

  /// Returns a [`Node`] with the local id and the advertise address of the local node.
  #[inline]
  pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
    Node::new(self.inner.id.clone(), self.inner.advertise.clone())
  }

  /// Returns the advertise address of the local node.
  #[inline]
  pub fn advertise_address(&self) -> &T::ResolvedAddress {
    &self.inner.advertise
  }

  /// Returns the keyring of the node (only used for encryption).
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  #[inline]
  pub fn keyring(&self) -> Option<&super::keyring::Keyring> {
    self.inner.keyring.as_ref()
  }

  /// Returns `true` if the node has encryption enabled.
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  #[inline]
  pub fn encryption_enabled(&self) -> bool {
    self.inner.keyring.is_some()
      && self.inner.opts.encryption_algo.is_some()
      && self.inner.opts.gossip_verify_outgoing
  }

  /// Returns the delegate, if any.
  #[inline]
  pub fn delegate(&self) -> Option<&D> {
    self.delegate.as_deref()
  }

  /// Returns the local node instance state.
  #[inline]
  pub async fn local_state(&self) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
    let nodes = self.inner.nodes.read().await;
    nodes
      .node_map
      .get(&self.inner.id)
      .map(|&idx| nodes.nodes[idx].state.server.clone())
  }

  /// Returns the node state of the given id, if any.
  pub async fn by_id(&self, id: &T::Id) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
    let members = self.inner.nodes.read().await;

    members
      .node_map
      .get(id)
      .map(|&idx| members.nodes[idx].state.server.clone())
  }

  /// Returns a list of all known nodes.
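  ///
  /// A minimal usage sketch (not run as a doc-test; `m` is assumed to be a
  /// running `Memberlist` instance):
  ///
  /// ```rust,ignore
  /// // Snapshot of every node this instance currently knows about.
  /// let members = m.members().await;
  /// // Count only the nodes that are neither dead nor left.
  /// let online = m.num_online_members().await;
  /// tracing::info!("cluster view: {} known, {} online", members.len(), online);
  /// ```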
  #[inline]
  pub async fn members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .map(|n| n.state.server.clone())
      .collect()
  }

  /// Returns the number of members.
  #[inline]
  pub async fn num_members(&self) -> usize {
    self.inner.nodes.read().await.nodes.len()
  }

  /// Returns a list of all known nodes that are online.
  pub async fn online_members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .filter(|n| !n.dead_or_left())
      .map(|n| n.state.server.clone())
      .collect()
  }

  /// Returns the number of online members.
  pub async fn num_online_members(&self) -> usize {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .filter(|n| !n.dead_or_left())
      .count()
  }

  /// Returns a list of all known nodes that match the given predicate.
  pub async fn members_by(
    &self,
    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
  ) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .filter(|n| f(&n.state))
      .map(|n| n.state.server.clone())
      .collect()
  }

  /// Returns the number of members that match the given predicate.
  pub async fn num_members_by(
    &self,
    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
  ) -> usize {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .filter(|n| f(&n.state))
      .count()
  }

  /// Returns a list of the non-`None` results of applying the given function to all known members.
  pub async fn members_map_by<O>(
    &self,
    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> Option<O>,
  ) -> SmallVec<O> {
    self
      .inner
      .nodes
      .read()
      .await
      .nodes
      .iter()
      .filter_map(|n| f(&n.state))
      .collect()
  }
}

impl<T> Memberlist<T>
where
  T: Transport,
{
  /// Create a new memberlist with the given transport and options.
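  ///
  /// A minimal construction sketch (not run as a doc-test; `MyTransport` and
  /// `my_transport_options` are placeholders for a concrete [`Transport`]
  /// implementation and its options, and `Options::default()` is assumed to be
  /// an acceptable starting configuration):
  ///
  /// ```rust,ignore
  /// // The transport is constructed from its options inside `new`.
  /// let m: Memberlist<MyTransport> =
  ///   Memberlist::new(my_transport_options, Options::default()).await?;
  /// tracing::info!("listening as {} on {}", m.local_id(), m.advertise_address());
  /// ```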
  #[inline]
  pub async fn new(
    transport_options: T::Options,
    opts: Options,
  ) -> Result<Self, Error<T, VoidDelegate<T::Id, T::ResolvedAddress>>> {
    Self::create(None, transport_options, opts).await
  }
}

impl<T, D> Memberlist<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Create a new memberlist with the given transport, delegate and options.
  #[inline]
  pub async fn with_delegate(
    delegate: D,
    transport_options: T::Options,
    opts: Options,
  ) -> Result<Self, Error<T, D>> {
    Self::create(Some(delegate), transport_options, opts).await
  }

  pub(crate) async fn create(
    delegate: Option<D>,
    transport_options: T::Options,
    opts: Options,
  ) -> Result<Self, Error<T, D>> {
    let transport = T::new(transport_options).await.map_err(Error::Transport)?;
    let (shutdown_rx, advertise, this) = Self::new_in(transport, delegate, opts).await?;
    let meta = if let Some(d) = &this.delegate {
      d.node_meta(META_MAX_SIZE).await
    } else {
      Meta::empty()
    };

    if meta.len() > META_MAX_SIZE {
      panic!("NodeState meta data provided is longer than the limit");
    }

    let alive = Alive::new(
      this.next_incarnation(),
      Node::new(this.inner.id.clone(), this.inner.advertise.clone()),
    )
    .with_meta(meta)
    .with_protocol_version(this.inner.opts.protocol_version)
    .with_delegate_version(this.inner.opts.delegate_version);
    this.alive_node(alive, None, true).await;
    this.schedule(shutdown_rx).await;
    tracing::debug!(local = %this.inner.id, advertise_addr = %advertise, "memberlist: node is living");
    Ok(this)
  }

  /// Leave will broadcast a leave message but will not shut down the background
  /// listeners, meaning the node will continue participating in gossip and state
  /// updates.
  ///
  /// This will block until the leave message is successfully broadcast to
  /// a member of the cluster, if any exist, or until the specified timeout
  /// is reached.
  ///
  /// This method is safe to call multiple times, but must not be called
  /// after the cluster is already shut down.
  ///
  /// Returns `true` if the node has successfully left the cluster by this call.
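  ///
  /// A minimal sketch of a graceful exit (not run as a doc-test; `m` is assumed
  /// to be a running `Memberlist` instance):
  ///
  /// ```rust,ignore
  /// // Broadcast our intent to leave and wait up to five seconds for it to
  /// // reach at least one peer, then stop the background listeners.
  /// m.leave(Duration::from_secs(5)).await?;
  /// m.shutdown().await?;
  /// ```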
  pub async fn leave(&self, timeout: Duration) -> Result<bool, Error<T, D>> {
    if self.has_shutdown() {
      return Ok(false);
    }

    if self.has_left() {
      return Ok(false);
    }

    if !self.has_left() {
      self.inner.hot.leave.store(true, Ordering::Release);

      let mut memberlist = self.inner.nodes.write().await;
      if let Some(&idx) = memberlist.node_map.get(&self.inner.id) {
        // This dead message is special, because NodeState and From are the
        // same. This helps other nodes figure out that a node left
        // intentionally. When NodeState equals From, other nodes know for
        // sure this node is gone.

        let state = &memberlist.nodes[idx];
        let d = Dead::new(
          state.state.incarnation.load(Ordering::Acquire),
          state.id().cheap_clone(),
          state.id().cheap_clone(),
        );

        self.dead_node(&mut memberlist, d).await?;
        let any_alive = memberlist.any_alive();
        drop(memberlist);

        // Block until the broadcast goes out
        if any_alive {
          if timeout > Duration::ZERO {
            futures::select! {
              _ = self.inner.leave_broadcast_rx.recv().fuse() => {},
              _ = <T::Runtime as RuntimeLite>::sleep(timeout).fuse() => {
                return Err(Error::LeaveTimeout);
              }
            }
          } else if let Err(e) = self.inner.leave_broadcast_rx.recv().await {
            tracing::error!("memberlist: failed to receive leave broadcast: {}", e);
          }
        }
      } else {
        tracing::warn!("memberlist: leave but we're not a member");
      }
    }

    Ok(true)
  }

  /// Joins the cluster directly by contacting the given node.
  ///
  /// Returns the node with its resolved address if the join succeeds, or an
  /// error if the node could not be reached.
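  ///
  /// A minimal sketch (not run as a doc-test; `peer_id` and `peer_addr` are
  /// placeholders for the transport's id and as-yet-unresolved address types):
  ///
  /// ```rust,ignore
  /// // The address is resolved through the transport before the state sync.
  /// let peer = m
  ///   .join(Node::new(peer_id, MaybeResolvedAddress::unresolved(peer_addr)))
  ///   .await?;
  /// tracing::info!("joined {peer}");
  /// ```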
  pub async fn join(
    &self,
    node: Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
  ) -> Result<Node<T::Id, T::ResolvedAddress>, Error<T, D>> {
    if self.has_left() || self.has_shutdown() {
      return Err(Error::NotRunning);
    }

    let (id, addr) = node.into_components();
    let addr = match addr {
      MaybeResolvedAddress::Resolved(addr) => addr,
      MaybeResolvedAddress::Unresolved(addr) => self
        .inner
        .transport
        .resolve(&addr)
        .await
        .map_err(Error::Transport)?,
    };
    let n = Node::new(id, addr);
    self.push_pull_node(n.cheap_clone(), true).await.map(|_| n)
  }

  /// Used to take an existing Memberlist and attempt to join a cluster
  /// by contacting all the given hosts and performing a state sync. Initially,
  /// the Memberlist only contains our own state, so doing this will cause
  /// remote nodes to become aware of the existence of this node, effectively
  /// joining the cluster.
  ///
  /// On success, returns a list of all nodes that were successfully joined with resolved addresses.
  /// On error, returns the list of nodes that were successfully joined with resolved addresses, along with the error.
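  ///
  /// A minimal sketch (not run as a doc-test; `seeds` is a placeholder for an
  /// iterator of `(id, address)` pairs for your transport):
  ///
  /// ```rust,ignore
  /// let targets = seeds
  ///   .into_iter()
  ///   .map(|(id, addr)| Node::new(id, MaybeResolvedAddress::unresolved(addr)));
  ///
  /// match m.join_many(targets).await {
  ///   Ok(joined) => tracing::info!("joined {} peers", joined.len()),
  ///   // A partial failure still reports the peers that were joined.
  ///   Err((joined, e)) => tracing::warn!(err = %e, "joined only {} peers", joined.len()),
  /// }
  /// ```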
  pub async fn join_many(
    &self,
    existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
  ) -> Result<
    SmallVec<Node<T::Id, T::ResolvedAddress>>,
    (SmallVec<Node<T::Id, T::ResolvedAddress>>, Error<T, D>),
  > {
    if self.has_left() || self.has_shutdown() {
      return Err((Default::default(), Error::NotRunning));
    }

    let estimated_total = existing.size_hint().0;

    let futs = existing
      .into_iter()
      .map(|node| {
        async move {
          let (id, addr) = node.into_components();
          let resolved_addr = match addr {
            MaybeResolvedAddress::Resolved(addr) => addr,
            MaybeResolvedAddress::Unresolved(addr) => {
              match self.inner.transport.resolve(&addr).await {
                Ok(addr) => addr,
                Err(e) => {
                  tracing::debug!(
                    err = %e,
                    "memberlist: failed to resolve address {}",
                    addr,
                  );
                  return Err((Node::new(id, MaybeResolvedAddress::<T::Address, T::ResolvedAddress>::unresolved(addr)), Error::<T, D>::transport(e)))
                }
              }
            }
          };
          let node = Node::new(id, resolved_addr);
          tracing::info!(local = %self.inner.transport.local_id(), peer = %node, "memberlist: start join...");
          if let Err(e) = self.push_pull_node(node.cheap_clone(), true).await {
            tracing::debug!(
              local = %self.inner.id,
              err = %e,
              "memberlist: failed to join {}",
              node,
            );
            let (id, addr) = node.into_components();
            Err((Node::new(id, MaybeResolvedAddress::Resolved(addr)), e))
          } else {
            Ok(node)
          }
        }
      }).collect::<futures::stream::FuturesUnordered<_>>();

    let successes = std::cell::RefCell::new(SmallVec::with_capacity(estimated_total));
    let errors = futs
      .filter_map(|rst| async {
        match rst {
          Ok(node) => {
            successes.borrow_mut().push(node);
            None
          }
          Err((_, e)) => Some(e),
        }
      })
      .collect::<OneOrMore<_>>()
      .await;

    match Error::try_from_one_or_more(errors) {
      Ok(()) => Ok(successes.into_inner()),
      Err(e) => Err((successes.into_inner(), e)),
    }
  }

  /// Gives this instance's idea of how well it is meeting the soft
  /// real-time requirements of the protocol. Lower numbers are better, and zero
  /// means "totally healthy".
  #[inline]
  pub fn health_score(&self) -> usize {
    self.inner.awareness.get_health_score() as usize
  }

  /// Used to trigger re-advertising the local node. This is
  /// primarily used with a Delegate to support dynamic updates to the local
  /// meta data. This will block until the update message is successfully
  /// broadcast to a member of the cluster, if any exist, or until the specified
  /// timeout is reached.
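  ///
  /// A minimal sketch (not run as a doc-test; it assumes the configured
  /// delegate's `node_meta` now returns the updated metadata):
  ///
  /// ```rust,ignore
  /// // Re-broadcast the local alive message with the delegate's latest
  /// // metadata, waiting at most ten seconds for the broadcast to go out.
  /// m.update_node(Duration::from_secs(10)).await?;
  /// ```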
  pub async fn update_node(&self, timeout: Duration) -> Result<(), Error<T, D>> {
    if self.has_left() || self.has_shutdown() {
      return Err(Error::NotRunning);
    }

    // Get the node meta data
    let meta = if let Some(delegate) = &self.delegate {
      let meta = delegate.node_meta(META_MAX_SIZE).await;
      if meta.len() > META_MAX_SIZE {
        panic!("node meta data provided is longer than the limit");
      }
      meta
    } else {
      Meta::empty()
    };

    // Get the existing node.
    // The unwrap is safe here because the local node is always a member.
    let node = {
      let members = self.inner.nodes.read().await;

      let idx = *members.node_map.get(&self.inner.id).unwrap();

      let state = &members.nodes[idx].state;
      Node::new(state.id().cheap_clone(), state.address().cheap_clone())
    };

    // Format a new alive message
    let alive = Alive::new(self.next_incarnation(), node)
      .with_meta(meta)
      .with_protocol_version(self.inner.opts.protocol_version)
      .with_delegate_version(self.inner.opts.delegate_version);
    let (notify_tx, notify_rx) = async_channel::bounded(1);
    self.alive_node(alive, Some(notify_tx), true).await;

    // Wait for the broadcast or a timeout
    if self.any_alive().await {
      if timeout > Duration::ZERO {
        let _ = <T::Runtime as RuntimeLite>::timeout(timeout, notify_rx.recv())
          .await
          .map_err(|_| Error::UpdateTimeout)?;
      } else {
        let _ = notify_rx.recv().await;
      }
    }

    Ok(())
  }

  /// Uses the unreliable packet-oriented interface of the transport
  /// to target a user message at the given node (this does not use the gossip
  /// mechanism). The maximum size of the message depends on the configured
  /// `packet_buffer_size` for this memberlist instance.
  ///
  /// See also [`send_reliable`](Memberlist::send_reliable).
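  ///
  /// A minimal sketch (not run as a doc-test; `peer` is assumed to be a
  /// `Node<T::Id, T::ResolvedAddress>`, e.g. the value returned by
  /// [`join`](Memberlist::join)):
  ///
  /// ```rust,ignore
  /// // Best-effort, unreliable delivery of a small user payload.
  /// m.send(peer.address(), Bytes::from_static(b"hello")).await?;
  /// ```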
  #[inline]
  pub async fn send(&self, to: &T::ResolvedAddress, msg: Bytes) -> Result<(), Error<T, D>> {
    self.send_many(to, std::iter::once(msg)).await
  }

  /// Uses the unreliable packet-oriented interface of the transport
  /// to target a user message at the given node (this does not use the gossip
  /// mechanism). The maximum size of the message depends on the configured
  /// `packet_buffer_size` for this memberlist instance.
  #[inline]
  pub async fn send_many(
    &self,
    to: &T::ResolvedAddress,
    msgs: impl Iterator<Item = Bytes>,
  ) -> Result<(), Error<T, D>> {
    if self.has_left() || self.has_shutdown() {
      return Err(Error::NotRunning);
    }

    let stream = self
      .transport_send_packets(to, msgs.map(Message::UserData).collect::<OneOrMore<_>>())
      .await;
    futures::pin_mut!(stream);
    match stream.next().await {
      None => Ok(()),
      Some(Ok(_)) => Ok(()),
      Some(Err(e)) => Err(e),
    }
  }

  /// Uses the reliable stream-oriented interface of the transport to
  /// target a user message at the given node (this does not use the gossip
  /// mechanism). Delivery is guaranteed if no error is returned, and there is no
  /// limit on the size of the message.
  ///
  /// See also [`send_many_reliable`](Memberlist::send_many_reliable).
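  ///
  /// A minimal sketch (not run as a doc-test; `peer` is assumed to be a
  /// `Node<T::Id, T::ResolvedAddress>` and `payload` a `Bytes` value of any size):
  ///
  /// ```rust,ignore
  /// // Reliable delivery over the transport's stream-oriented interface.
  /// m.send_reliable(peer.address(), payload).await?;
  /// ```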
  #[inline]
  pub async fn send_reliable(
    &self,
    to: &T::ResolvedAddress,
    msg: Bytes,
  ) -> Result<(), Error<T, D>> {
    self.send_many_reliable(to, std::iter::once(msg)).await
  }

  /// Uses the reliable stream-oriented interface of the transport to
  /// target a user message at the given node (this does not use the gossip
  /// mechanism). Delivery is guaranteed if no error is returned, and there is no
  /// limit on the size of the message.
  #[inline]
  pub async fn send_many_reliable(
    &self,
    to: &T::ResolvedAddress,
    msgs: impl Iterator<Item = Bytes>,
  ) -> Result<(), Error<T, D>> {
    if self.has_left() || self.has_shutdown() {
      return Err(Error::NotRunning);
    }
    self
      .send_user_msg(to, msgs.map(Message::UserData).collect())
      .await
  }

  /// Initiates a ping to the specified node and returns the measured round-trip time.
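  ///
  /// A minimal sketch (not run as a doc-test; `peer` is assumed to be a
  /// `Node<T::Id, T::ResolvedAddress>` for a live member):
  ///
  /// ```rust,ignore
  /// // Measure the round-trip time to a peer, bounded by `probe_timeout`.
  /// let rtt = m.ping(peer).await?;
  /// tracing::info!("round-trip time: {rtt:?}");
  /// ```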
  pub async fn ping(&self, node: Node<T::Id, T::ResolvedAddress>) -> Result<Duration, Error<T, D>> {
    // Prepare a ping message and set up an ack handler.
    let self_addr = self.get_advertise();
    let ping = Ping::new(
      self.next_sequence_number(),
      Node::new(self.inner.transport.local_id().clone(), self_addr.clone()),
      node.clone(),
    );

    let (ack_tx, ack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
    self.inner.ack_manager.set_probe_channels(
      ping.sequence_number(),
      ack_tx,
      None,
      <T::Runtime as RuntimeLite>::now(),
      self.inner.opts.probe_interval,
    );

    // Send a ping to the node.
    // Wait to send or timeout.
    match <T::Runtime as RuntimeLite>::timeout(self.inner.opts.probe_timeout, async {
      let stream = self.send_packets(node.address(), ping.into()).await;
      futures::pin_mut!(stream);
      let errs = stream.collect::<OneOrMore<_>>().await;
      let num_errs = errs.len();

      match num_errs {
        0 => Ok(()),
        _ => match errs.into_either() {
          either::Either::Left([e]) => Err(e),
          either::Either::Right(e) => Err(Error::Multiple(e.into_vec().into())),
        },
      }
    })
    .await
    {
      Ok(Ok(())) => {}
      Ok(Err(e)) => return Err(e),
      Err(_) => {
        // If we timed out, return Error.
        tracing::debug!(
          "memberlist: failed ping {} by packet (timeout reached)",
          node
        );
        return Err(Error::Lost(node));
      }
    }

    // Mark the sent time here, which should be after any pre-processing and
    // system calls to do the actual send. This probably under-reports a bit,
    // but it's the best we can do.
    let sent = <T::Runtime as RuntimeLite>::now();

    // Wait for response or timeout.
    futures::select! {
      v = ack_rx.recv().fuse() => {
        // If we got a complete ack, return the measured RTT.
        if let Ok(AckMessage { complete, .. }) = v {
          if complete {
            return Ok(sent.elapsed());
          }
        }
      }
      _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.probe_timeout).fuse() => {}
    }

    // If we timed out, return Error.
    tracing::debug!(
      "memberlist: failed ping {} by packet (timeout reached)",
      node
    );
    Err(Error::Lost(node))
  }

  /// Stop any background maintenance of network activity
  /// for this memberlist, causing it to appear "dead". A leave message
  /// will not be broadcast prior, so the cluster being left will have
  /// to detect this node's shutdown using probing. If you wish to exit the
  /// cluster more gracefully, call [`leave`](Memberlist::leave) prior to shutting down.
  ///
  /// This method is safe to call multiple times.
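  ///
  /// A minimal sketch (not run as a doc-test; `m` is assumed to be a running
  /// `Memberlist` instance that has already broadcast its leave, if desired):
  ///
  /// ```rust,ignore
  /// // Stop background probing, gossiping, and listeners. Safe to call again.
  /// m.shutdown().await?;
  /// ```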
  pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
    self.inner.shutdown().await.map_err(Error::Transport)
  }
}