serf_core/serf/api.rs

use std::sync::atomic::Ordering;

use futures::{FutureExt, StreamExt};
use memberlist_core::{
  CheapClone,
  bytes::Bytes,
  proto::{Data, MaybeResolvedAddress, Meta, Node, OneOrMore, SmallVec},
  tracing,
};
use smol_str::SmolStr;

use crate::{
  error::Error,
  event::EventProducer,
  types::{LeaveMessage, Member, Tags, UserEventMessage},
};

use super::*;

impl<T> Serf<T>
where
  T: Transport,
{
  /// Creates a new Serf instance with the given transport and options.
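  ///
  /// # Example
  ///
  /// A minimal bootstrap sketch. The transport type, its options, and the
  /// `Options::new()` constructor shown here are assumptions for illustration;
  /// substitute the transport implementation and option builder you actually use:
  ///
  /// ```rust,ignore
  /// // Hypothetical transport type and options.
  /// let serf = Serf::<NetTransport>::new(transport_opts, Options::new()).await?;
  /// println!("cluster size: {}", serf.num_members().await);
  /// ```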
  pub async fn new(
    transport: T::Options,
    opts: Options,
  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
    Self::new_in(
      None,
      None,
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Creates a new Serf instance with the given transport, options, and event producer.
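  ///
  /// # Example
  ///
  /// A sketch of wiring up an event stream. The `EventProducer::bounded`
  /// constructor and the shape of the subscriber half are assumptions here;
  /// check the `event` module for the exact API on your version:
  ///
  /// ```rust,ignore
  /// // Hypothetical: a bounded producer/subscriber pair.
  /// let (producer, subscriber) = EventProducer::bounded(1024);
  /// let serf = Serf::<NetTransport>::with_event_producer(transport_opts, Options::new(), producer).await?;
  ///
  /// // Drain membership, user, and query events elsewhere.
  /// while let Ok(event) = subscriber.recv().await {
  ///   println!("serf event: {event:?}");
  /// }
  /// ```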
  pub async fn with_event_producer(
    transport: T::Options,
    opts: Options,
    ev: EventProducer<T, DefaultDelegate<T>>,
  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
    Self::new_in(
      Some(ev.tx),
      None,
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }
}

impl<T, D> Serf<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Creates a new Serf instance with the given transport, options, and delegate.
  pub async fn with_delegate(
    transport: T::Options,
    opts: Options,
    delegate: D,
  ) -> Result<Self, Error<T, D>> {
    Self::new_in(
      None,
      Some(delegate),
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Creates a new Serf instance with the given transport, options, event producer, and delegate.
  pub async fn with_event_producer_and_delegate(
    transport: T::Options,
    opts: Options,
    ev: EventProducer<T, D>,
    delegate: D,
  ) -> Result<Self, Error<T, D>> {
    Self::new_in(
      Some(ev.tx),
      Some(delegate),
      transport,
      opts,
      #[cfg(any(test, feature = "test"))]
      None,
    )
    .await
  }

  /// Returns the local node's ID
  #[inline]
  pub fn local_id(&self) -> &T::Id {
    self.inner.memberlist.local_id()
  }

  /// Returns the local node's ID and the advertised address
  #[inline]
  pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
    self.inner.memberlist.advertise_node()
  }

  /// A predicate that determines whether or not encryption
  /// is enabled, which can be possible in one of 2 cases:
  ///   - Single encryption key passed at agent start (no persistence)
  ///   - Keyring file provided at agent start
  #[inline]
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  pub fn encryption_enabled(&self) -> bool {
    self.inner.memberlist.encryption_enabled()
  }

  /// Returns a receiver that can be used to wait for
  /// Serf to shutdown.
  #[inline]
  pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
    self.inner.shutdown_rx.clone()
  }

  /// The current state of this Serf instance.
  #[inline]
  pub fn state(&self) -> SerfState {
    *self.inner.state.lock()
  }

  /// Returns a point-in-time snapshot of the members of this cluster.
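  ///
  /// # Example
  ///
  /// A small sketch listing the snapshot; the accessor names on [`Member`]
  /// are assumptions and may differ in your version:
  ///
  /// ```rust,ignore
  /// for member in serf.members().await {
  ///   println!("{:?} => {:?}", member.node(), member.status());
  /// }
  /// ```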
  #[inline]
  pub async fn members(&self) -> OneOrMore<Member<T::Id, T::ResolvedAddress>> {
    self
      .inner
      .members
      .read()
      .await
      .states
      .values()
      .map(|s| s.member.cheap_clone())
      .collect()
  }

  /// Used to provide operator debugging information
  #[inline]
  pub async fn stats(&self) -> Stats {
    let (num_members, num_failed, num_left, health_score) = {
      let members = self.inner.members.read().await;
      let num_members = members.states.len();
      let num_failed = members.failed_members.len();
      let num_left = members.left_members.len();
      let health_score = self.inner.memberlist.health_score();
      (num_members, num_failed, num_left, health_score)
    };

    #[cfg(not(feature = "encryption"))]
    let encrypted = false;
    #[cfg(feature = "encryption")]
    let encrypted = self.inner.memberlist.encryption_enabled();

    Stats {
      members: num_members,
      failed: num_failed,
      left: num_left,
      health_score,
      member_time: self.inner.clock.time().into(),
      event_time: self.inner.event_clock.time().into(),
      query_time: self.inner.query_clock.time().into(),
      intent_queue: self.inner.broadcasts.num_queued().await,
      event_queue: self.inner.event_broadcasts.num_queued().await,
      query_queue: self.inner.query_broadcasts.num_queued().await,
      encrypted,
      coordinate_resets: self
        .inner
        .coord_core
        .as_ref()
        .map(|coord| coord.client.stats().resets),
    }
  }

  /// Returns the number of nodes in the serf cluster, regardless of
  /// their health or status.
  #[inline]
  pub async fn num_members(&self) -> usize {
    self.inner.members.read().await.states.len()
  }

  /// Returns the key manager for the current serf instance
  #[cfg(feature = "encryption")]
  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
  #[inline]
  pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
    &self.inner.key_manager
  }

  /// Returns the Member information for the local node
  #[inline]
  pub async fn local_member(&self) -> Member<T::Id, T::ResolvedAddress> {
    self
      .inner
      .members
      .read()
      .await
      .states
      .get(self.inner.memberlist.local_id())
      .unwrap()
      .member
      .cheap_clone()
  }

  /// Used to dynamically update the tags associated with
  /// the local node. This will propagate the change to the rest of
  /// the cluster. Blocks until the message is broadcast out.
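  ///
  /// # Example
  ///
  /// A sketch of replacing the local tags; building `Tags` by collecting
  /// key/value pairs is an assumption about its `FromIterator` support:
  ///
  /// ```rust,ignore
  /// let tags: Tags = [("role", "web"), ("datacenter", "dc1")]
  ///   .into_iter()
  ///   .map(|(k, v)| (k.into(), v.into()))
  ///   .collect();
  /// serf.set_tags(tags).await?;
  /// ```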
  #[inline]
  pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
    // Check that the meta data length is okay
    let tags_encoded_len = tags.encoded_len_with_length_delimited();
    if tags_encoded_len > Meta::MAX_SIZE {
      return Err(Error::tags_too_large(tags_encoded_len));
    }
    // update the config
    self.inner.opts.tags.store(Arc::new(tags));

    // trigger a memberlist update
    self
      .inner
      .memberlist
      .update_node(self.inner.opts.broadcast_timeout)
      .await
      .map_err(From::from)
  }

  /// Used to broadcast a custom user event with a given
  /// name and payload. If the configured size limit is exceeded, an error will be returned.
  /// If coalesce is enabled, nodes are allowed to coalesce this event.
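  ///
  /// # Example
  ///
  /// A sketch broadcasting a small, coalescable event; the name and payload
  /// here are illustrative:
  ///
  /// ```rust,ignore
  /// serf.user_event("deploy", "v1.2.3", true).await?;
  /// ```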
  #[inline]
  pub async fn user_event(
    &self,
    name: impl Into<SmolStr>,
    payload: impl Into<Bytes>,
    coalesce: bool,
  ) -> Result<(), Error<T, D>> {
    let name: SmolStr = name.into();
    let payload: Bytes = payload.into();
    let payload_size_before_encoding = name.len() + payload.len();

    // Check size before encoding to prevent needless encoding and return early if it's over the specified limit.
    if payload_size_before_encoding > self.inner.opts.max_user_event_size {
      return Err(Error::user_event_limit_too_large(
        self.inner.opts.max_user_event_size,
      ));
    }

    if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
      return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
    }

    // Create a message
    let msg = UserEventMessage {
      ltime: self.inner.event_clock.time(),
      name: name.clone(),
      payload,
      cc: coalesce,
    };

    // Start broadcasting the event
    let len = crate::types::encoded_message_len(&msg);

    // Check the size after encoding to be sure again that
    // we're not attempting to send over the specified size limit.
    if len > self.inner.opts.max_user_event_size {
      return Err(Error::raw_user_event_too_large(len));
    }

    if len > USER_EVENT_SIZE_LIMIT {
      return Err(Error::raw_user_event_too_large(len));
    }

    let raw = crate::types::encode_message_to_bytes(&msg)?;

    self.inner.event_clock.increment();

    // Process update locally
    self.handle_user_event(either::Either::Right(msg)).await;

    self
      .inner
      .event_broadcasts
      .queue_broadcast(SerfBroadcast {
        msg: raw,
        notify_tx: None,
      })
      .await;
    Ok(())
  }

  /// Used to broadcast a new query. The query must be fairly small,
  /// and an error will be returned if the size limit is exceeded. Query parameters are optional,
  /// and if not provided, a sane set of defaults will be used.
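  ///
  /// # Example
  ///
  /// A sketch of issuing a query with default parameters and draining the
  /// responses; the exact accessor names on [`QueryResponse`] are assumptions:
  ///
  /// ```rust,ignore
  /// let resp = serf.query("ping", Bytes::new(), None).await?;
  /// let responses = resp.response_rx();
  /// while let Ok(ack) = responses.recv().await {
  ///   println!("response from {:?}: {:?}", ack.from(), ack.payload());
  /// }
  /// ```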
  pub async fn query(
    &self,
    name: impl Into<SmolStr>,
    payload: impl Into<Bytes>,
    params: Option<QueryParam<T::Id>>,
  ) -> Result<QueryResponse<T::Id, T::ResolvedAddress>, Error<T, D>> {
    self
      .query_in(name.into(), payload.into(), params, None)
      .await
  }

  /// Joins an existing Serf cluster. Returns the node
  /// successfully contacted. If `ignore_old` is true, then any
  /// user messages sent prior to the join will be ignored.
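  ///
  /// # Example
  ///
  /// A sketch of joining through a known peer. The address literal and the
  /// `MaybeResolvedAddress` constructor used here are assumptions:
  ///
  /// ```rust,ignore
  /// let peer = Node::new(
  ///   "seed-1".into(),
  ///   MaybeResolvedAddress::unresolved("10.0.0.2:7946".parse()?),
  /// );
  /// let joined = serf.join(peer, false).await?;
  /// ```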
  pub async fn join(
    &self,
    node: Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
    ignore_old: bool,
  ) -> Result<Node<T::Id, T::ResolvedAddress>, Error<T, D>> {
    // Do a quick state check
    let current_state = self.state();
    if current_state != SerfState::Alive {
      return Err(Error::bad_join_status(current_state));
    }

    // Hold the join lock; this is to make `event_join_ignore` safe
    let _join_lock = self.inner.join_lock.lock().await;

    // Ignore any events from a potential join. This is safe since we hold
    // the join lock and nobody else can be doing a join
    if ignore_old {
      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
    }

    // Have memberlist attempt to join
    match self.inner.memberlist.join(node).await {
      Ok(node) => {
        // Start broadcasting the update
        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
          if ignore_old {
            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          }
          return Err(e);
        }
        if ignore_old {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        }

        Ok(node)
      }
      Err(e) => {
        if ignore_old {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        }
        Err(Error::from(e))
      }
    }
  }

  /// Joins an existing Serf cluster. Returns the nodes
  /// successfully contacted. If `ignore_old` is true, then any
  /// user messages sent prior to the join will be ignored.
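  ///
  /// # Example
  ///
  /// A sketch of joining several seeds at once; on partial failure the error
  /// side still carries the nodes that were successfully contacted:
  ///
  /// ```rust,ignore
  /// match serf.join_many(seeds.into_iter(), false).await {
  ///   Ok(joined) => println!("contacted {} nodes", joined.len()),
  ///   Err((joined, err)) => println!("contacted {} nodes, error: {err}", joined.len()),
  /// }
  /// ```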
  pub async fn join_many(
    &self,
    existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
    ignore_old: bool,
  ) -> Result<
    SmallVec<Node<T::Id, T::ResolvedAddress>>,
    (SmallVec<Node<T::Id, T::ResolvedAddress>>, Error<T, D>),
  > {
    // Do a quick state check
    let current_state = self.state();
    if current_state != SerfState::Alive {
      return Err((SmallVec::new(), Error::bad_join_status(current_state)));
    }

    // Hold the join lock; this is to make `event_join_ignore` safe
    let _join_lock = self.inner.join_lock.lock().await;

    // Ignore any events from a potential join. This is safe since we hold
    // the join lock and nobody else can be doing a join
    if ignore_old {
      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
    }

    // Have memberlist attempt to join
    match self.inner.memberlist.join_many(existing).await {
      Ok(joined) => {
        // Start broadcasting the update
        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          return Err((joined, e));
        }
        self.inner.event_join_ignore.store(false, Ordering::SeqCst);
        Ok(joined)
      }
      Err((joined, err)) => {
        // If we joined any nodes, broadcast the join message
        if !joined.is_empty() {
          // Start broadcasting the update
          if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
            return Err((
              joined,
              Error::Multiple(std::sync::Arc::from_iter([err.into(), e])),
            ));
          }

          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          Err((joined, Error::from(err)))
        } else {
          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
          Err((joined, Error::from(err)))
        }
      }
    }
  }

  /// Gracefully exits the cluster. It is safe to call this multiple
  /// times.
  /// If the leave broadcast times out, `leave` will try to finish the sequence as best effort.
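  ///
  /// # Example
  ///
  /// A sketch of a graceful teardown: leave first so peers observe a clean
  /// exit, then shut the instance down:
  ///
  /// ```rust,ignore
  /// serf.leave().await?;
  /// serf.shutdown().await?;
  /// ```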
  pub async fn leave(&self) -> Result<(), Error<T, D>> {
    // Check the current state
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Left => return Ok(()),
        SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
        SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
        _ => {
          // Set the state to leaving
          *s = SerfState::Leaving;
        }
      }
    }

    // If we have a snapshot, mark that we are leaving
    if let Some(ref snap) = self.inner.snapshot {
      snap.leave().await;
    }

    // Construct the message for the graceful leave
    let msg = LeaveMessage {
      ltime: self.inner.clock.time(),
      id: self.inner.memberlist.local_id().cheap_clone(),
      prune: false,
    };

    self.inner.clock.increment();

    // Process the leave locally
    self.handle_node_leave_intent(&msg).await;

    // Only broadcast the leave message if there is at least one
    // other node alive.
    if self.has_alive_members().await {
      let (notify_tx, notify_rx) = async_channel::bounded(1);
      let msg = crate::types::encode_message_to_bytes(&msg)?;
      self.broadcast(msg, Some(notify_tx)).await?;

      futures::select! {
        _ = notify_rx.recv().fuse() => {
          // We got a response, so we are done
        }
        _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
          tracing::warn!("serf: timeout while waiting for graceful leave");
        }
      }
    }

    // Attempt the memberlist leave
    if let Err(e) = self
      .inner
      .memberlist
      .leave(self.inner.opts.broadcast_timeout)
      .await
    {
      tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
    }

    // Wait for the leave to propagate through the cluster. The broadcast
    // timeout is how long we wait for the message to go out from our own
    // queue, but this wait is for that message to propagate through the
    // cluster. In particular, we want to stay up long enough to service
    // any probes from other nodes before they learn about us leaving.
    <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;

    // Transition to Left only if we are not already shutdown
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Shutdown => {}
        _ => {
          *s = SerfState::Left;
        }
      }
    }
    Ok(())
  }

  /// Forcibly removes a failed node from the cluster
  /// immediately, instead of waiting for the reaper to eventually reclaim it.
  /// This also has the effect that Serf will no longer attempt to reconnect
  /// to this node.
  pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
    self.force_leave(id, false).await
  }

  /// Forcibly removes a failed node from the cluster
  /// immediately, instead of waiting for the reaper to eventually reclaim it.
  /// This also has the effect that Serf will no longer attempt to reconnect
  /// to this node. Unlike [`remove_failed_node`](Self::remove_failed_node),
  /// this variant also prunes the node completely from the member list.
  pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
    self.force_leave(id, true).await
  }

  /// Forcefully shuts down the Serf instance, stopping all network
  /// activity and background maintenance associated with the instance.
  ///
  /// This is not a graceful shutdown, and should be preceded by a call
  /// to [`leave`](Serf::leave). Otherwise, other nodes in the cluster will detect this node's
  /// exit as a node failure.
  ///
  /// It is safe to call this method multiple times.
  pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
    {
      let mut s = self.inner.state.lock();
      match *s {
        SerfState::Shutdown => return Ok(()),
        SerfState::Left => {}
        _ => {
          tracing::warn!("serf: shutdown without a leave");
        }
      }

      // Wait to close the shutdown channel until after we've shut down the
      // memberlist and its associated network resources, since the shutdown
      // channel signals that we are cleaned up outside of Serf.
      *s = SerfState::Shutdown;
    }
    self.inner.memberlist.shutdown().await?;
    self.inner.shutdown_tx.close();

    // Wait for the snapshotter to finish if we have one
    if let Some(ref snap) = self.inner.snapshot {
      snap.wait().await;
    }

    loop {
      if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
        let mut futs = core::mem::take(&mut *handles);
        while futs.next().await.is_some() {}
        break;
      }
    }

    Ok(())
  }

  /// Returns the network coordinate of the local node.
  pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
    if let Some(ref coord) = self.inner.coord_core {
      return Ok(coord.client.get_coordinate());
    }

    Err(Error::coordinates_disabled())
  }

  /// Returns the network coordinate for the node with the given
  /// id. This will only be valid if `disable_coordinates` is set to `false`.
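  ///
  /// # Example
  ///
  /// A sketch of reading a peer's cached coordinate, assuming coordinates are
  /// enabled and `peer_id` is a known member id:
  ///
  /// ```rust,ignore
  /// match serf.cached_coordinate(&peer_id)? {
  ///   Some(coord) => println!("cached coordinate: {coord:?}"),
  ///   None => println!("no cached coordinate yet"),
  /// }
  /// ```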
  pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
    if let Some(ref coord) = self.inner.coord_core {
      return Ok(coord.cache.read().get(id).cloned());
    }

    Err(Error::coordinates_disabled())
  }

  /// Returns the underlying [`Memberlist`] instance
  #[inline]
  pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
    &self.inner.memberlist
  }
}

/// A point-in-time snapshot of operator debugging information, as returned by [`Serf::stats`].
#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Stats {
  members: usize,
  failed: usize,
  left: usize,
  health_score: usize,
  member_time: u64,
  event_time: u64,
  query_time: u64,
  intent_queue: usize,
  event_queue: usize,
  query_queue: usize,
  encrypted: bool,
  #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
  coordinate_resets: Option<usize>,
}