Skip to main content

memberlist_core/
api.rs

1use std::{
2  sync::{Arc, atomic::Ordering},
3  time::Duration,
4};
5
6use agnostic_lite::{RuntimeLite, time::Instant};
7use bytes::Bytes;
8use futures::{FutureExt, StreamExt};
9use smallvec_wrapper::OneOrMore;
10
11use super::{
12  Options,
13  base::Memberlist,
14  delegate::{Delegate, VoidDelegate},
15  error::Error,
16  network::META_MAX_SIZE,
17  proto::{Alive, Dead, MaybeResolvedAddress, Message, Meta, NodeState, Ping, SmallVec},
18  state::AckMessage,
19  transport::{AddressResolver, CheapClone, Node, Transport},
20};
21
22impl<T, D> Memberlist<T, D>
23where
24  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
25  T: Transport,
26{
27  /// Returns the local node ID.
28  #[inline]
29  pub fn local_id(&self) -> &T::Id {
30    &self.inner.id
31  }
32
33  /// Returns the local node address
34  #[inline]
35  pub fn local_address(&self) -> &<T::Resolver as AddressResolver>::Address {
36    self.inner.transport.local_address()
37  }
38
39  /// Returns a [`Node`] with the local id and the advertise address of local node.
40  #[inline]
41  pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
42    Node::new(self.inner.id.clone(), self.inner.advertise.clone())
43  }
44
45  /// Returns the advertise address of local node.
46  #[inline]
47  pub fn advertise_address(&self) -> &T::ResolvedAddress {
48    &self.inner.advertise
49  }
50
51  /// Returns the keyring (only used for encryption) of the node
52  #[cfg(feature = "encryption")]
53  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
54  #[inline]
55  pub fn keyring(&self) -> Option<&super::keyring::Keyring> {
56    self.inner.keyring.as_ref()
57  }
58
59  /// Returns `true` if the node enables encryption.
60  #[cfg(feature = "encryption")]
61  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
62  #[inline]
63  pub fn encryption_enabled(&self) -> bool {
64    self.inner.keyring.is_some()
65      && self.inner.opts.encryption_algo.is_some()
66      && self.inner.opts.gossip_verify_outgoing
67  }
68
69  /// Returns the delegate, if any.
70  #[inline]
71  pub fn delegate(&self) -> Option<&D> {
72    self.delegate.as_deref()
73  }
74
75  /// Returns the local node instance state.
76  #[inline]
77  pub async fn local_state(&self) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
78    let nodes = self.inner.nodes.read().await;
79    nodes
80      .node_map
81      .get(&self.inner.id)
82      .map(|&idx| nodes.nodes[idx].state.server.clone())
83  }
84
85  /// Returns the node state of the given id. (if any).
86  pub async fn by_id(&self, id: &T::Id) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
87    let members = self.inner.nodes.read().await;
88
89    members
90      .node_map
91      .get(id)
92      .map(|&idx| members.nodes[idx].state.server.clone())
93  }
94
95  /// Returns a list of all known nodes.
96  #[inline]
97  pub async fn members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
98    self
99      .inner
100      .nodes
101      .read()
102      .await
103      .nodes
104      .iter()
105      .map(|n| n.state.server.clone())
106      .collect()
107  }
108
109  /// Returns number of members
110  #[inline]
111  pub async fn num_members(&self) -> usize {
112    self.inner.nodes.read().await.nodes.len()
113  }
114
115  /// Returns a list of all known nodes that are online.
116  pub async fn online_members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
117    self
118      .inner
119      .nodes
120      .read()
121      .await
122      .nodes
123      .iter()
124      .filter(|n| !n.dead_or_left())
125      .map(|n| n.state.server.clone())
126      .collect()
127  }
128
129  /// Returns the number of online members.
130  pub async fn num_online_members(&self) -> usize {
131    self
132      .inner
133      .nodes
134      .read()
135      .await
136      .nodes
137      .iter()
138      .filter(|n| !n.dead_or_left())
139      .count()
140  }
141
142  /// Returns a list of all known nodes that match the given predicate.
143  pub async fn members_by(
144    &self,
145    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
146  ) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
147    self
148      .inner
149      .nodes
150      .read()
151      .await
152      .nodes
153      .iter()
154      .filter(|n| f(&n.state))
155      .map(|n| n.state.server.clone())
156      .collect()
157  }
158
159  /// Returns the number of members match the given predicate.
160  pub async fn num_members_by(
161    &self,
162    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
163  ) -> usize {
164    self
165      .inner
166      .nodes
167      .read()
168      .await
169      .nodes
170      .iter()
171      .filter(|n| f(&n.state))
172      .count()
173  }
174
175  /// Returns a list of map result on all known members that match the given predicate.
176  pub async fn members_map_by<O>(
177    &self,
178    mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> Option<O>,
179  ) -> SmallVec<O> {
180    self
181      .inner
182      .nodes
183      .read()
184      .await
185      .nodes
186      .iter()
187      .filter_map(|n| f(&n.state))
188      .collect()
189  }
190}
191
192impl<T> Memberlist<T>
193where
194  T: Transport,
195{
196  /// Create a new memberlist with the given transport and options.
197  #[inline]
198  pub async fn new(
199    transport_options: T::Options,
200    opts: Options,
201  ) -> Result<Self, Error<T, VoidDelegate<T::Id, T::ResolvedAddress>>> {
202    Self::create(None, transport_options, opts).await
203  }
204}
205
206impl<T, D> Memberlist<T, D>
207where
208  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
209  T: Transport,
210{
211  /// Create a new memberlist with the given transport, delegate and options.
212  #[inline]
213  pub async fn with_delegate(
214    delegate: D,
215    transport_options: T::Options,
216    opts: Options,
217  ) -> Result<Self, Error<T, D>> {
218    Self::create(Some(delegate), transport_options, opts).await
219  }
220
221  pub(crate) async fn create(
222    delegate: Option<D>,
223    transport_options: T::Options,
224    opts: Options,
225  ) -> Result<Self, Error<T, D>> {
226    let transport = T::new(transport_options).await.map_err(Error::Transport)?;
227    let (shutdown_rx, advertise, this) = Self::new_in(transport, delegate, opts).await?;
228    let meta = if let Some(d) = &this.delegate {
229      d.node_meta(META_MAX_SIZE).await
230    } else {
231      Meta::empty()
232    };
233
234    if meta.len() > META_MAX_SIZE {
235      panic!("NodeState meta data provided is longer than the limit");
236    }
237
238    let alive = Alive::new(
239      this.next_incarnation(),
240      Node::new(this.inner.id.clone(), this.inner.advertise.clone()),
241    )
242    .with_meta(meta)
243    .with_protocol_version(this.inner.opts.protocol_version)
244    .with_delegate_version(this.inner.opts.delegate_version);
245    this.alive_node(alive, None, true).await;
246    this.schedule(shutdown_rx).await;
247    tracing::debug!(local = %this.inner.id, advertise_addr = %advertise, "memberlist: node is living");
248    Ok(this)
249  }
250
251  /// Leave will broadcast a leave message but will not shutdown the background
252  /// listeners, meaning the node will continue participating in gossip and state
253  /// updates.
254  ///
255  /// This will block until the leave message is successfully broadcasted to
256  /// a member of the cluster, if any exist or until a specified timeout
257  /// is reached.
258  ///
259  /// This method is safe to call multiple times, but must not be called
260  /// after the cluster is already shut down.
261  ///
262  /// Returns `true` if the node has successfully left the cluster by this call.
263  pub async fn leave(&self, timeout: Duration) -> Result<bool, Error<T, D>> {
264    if self.has_shutdown() {
265      return Ok(false);
266    }
267
268    if self.has_left() {
269      return Ok(false);
270    }
271
272    if !self.has_left() {
273      self.inner.hot.leave.store(true, Ordering::Release);
274
275      let mut memberlist = self.inner.nodes.write().await;
276      if let Some(&idx) = memberlist.node_map.get(&self.inner.id) {
277        // This dead message is special, because NodeState and From are the
278        // same. This helps other nodes figure out that a node left
279        // intentionally. When NodeState equals From, other nodes know for
280        // sure this node is gone.
281
282        let state = &memberlist.nodes[idx];
283        let d = Dead::new(
284          state.state.incarnation.load(Ordering::Acquire),
285          state.id().cheap_clone(),
286          state.id().cheap_clone(),
287        );
288
289        self.dead_node(&mut memberlist, d).await?;
290        let any_alive = memberlist.any_alive();
291        drop(memberlist);
292
293        // Block until the broadcast goes out
294        if any_alive {
295          if timeout > Duration::ZERO {
296            futures::select! {
297              _ = self.inner.leave_broadcast_rx.recv().fuse() => {},
298              _ = <T::Runtime as RuntimeLite>::sleep(timeout).fuse() => {
299                return Err(Error::LeaveTimeout);
300              }
301            }
302          } else if let Err(e) = self.inner.leave_broadcast_rx.recv().await {
303            tracing::error!("memberlist: failed to receive leave broadcast: {}", e);
304          }
305        }
306      } else {
307        tracing::warn!("memberlist: leave but we're not a member");
308      }
309    }
310
311    Ok(true)
312  }
313
314  /// Join directly by contacting the given node address,
315  /// Returns the resolved address if successfully joined, or an error if the address could not be reached.
316  pub async fn join(
317    &self,
318    addr: MaybeResolvedAddress<T::Address, T::ResolvedAddress>,
319  ) -> Result<T::ResolvedAddress, Error<T, D>> {
320    if self.has_left() || self.has_shutdown() {
321      return Err(Error::NotRunning);
322    }
323
324    let addr = match addr {
325      MaybeResolvedAddress::Resolved(addr) => addr,
326      MaybeResolvedAddress::Unresolved(addr) => self
327        .inner
328        .transport
329        .resolve(&addr)
330        .await
331        .map_err(Error::Transport)?,
332    };
333
334    self.push_pull_node(&addr, true).await.map(|_| addr)
335  }
336
337  /// Used to take an existing Memberlist and attempt to join a cluster
338  /// by contacting all the given hosts and performing a state sync. Initially,
339  /// the Memberlist only contains our own state, so doing this will cause
340  /// remote nodes to become aware of the existence of this node, effectively
341  /// joining the cluster.
342  ///
343  /// On success, returns a list of all addresses that were successfully joined with resolved addresses.
344  /// On error, returns a list of addresses that were successfully joined with resolved addresses and the error.
345  pub async fn join_many(
346    &self,
347    existing: impl Iterator<Item = MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
348  ) -> Result<SmallVec<T::ResolvedAddress>, (SmallVec<T::ResolvedAddress>, Error<T, D>)> {
349    if self.has_left() || self.has_shutdown() {
350      return Err((Default::default(), Error::NotRunning));
351    }
352
353    let estimated_total = existing.size_hint().0;
354
355    let futs = existing
356      .into_iter()
357      .map(|addr| {
358        async move {
359          let resolved_addr = match addr {
360            MaybeResolvedAddress::Resolved(addr) => addr,
361            MaybeResolvedAddress::Unresolved(addr) => {
362              match self.inner.transport.resolve(&addr).await {
363                Ok(addr) => addr,
364                Err(e) => {
365                  tracing::debug!(
366                    err = %e,
367                    "memberlist: failed to resolve address {}",
368                    addr,
369                  );
370                  return Err((MaybeResolvedAddress::<T::Address, T::ResolvedAddress>::unresolved(addr), Error::<T, D>::transport(e)))
371                }
372              }
373            }
374          };
375          tracing::info!(local = %self.inner.transport.local_id(), peer = %resolved_addr, "memberlist: start join...");
376          if let Err(e) = self.push_pull_node(&resolved_addr, true).await {
377            tracing::debug!(
378              local = %self.inner.id,
379              err = %e,
380              "memberlist: failed to join {}",
381              resolved_addr,
382            );
383            Err((MaybeResolvedAddress::Resolved(resolved_addr), e))
384          } else {
385            Ok(resolved_addr)
386          }
387        }
388      }).collect::<futures::stream::FuturesUnordered<_>>();
389
390    let successes = std::cell::RefCell::new(SmallVec::with_capacity(estimated_total));
391    let errors = futs
392      .filter_map(|rst| async {
393        match rst {
394          Ok(addr) => {
395            successes.borrow_mut().push(addr);
396            None
397          }
398          Err((_, e)) => Some(e),
399        }
400      })
401      .collect::<OneOrMore<_>>()
402      .await;
403
404    match Error::try_from_one_or_more(errors) {
405      Ok(()) => Ok(successes.into_inner()),
406      Err(e) => Err((successes.into_inner(), e)),
407    }
408  }
409
410  /// Gives this instance's idea of how well it is meeting the soft
411  /// real-time requirements of the protocol. Lower numbers are better, and zero
412  /// means "totally healthy".
413  #[inline]
414  pub fn health_score(&self) -> usize {
415    self.inner.awareness.get_health_score() as usize
416  }
417
418  /// Used to trigger re-advertising the local node. This is
419  /// primarily used with a Delegate to support dynamic updates to the local
420  /// meta data.  This will block until the update message is successfully
421  /// broadcasted to a member of the cluster, if any exist or until a specified
422  /// timeout is reached.
423  pub async fn update_node(&self, timeout: Duration) -> Result<(), Error<T, D>> {
424    if self.has_left() || self.has_shutdown() {
425      return Err(Error::NotRunning);
426    }
427
428    // Get the node meta data
429    let meta = if let Some(delegate) = &self.delegate {
430      let meta = delegate.node_meta(META_MAX_SIZE).await;
431      if meta.len() > META_MAX_SIZE {
432        panic!("node meta data provided is longer than the limit");
433      }
434      meta
435    } else {
436      Meta::empty()
437    };
438
439    // Get the existing node
440    // unwrap safe here this is self
441    let node = {
442      let members = self.inner.nodes.read().await;
443
444      let idx = *members.node_map.get(&self.inner.id).unwrap();
445
446      let state = &members.nodes[idx].state;
447      Node::new(state.id().cheap_clone(), state.address().cheap_clone())
448    };
449
450    // Format a new alive message
451    let alive = Alive::new(self.next_incarnation(), node)
452      .with_meta(meta)
453      .with_protocol_version(self.inner.opts.protocol_version)
454      .with_delegate_version(self.inner.opts.delegate_version);
455    let (notify_tx, notify_rx) = async_channel::bounded(1);
456    self.alive_node(alive, Some(notify_tx), true).await;
457
458    // Wait for the broadcast or a timeout
459    if self.any_alive().await {
460      if timeout > Duration::ZERO {
461        let _ = <T::Runtime as RuntimeLite>::timeout(timeout, notify_rx.recv())
462          .await
463          .map_err(|_| Error::UpdateTimeout)?;
464      } else {
465        let _ = notify_rx.recv().await;
466      }
467    }
468
469    Ok(())
470  }
471
472  /// Uses the unreliable packet-oriented interface of the transport
473  /// to target a user message at the given node (this does not use the gossip
474  /// mechanism). The maximum size of the message depends on the configured
475  /// `packet_buffer_size` for this memberlist instance.
476  ///
477  /// See also [`send_reliable`](Memberlist::send_reliable).
478  #[inline]
479  pub async fn send(&self, to: &T::ResolvedAddress, msg: Bytes) -> Result<(), Error<T, D>> {
480    self.send_many(to, std::iter::once(msg)).await
481  }
482
483  /// Uses the unreliable packet-oriented interface of the transport
484  /// to target a user message at the given node (this does not use the gossip
485  /// mechanism). The maximum size of the message depends on the configured
486  /// `packet_buffer_size` for this memberlist instance.
487  #[inline]
488  pub async fn send_many(
489    &self,
490    to: &T::ResolvedAddress,
491    msgs: impl Iterator<Item = Bytes>,
492  ) -> Result<(), Error<T, D>> {
493    if self.has_left() || self.has_shutdown() {
494      return Err(Error::NotRunning);
495    }
496
497    let stream = self
498      .transport_send_packets(to, msgs.map(Message::UserData).collect::<OneOrMore<_>>())
499      .await;
500    futures::pin_mut!(stream);
501    match stream.next().await {
502      None => Ok(()),
503      Some(Ok(_)) => Ok(()),
504      Some(Err(e)) => Err(e),
505    }
506  }
507
508  /// Uses the reliable stream-oriented interface of the transport to
509  /// target a user message at the given node (this does not use the gossip
510  /// mechanism). Delivery is guaranteed if no error is returned, and there is no
511  /// limit on the size of the message.
512  ///
513  /// See also [`send_many_reliable`](Memberlist::send_many_reliable).
514  #[inline]
515  pub async fn send_reliable(
516    &self,
517    to: &T::ResolvedAddress,
518    msg: Bytes,
519  ) -> Result<(), Error<T, D>> {
520    self.send_many_reliable(to, std::iter::once(msg)).await
521  }
522
523  /// Uses the reliable stream-oriented interface of the transport to
524  /// target a user message at the given node (this does not use the gossip
525  /// mechanism). Delivery is guaranteed if no error is returned, and there is no
526  /// limit on the size of the message.
527  #[inline]
528  pub async fn send_many_reliable(
529    &self,
530    to: &T::ResolvedAddress,
531    msgs: impl Iterator<Item = Bytes>,
532  ) -> Result<(), Error<T, D>> {
533    if self.has_left() || self.has_shutdown() {
534      return Err(Error::NotRunning);
535    }
536    self
537      .send_user_msg(to, msgs.map(Message::UserData).collect())
538      .await
539  }
540
541  /// Initiates a ping to the node with the specified node.
542  pub async fn ping(&self, node: Node<T::Id, T::ResolvedAddress>) -> Result<Duration, Error<T, D>> {
543    // Prepare a ping message and setup an ack handler.
544    let self_addr = self.get_advertise();
545    let ping = Ping::new(
546      self.next_sequence_number(),
547      Node::new(self.inner.transport.local_id().clone(), self_addr.clone()),
548      node.clone(),
549    );
550
551    let (ack_tx, ack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
552    self.inner.ack_manager.set_probe_channels(
553      ping.sequence_number(),
554      ack_tx,
555      None,
556      <T::Runtime as RuntimeLite>::now(),
557      self.inner.opts.probe_interval,
558    );
559
560    // Send a ping to the node.
561    // Wait to send or timeout.
562    match <T::Runtime as RuntimeLite>::timeout(self.inner.opts.probe_timeout, async {
563      let stream = self.send_packets(node.address(), ping.into()).await;
564      futures::pin_mut!(stream);
565      let errs = stream.collect::<OneOrMore<_>>().await;
566      let num_errs = errs.len();
567
568      match num_errs {
569        0 => Ok(()),
570        _ => match errs.into_either() {
571          either::Either::Left([e]) => Err(e),
572          either::Either::Right(e) => Err(Error::Multiple(e.into_vec().into())),
573        },
574      }
575    })
576    .await
577    {
578      Ok(Ok(())) => {}
579      Ok(Err(e)) => return Err(e),
580      Err(_) => {
581        // If we timed out, return Error.
582        tracing::debug!(
583          "memberlist: failed ping {} by packet (timeout reached)",
584          node
585        );
586        return Err(Error::Lost(node));
587      }
588    }
589
590    // Mark the sent time here, which should be after any pre-processing and
591    // system calls to do the actual send. This probably under-reports a bit,
592    // but it's the best we can do.
593    let sent = <T::Runtime as RuntimeLite>::now();
594
595    // Wait for response or timeout.
596    futures::select! {
597      v = ack_rx.recv().fuse() => {
598        // If we got a response, update the RTT.
599        if let Ok(AckMessage { complete, .. }) = v {
600          if complete {
601            return Ok(sent.elapsed());
602          }
603        }
604      }
605      _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.probe_timeout).fuse() => {}
606    }
607
608    // If we timed out, return Error.
609    tracing::debug!(
610      "memberlist: failed ping {} by packet (timeout reached)",
611      node
612    );
613    Err(Error::Lost(node))
614  }
615
616  /// Stop any background maintenance of network activity
617  /// for this memberlist, causing it to appear "dead". A leave message
618  /// will not be broadcasted prior, so the cluster being left will have
619  /// to detect this node's shutdown using probing. If you wish to more
620  /// gracefully exit the cluster, call Leave prior to shutting down.
621  ///
622  /// This method is safe to call multiple times.
623  pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
624    self.inner.shutdown().await.map_err(Error::Transport)
625  }
626}