serf_core/serf/api.rs

use std::sync::atomic::Ordering;

use futures::{FutureExt, StreamExt};
use memberlist_core::{
  bytes::{BufMut, Bytes, BytesMut},
  tracing,
  transport::{MaybeResolvedAddress, Node},
  types::{Meta, OneOrMore, SmallVec},
  CheapClone,
};
use smol_str::SmolStr;

use crate::{
  delegate::TransformDelegate,
  error::{Error, JoinError},
  event::EventProducer,
  types::{LeaveMessage, Member, MessageType, SerfMessage, Tags, UserEventMessage},
};

use super::*;

impl<T> Serf<T>
where
  T: Transport,
{
  /// Creates a new Serf instance with the given transport and options.
  pub async fn new(
    transport: T::Options,
    opts: Options,
  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
    Self::new_in(
      None,
      None,
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Creates a new Serf instance with the given transport, options, and event producer.
  pub async fn with_event_producer(
    transport: T::Options,
    opts: Options,
    ev: EventProducer<T, DefaultDelegate<T>>,
  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
    Self::new_in(
      Some(ev.tx),
      None,
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }
}

impl<T, D> Serf<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Creates a new Serf instance with the given transport, options, and delegate.
  pub async fn with_delegate(
    transport: T::Options,
    opts: Options,
    delegate: D,
  ) -> Result<Self, Error<T, D>> {
    Self::new_in(
      None,
      Some(delegate),
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Creates a new Serf instance with the given transport, options, event producer, and delegate.
  pub async fn with_event_producer_and_delegate(
    transport: T::Options,
    opts: Options,
    ev: EventProducer<T, D>,
    delegate: D,
  ) -> Result<Self, Error<T, D>> {
    Self::new_in(
      Some(ev.tx),
      Some(delegate),
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Returns the local node's ID
  #[inline]
  pub fn local_id(&self) -> &T::Id {
    self.inner.memberlist.local_id()
  }

  /// Returns the local node's ID and the advertised address
  #[inline]
  pub fn advertise_node(&self) -> Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
    self.inner.memberlist.advertise_node()
  }

  /// A predicate that determines whether or not encryption
  /// is enabled, which can be possible in one of two cases:
  ///   - Single encryption key passed at agent start (no persistence)
  ///   - Keyring file provided at agent start
  #[inline]
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  pub fn encryption_enabled(&self) -> bool {
    self.inner.memberlist.encryption_enabled()
  }

  /// Returns a receiver that can be used to wait for
  /// Serf to shut down.
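  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed. The channel is
  /// closed when the instance shuts down, so a closed-channel error is the
  /// expected wake-up signal here:
  ///
  /// ```ignore
  /// // Wait until this Serf instance has been shut down.
  /// let _ = serf.shutdown_rx().recv().await;
  /// ```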
  #[inline]
  pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
    self.inner.shutdown_rx.clone()
  }

  /// The current state of this Serf instance.
  #[inline]
  pub fn state(&self) -> SerfState {
    *self.inner.state.lock()
  }

  /// Returns a point-in-time snapshot of the members of this cluster.
  #[inline]
  pub async fn members(
    &self,
  ) -> OneOrMore<Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
    self
      .inner
      .members
      .read()
      .await
      .states
      .values()
      .map(|s| s.member.cheap_clone())
      .collect()
  }

  /// Used to provide operator debugging information
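  ///
  /// # Example
  ///
  /// A minimal sketch of reading a few of the counters through the generated
  /// `get_*` getters (an existing `serf` handle is assumed):
  ///
  /// ```ignore
  /// let stats = serf.stats().await;
  /// println!("members: {}", stats.get_members());
  /// println!("failed: {}", stats.get_failed());
  /// ```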
  #[inline]
  pub async fn stats(&self) -> Stats {
    let (num_members, num_failed, num_left, health_score) = {
      let members = self.inner.members.read().await;
      let num_members = members.states.len();
      let num_failed = members.failed_members.len();
      let num_left = members.left_members.len();
      let health_score = self.inner.memberlist.health_score();
      (num_members, num_failed, num_left, health_score)
    };

    #[cfg(not(feature = "encryption"))]
    let encrypted = false;
    #[cfg(feature = "encryption")]
    let encrypted = self.inner.memberlist.encryption_enabled();

    Stats {
      members: num_members,
      failed: num_failed,
      left: num_left,
      health_score,
      member_time: self.inner.clock.time().into(),
      event_time: self.inner.event_clock.time().into(),
      query_time: self.inner.query_clock.time().into(),
      intent_queue: self.inner.broadcasts.num_queued().await,
      event_queue: self.inner.event_broadcasts.num_queued().await,
      query_queue: self.inner.query_broadcasts.num_queued().await,
      encrypted,
      coordinate_resets: self
        .inner
        .coord_core
        .as_ref()
        .map(|coord| coord.client.stats().resets),
    }
  }

  /// Returns the number of nodes in the serf cluster, regardless of
  /// their health or status.
  #[inline]
  pub async fn num_members(&self) -> usize {
    self.inner.members.read().await.states.len()
  }

  /// Returns the key manager for the current serf instance
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  #[inline]
  pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
    &self.inner.key_manager
  }

  /// Returns the Member information for the local node
  #[inline]
  pub async fn local_member(
    &self,
  ) -> Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
    self
      .inner
      .members
      .read()
      .await
      .states
      .get(self.inner.memberlist.local_id())
      .unwrap()
      .member
      .cheap_clone()
  }

  /// Used to dynamically update the tags associated with
  /// the local node. This will propagate the change to the rest of
  /// the cluster. Blocks until the message is broadcast out.
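  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed and how the
  /// `Tags` value is built is left to the caller:
  ///
  /// ```ignore
  /// // Tags whose encoded length exceeds `Meta::MAX_SIZE` are rejected,
  /// // so keep the tag set small.
  /// serf.set_tags(tags).await?;
  /// ```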
  #[inline]
  pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
    // Check that the meta data length is okay
    let tags_encoded_len = <D as TransformDelegate>::tags_encoded_len(&tags);
    if tags_encoded_len > Meta::MAX_SIZE {
      return Err(Error::tags_too_large(tags_encoded_len));
    }
    // update the config
    self.inner.opts.tags.store(Arc::new(tags));

    // trigger a memberlist update
    self
      .inner
      .memberlist
      .update_node(self.inner.opts.broadcast_timeout)
      .await
      .map_err(From::from)
  }

  /// Used to broadcast a custom user event with a given
  /// name and payload. If the configured size limit is exceeded, an error will be returned.
  /// If coalesce is enabled, nodes are allowed to coalesce this event.
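  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed and the event
  /// name and payload are illustrative:
  ///
  /// ```ignore
  /// use memberlist_core::bytes::Bytes;
  ///
  /// // Broadcast a "deploy" event; `true` lets other nodes coalesce it.
  /// serf
  ///   .user_event("deploy", Bytes::from_static(b"v1.2.3"), true)
  ///   .await?;
  /// ```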
  #[inline]
  pub async fn user_event(
    &self,
    name: impl Into<SmolStr>,
    payload: impl Into<Bytes>,
    coalesce: bool,
  ) -> Result<(), Error<T, D>> {
    let name: SmolStr = name.into();
    let payload: Bytes = payload.into();
    let payload_size_before_encoding = name.len() + payload.len();

    // Check size before encoding to prevent needless encoding and return early if it's over the specified limit.
    if payload_size_before_encoding > self.inner.opts.max_user_event_size {
      return Err(Error::user_event_limit_too_large(
        self.inner.opts.max_user_event_size,
      ));
    }

    if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
      return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
    }

    // Create a message
    let msg = UserEventMessage {
      ltime: self.inner.event_clock.time(),
      name: name.clone(),
      payload,
      cc: coalesce,
    };

    // Start broadcasting the event
    let len = <D as TransformDelegate>::message_encoded_len(&msg);

    // Check the size after encoding to be sure again that
    // we're not attempting to send over the specified size limit.
    if len > self.inner.opts.max_user_event_size {
      return Err(Error::raw_user_event_too_large(len));
    }

    if len > USER_EVENT_SIZE_LIMIT {
      return Err(Error::raw_user_event_too_large(len));
    }

    let mut raw = BytesMut::with_capacity(len + 1); // + 1 for message type byte
    raw.put_u8(MessageType::UserEvent as u8);
    raw.resize(len + 1, 0);

    let actual_encoded_len = <D as TransformDelegate>::encode_message(&msg, &mut raw[1..])
      .map_err(Error::transform_delegate)?;
    debug_assert_eq!(
      actual_encoded_len, len,
      "expected encoded len {} does not match the actual encoded len {}",
      len, actual_encoded_len
    );

    self.inner.event_clock.increment();

    // Process update locally
    self.handle_user_event(msg).await;

    self
      .inner
      .event_broadcasts
      .queue_broadcast(SerfBroadcast {
        msg: raw.freeze(),
        notify_tx: None,
      })
      .await;
    Ok(())
  }

  /// Used to broadcast a new query. The query must be fairly small,
  /// and an error will be returned if the size limit is exceeded. This is only
  /// available with protocol version 4 and newer. Query parameters are optional,
  /// and if not provided, a sane set of defaults will be used.
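  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed, the query name
  /// and payload are illustrative, and passing `None` uses the default
  /// query parameters:
  ///
  /// ```ignore
  /// use memberlist_core::bytes::Bytes;
  ///
  /// // Fire a "ping" query with an empty payload and default parameters.
  /// // The returned `QueryResponse` is used to consume acks and answers
  /// // from other members as they arrive.
  /// let resp = serf.query("ping", Bytes::new(), None).await?;
  /// ```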
  pub async fn query(
    &self,
    name: impl Into<SmolStr>,
    payload: impl Into<Bytes>,
    params: Option<QueryParam<T::Id>>,
  ) -> Result<QueryResponse<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>, Error<T, D>>
  {
    self
      .query_in(name.into(), payload.into(), params, None)
      .await
  }

  /// Joins an existing Serf cluster. Returns the node
  /// successfully contacted. If `ignore_old` is true, then any
  /// user messages sent prior to the join will be ignored.
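  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed, and `node` is a
  /// `Node<T::Id, MaybeResolvedAddress<T>>` describing a known peer that is
  /// assumed to be built elsewhere:
  ///
  /// ```ignore
  /// // Contact one existing member; keep user events sent before the join.
  /// let joined = serf.join(node, false).await?;
  /// ```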
  pub async fn join(
    &self,
    node: Node<T::Id, MaybeResolvedAddress<T>>,
    ignore_old: bool,
  ) -> Result<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>, Error<T, D>> {
    // Do a quick state check
    let current_state = self.state();
    if current_state != SerfState::Alive {
      return Err(Error::bad_join_status(current_state));
    }

    // Hold the join lock; this is to make `event_join_ignore` safe
    let _join_lock = self.inner.join_lock.lock().await;

    // Ignore any events from a potential join. This is safe since we hold
    // the join lock and nobody else can be doing a join
    if ignore_old {
      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
    }

    // Have memberlist attempt to join
    match self.inner.memberlist.join(node).await {
      Ok(node) => {
        // Start broadcasting the update
        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
          if ignore_old {
            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          }
          return Err(e);
        }
        if ignore_old {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        }

        Ok(node)
      }
      Err(e) => {
        if ignore_old {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        }
        Err(Error::from(e))
      }
    }
  }

  /// Joins an existing Serf cluster. Returns the nodes
  /// successfully contacted. If `ignore_old` is true, then any
  /// user messages sent prior to the join will be ignored.
  pub async fn join_many(
    &self,
    existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T>>>,
    ignore_old: bool,
  ) -> Result<
    SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
    JoinError<T, D>,
  > {
    // Do a quick state check
    let current_state = self.state();
    if current_state != SerfState::Alive {
      return Err(JoinError {
        joined: SmallVec::new(),
        errors: existing
          .into_iter()
          .map(|node| (node, Error::bad_join_status(current_state)))
          .collect(),
        broadcast_error: None,
      });
    }

    // Hold the join lock; this is to make `event_join_ignore` safe
    let _join_lock = self.inner.join_lock.lock().await;

    // Ignore any events from a potential join. This is safe since we hold
    // the join lock and nobody else can be doing a join
    if ignore_old {
      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
    }

    // Have memberlist attempt to join
    match self.inner.memberlist.join_many(existing).await {
      Ok(joined) => {
        // Start broadcasting the update
        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          return Err(JoinError {
            joined,
            errors: Default::default(),
            broadcast_error: Some(e),
          });
        }
        self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        Ok(joined)
      }
      Err(e) => {
        let (joined, errors) = e.into();
        // If we joined any nodes, broadcast the join message
        if !joined.is_empty() {
          // Start broadcasting the update
          if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
            return Err(JoinError {
              joined,
              errors: errors
                .into_iter()
                .map(|(addr, err)| (addr, err.into()))
                .collect(),
              broadcast_error: Some(e),
            });
          }

          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          Err(JoinError {
            joined,
            errors: errors
              .into_iter()
              .map(|(addr, err)| (addr, err.into()))
              .collect(),
            broadcast_error: None,
          })
        } else {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          Err(JoinError {
            joined,
            errors: errors
              .into_iter()
              .map(|(addr, err)| (addr, err.into()))
              .collect(),
            broadcast_error: None,
          })
        }
      }
    }
  }

  /// Gracefully exits the cluster. It is safe to call this multiple
  /// times.
  /// If the leave broadcast times out, `leave` will try to finish the sequence as a best effort.
  pub async fn leave(&self) -> Result<(), Error<T, D>> {
    // Check the current state
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Left => return Ok(()),
        SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
        SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
        _ => {
          // Set the state to leaving
          *s = SerfState::Leaving;
        }
      }
    }

    // If we have a snapshot, mark we are leaving
    if let Some(ref snap) = self.inner.snapshot {
      snap.leave().await;
    }

    // Construct the message for the graceful leave
    let msg = LeaveMessage {
      ltime: self.inner.clock.time(),
      id: self.inner.memberlist.local_id().cheap_clone(),
      prune: false,
    };

    self.inner.clock.increment();

    // Process the leave locally
    self.handle_node_leave_intent(&msg).await;

    let msg = SerfMessage::Leave(msg);

    // Only broadcast the leave message if there is at least one
    // other node alive.
    if self.has_alive_members().await {
      let (notify_tx, notify_rx) = async_channel::bounded(1);
      self.broadcast(msg, Some(notify_tx)).await?;

      futures::select! {
        _ = notify_rx.recv().fuse() => {
          // We got a response, so we are done
        }
        _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
          tracing::warn!("serf: timeout while waiting for graceful leave");
        }
      }
    }

    // Attempt the memberlist leave
    if let Err(e) = self
      .inner
      .memberlist
      .leave(self.inner.opts.broadcast_timeout)
      .await
    {
      tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
    }

    // Wait for the leave to propagate through the cluster. The broadcast
    // timeout is how long we wait for the message to go out from our own
    // queue, but this wait is for that message to propagate through the
    // cluster. In particular, we want to stay up long enough to service
    // any probes from other nodes before they learn about us leaving.
    <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;

    // Transition to Left only if we are not already shut down
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Shutdown => {}
        _ => {
          *s = SerfState::Left;
        }
      }
    }
    Ok(())
  }

  /// Forcibly removes a failed node from the cluster
  /// immediately, instead of waiting for the reaper to eventually reclaim it.
  /// This also has the effect that Serf will no longer attempt to reconnect
  /// to this node.
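  ///
  /// # Example
  ///
  /// A minimal sketch; an existing `serf` handle is assumed, and `failed_id`
  /// is the `T::Id` of the unreachable member, known to the caller:
  ///
  /// ```ignore
  /// // Reap the failed member now instead of waiting for the reaper.
  /// serf.remove_failed_node(failed_id).await?;
  /// ```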
  pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
    self.force_leave(id, false).await
  }

  /// Forcibly removes a failed node from the cluster
  /// immediately, instead of waiting for the reaper to eventually reclaim it.
  /// This also has the effect that Serf will no longer attempt to reconnect
  /// to this node. In addition, the node is pruned from the member list entirely.
  pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
    self.force_leave(id, true).await
  }

  /// Forcefully shuts down the Serf instance, stopping all network
  /// activity and background maintenance associated with the instance.
  ///
  /// This is not a graceful shutdown, and should be preceded by a call
  /// to [`leave`](Serf::leave). Otherwise, other nodes in the cluster will detect this node's
  /// exit as a node failure.
  ///
  /// It is safe to call this method multiple times.
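  ///
  /// # Example
  ///
  /// A minimal sketch of a graceful exit (an existing `serf` handle is
  /// assumed and error handling is elided):
  ///
  /// ```ignore
  /// // Announce the departure first, then tear the instance down.
  /// serf.leave().await?;
  /// serf.shutdown().await?;
  /// ```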
  pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Shutdown => return Ok(()),
        SerfState::Left => {}
        _ => {
          tracing::warn!("serf: shutdown without a leave");
        }
      }

      // Wait to close the shutdown channel until after we've shut down the
      // memberlist and its associated network resources, since the shutdown
      // channel signals that we are cleaned up outside of Serf.
      *s = SerfState::Shutdown;
    }
    self.inner.memberlist.shutdown().await?;
    self.inner.shutdown_tx.close();

    // Wait for the snapshotter to finish if we have one
    if let Some(ref snap) = self.inner.snapshot {
      snap.wait().await;
    }

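    // Wait for all background tasks spawned by this instance to finish;
    // retry until the handle list can be borrowed exclusively.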
    loop {
      if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
        let mut futs = core::mem::take(&mut *handles);
        while futs.next().await.is_some() {}
        break;
      }
    }

    Ok(())
  }

  /// Returns the network coordinate of the local node.
  pub fn coordinate(&self) -> Result<Coordinate, Error<T, D>> {
    if let Some(ref coord) = self.inner.coord_core {
      return Ok(coord.client.get_coordinate());
    }

    Err(Error::coordinates_disabled())
  }

  /// Returns the network coordinate for the node with the given
  /// id. This will only be valid if `disable_coordinates` is set to `false`.
  pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
    if let Some(ref coord) = self.inner.coord_core {
      return Ok(coord.cache.read().get(id).cloned());
    }

    Err(Error::coordinates_disabled())
  }

  /// Returns the underlying [`Memberlist`] instance
  #[inline]
  pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
    &self.inner.memberlist
  }
}

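/// A snapshot of operator debugging information, as returned by [`Serf::stats`].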
#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
#[cfg_attr(feature = "async-graphql", derive(async_graphql::SimpleObject))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Stats {
  members: usize,
  failed: usize,
  left: usize,
  health_score: usize,
  member_time: u64,
  event_time: u64,
  query_time: u64,
  intent_queue: usize,
  event_queue: usize,
  query_queue: usize,
  encrypted: bool,
  #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
  coordinate_resets: Option<usize>,
}