// serf_core/serf/api.rs — public API surface of the `Serf` instance.
1use std::sync::atomic::Ordering;
2
3use futures::{FutureExt, StreamExt};
4use memberlist_core::{
5  CheapClone,
6  bytes::Bytes,
7  proto::{Data, MaybeResolvedAddress, Meta, Node, OneOrMore, SmallVec},
8  tracing,
9};
10use smol_str::SmolStr;
11
12use crate::{
13  error::Error,
14  event::EventProducer,
15  types::{LeaveMessage, Member, Tags, UserEventMessage},
16};
17
18use super::*;
19
20impl<T> Serf<T>
21where
22  T: Transport,
23{
24  /// Creates a new Serf instance with the given transport and options.
25  pub async fn new(
26    transport: T::Options,
27    opts: Options,
28  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
29    Self::new_in(
30      None,
31      None,
32      transport,
33      opts,
34      #[cfg(any(test, feature = "test"))]
35      None,
36    )
37    .await
38  }
39
40  /// Creates a new Serf instance with the given transport and options.
41  pub async fn with_event_producer(
42    transport: T::Options,
43    opts: Options,
44    ev: EventProducer<T, DefaultDelegate<T>>,
45  ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
46    Self::new_in(
47      Some(ev.tx),
48      None,
49      transport,
50      opts,
51      #[cfg(any(test, feature = "test"))]
52      None,
53    )
54    .await
55  }
56}
57
58impl<T, D> Serf<T, D>
59where
60  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
61  T: Transport,
62{
63  /// Creates a new Serf instance with the given transport and options.
64  pub async fn with_delegate(
65    transport: T::Options,
66    opts: Options,
67    delegate: D,
68  ) -> Result<Self, Error<T, D>> {
69    Self::new_in(
70      None,
71      Some(delegate),
72      transport,
73      opts,
74      #[cfg(any(test, feature = "test"))]
75      None,
76    )
77    .await
78  }
79
80  /// Creates a new Serf instance with the given transport, options, event sender, and delegate.
81  pub async fn with_event_producer_and_delegate(
82    transport: T::Options,
83    opts: Options,
84    ev: EventProducer<T, D>,
85    delegate: D,
86  ) -> Result<Self, Error<T, D>> {
87    Self::new_in(
88      Some(ev.tx),
89      Some(delegate),
90      transport,
91      opts,
92      #[cfg(any(test, feature = "test"))]
93      None,
94    )
95    .await
96  }
97
98  /// Returns the local node's ID
99  #[inline]
100  pub fn local_id(&self) -> &T::Id {
101    self.inner.memberlist.local_id()
102  }
103
104  /// Returns the local node's ID and the advertised address
105  #[inline]
106  pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
107    self.inner.memberlist.advertise_node()
108  }
109
110  /// A predicate that determines whether or not encryption
111  /// is enabled, which can be possible in one of 2 cases:
112  ///   - Single encryption key passed at agent start (no persistence)
113  ///   - Keyring file provided at agent start
114  #[inline]
115  #[cfg(feature = "encryption")]
116  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
117  pub fn encryption_enabled(&self) -> bool {
118    self.inner.memberlist.encryption_enabled()
119  }
120
121  /// Returns a receiver that can be used to wait for
122  /// Serf to shutdown.
123  #[inline]
124  pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
125    self.inner.shutdown_rx.clone()
126  }
127
128  /// The current state of this Serf instance.
129  #[inline]
130  pub fn state(&self) -> SerfState {
131    *self.inner.state.lock()
132  }
133
134  /// Returns a point-in-time snapshot of the members of this cluster.
135  #[inline]
136  pub async fn members(&self) -> OneOrMore<Member<T::Id, T::ResolvedAddress>> {
137    self
138      .inner
139      .members
140      .read()
141      .await
142      .states
143      .values()
144      .map(|s| s.member.cheap_clone())
145      .collect()
146  }
147
148  /// Used to provide operator debugging information
149  #[inline]
150  pub async fn stats(&self) -> Stats {
151    let (num_members, num_failed, num_left, health_score) = {
152      let members = self.inner.members.read().await;
153      let num_members = members.states.len();
154      let num_failed = members.failed_members.len();
155      let num_left = members.left_members.len();
156      let health_score = self.inner.memberlist.health_score();
157      (num_members, num_failed, num_left, health_score)
158    };
159
160    #[cfg(not(feature = "encryption"))]
161    let encrypted = false;
162    #[cfg(feature = "encryption")]
163    let encrypted = self.inner.memberlist.encryption_enabled();
164
165    Stats {
166      members: num_members,
167      failed: num_failed,
168      left: num_left,
169      health_score,
170      member_time: self.inner.clock.time().into(),
171      event_time: self.inner.event_clock.time().into(),
172      query_time: self.inner.query_clock.time().into(),
173      intent_queue: self.inner.broadcasts.num_queued().await,
174      event_queue: self.inner.event_broadcasts.num_queued().await,
175      query_queue: self.inner.query_broadcasts.num_queued().await,
176      encrypted,
177      coordinate_resets: self
178        .inner
179        .coord_core
180        .as_ref()
181        .map(|coord| coord.client.stats().resets),
182    }
183  }
184
185  /// Returns the number of nodes in the serf cluster, regardless of
186  /// their health or status.
187  #[inline]
188  pub async fn num_members(&self) -> usize {
189    self.inner.members.read().await.states.len()
190  }
191
192  /// Returns the key manager for the current serf instance
193  #[cfg(feature = "encryption")]
194  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
195  #[inline]
196  pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
197    &self.inner.key_manager
198  }
199
200  /// Returns the Member information for the local node
201  #[inline]
202  pub async fn local_member(&self) -> Member<T::Id, T::ResolvedAddress> {
203    self
204      .inner
205      .members
206      .read()
207      .await
208      .states
209      .get(self.inner.memberlist.local_id())
210      .unwrap()
211      .member
212      .cheap_clone()
213  }
214
215  /// Used to dynamically update the tags associated with
216  /// the local node. This will propagate the change to the rest of
217  /// the cluster. Blocks until a the message is broadcast out.
218  #[inline]
219  pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
220    // Check that the meta data length is okay
221    let tags_encoded_len = tags.encoded_len_with_length_delimited();
222    if tags_encoded_len > Meta::MAX_SIZE {
223      return Err(Error::tags_too_large(tags_encoded_len));
224    }
225    // update the config
226    self.inner.opts.tags.store(Arc::new(tags));
227
228    // trigger a memberlist update
229    self
230      .inner
231      .memberlist
232      .update_node(self.inner.opts.broadcast_timeout)
233      .await
234      .map_err(From::from)
235  }
236
237  /// Used to broadcast a custom user event with a given
238  /// name and payload. If the configured size limit is exceeded and error will be returned.
239  /// If coalesce is enabled, nodes are allowed to coalesce this event.
240  #[inline]
241  pub async fn user_event(
242    &self,
243    name: impl Into<SmolStr>,
244    payload: impl Into<Bytes>,
245    coalesce: bool,
246  ) -> Result<(), Error<T, D>> {
247    let name: SmolStr = name.into();
248    let payload: Bytes = payload.into();
249    let payload_size_before_encoding = name.len() + payload.len();
250
251    // Check size before encoding to prevent needless encoding and return early if it's over the specified limit.
252    if payload_size_before_encoding > self.inner.opts.max_user_event_size {
253      return Err(Error::user_event_limit_too_large(
254        self.inner.opts.max_user_event_size,
255      ));
256    }
257
258    if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
259      return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
260    }
261
262    // Create a message
263    let msg = UserEventMessage {
264      ltime: self.inner.event_clock.time(),
265      name: name.clone(),
266      payload,
267      cc: coalesce,
268    };
269
270    // Start broadcasting the event
271    let len = crate::types::encoded_message_len(&msg);
272
273    // Check the size after encoding to be sure again that
274    // we're not attempting to send over the specified size limit.
275    if len > self.inner.opts.max_user_event_size {
276      return Err(Error::raw_user_event_too_large(len));
277    }
278
279    if len > USER_EVENT_SIZE_LIMIT {
280      return Err(Error::raw_user_event_too_large(len));
281    }
282
283    let raw = crate::types::encode_message_to_bytes(&msg)?;
284
285    self.inner.event_clock.increment();
286
287    // Process update locally
288    self.handle_user_event(either::Either::Right(msg)).await;
289
290    self
291      .inner
292      .event_broadcasts
293      .queue_broadcast(SerfBroadcast {
294        msg: raw,
295        notify_tx: None,
296      })
297      .await;
298    Ok(())
299  }
300
301  /// Used to broadcast a new query. The query must be fairly small,
302  /// and an error will be returned if the size limit is exceeded. Query parameters are optional,
303  /// and if not provided, a sane set of defaults will be used.
304  pub async fn query(
305    &self,
306    name: impl Into<SmolStr>,
307    payload: impl Into<Bytes>,
308    params: Option<QueryParam<T::Id>>,
309  ) -> Result<QueryResponse<T::Id, T::ResolvedAddress>, Error<T, D>> {
310    self
311      .query_in(name.into(), payload.into(), params, None)
312      .await
313  }
314
315  /// Joins an existing Serf cluster. Returns the resolved address of node
316  /// successfully contacted. If `ignore_old` is true, then any
317  /// user messages sent prior to the join will be ignored.
318  pub async fn join(
319    &self,
320    node: MaybeResolvedAddress<T::Address, T::ResolvedAddress>,
321    ignore_old: bool,
322  ) -> Result<T::ResolvedAddress, Error<T, D>> {
323    // Do a quick state check
324    let current_state = self.state();
325    if current_state != SerfState::Alive {
326      return Err(Error::bad_join_status(current_state));
327    }
328
329    // Hold the joinLock, this is to make eventJoinIgnore safe
330    let _join_lock = self.inner.join_lock.lock().await;
331
332    // Ignore any events from a potential join. This is safe since we hold
333    // the joinLock and nobody else can be doing a Join
334    if ignore_old {
335      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
336    }
337
338    // Have memberlist attempt to join
339    match self.inner.memberlist.join(node).await {
340      Ok(node) => {
341        // Start broadcasting the update
342        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
343          if ignore_old {
344            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
345          }
346          return Err(e);
347        }
348        if ignore_old {
349          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
350        }
351
352        Ok(node)
353      }
354      Err(e) => {
355        if ignore_old {
356          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
357        }
358        Err(Error::from(e))
359      }
360    }
361  }
362
363  /// Joins an existing Serf cluster. Returns the resolved address of nodes
364  /// successfully contacted. If `ignore_old` is true, then any
365  /// user messages sent prior to the join will be ignored.
366  pub async fn join_many(
367    &self,
368    existing: impl Iterator<Item = MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
369    ignore_old: bool,
370  ) -> Result<SmallVec<T::ResolvedAddress>, (SmallVec<T::ResolvedAddress>, Error<T, D>)> {
371    // Do a quick state check
372    let current_state = self.state();
373    if current_state != SerfState::Alive {
374      return Err((SmallVec::new(), Error::bad_join_status(current_state)));
375    }
376
377    // Hold the joinLock, this is to make eventJoinIgnore safe
378    let _join_lock = self.inner.join_lock.lock().await;
379
380    // Ignore any events from a potential join. This is safe since we hold
381    // the joinLock and nobody else can be doing a Join
382    if ignore_old {
383      self.inner.event_join_ignore.store(true, Ordering::SeqCst);
384    }
385
386    // Have memberlist attempt to join
387    match self.inner.memberlist.join_many(existing).await {
388      Ok(joined) => {
389        // Start broadcasting the update
390        if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
391          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
392          return Err((joined, e));
393        }
394        self.inner.event_join_ignore.store(false, Ordering::SeqCst);
395        Ok(joined)
396      }
397      Err((joined, err)) => {
398        // If we joined any nodes, broadcast the join message
399        if !joined.is_empty() {
400          // Start broadcasting the update
401          if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
402            self.inner.event_join_ignore.store(false, Ordering::SeqCst);
403            return Err((
404              joined,
405              Error::Multiple(std::sync::Arc::from_iter([err.into(), e])),
406            ));
407          }
408
409          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
410          Err((joined, Error::from(err)))
411        } else {
412          self.inner.event_join_ignore.store(false, Ordering::SeqCst);
413          Err((joined, Error::from(err)))
414        }
415      }
416    }
417  }
418
419  /// Gracefully exits the cluster. It is safe to call this multiple
420  /// times.
421  /// If the Leave broadcast timeout, Leave() will try to finish the sequence as best effort.
422  pub async fn leave(&self) -> Result<(), Error<T, D>> {
423    // Check the current state
424    {
425      let mut s = self.inner.state.lock();
426      match *s {
427        SerfState::Left => return Ok(()),
428        SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
429        SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
430        _ => {
431          // Set the state to leaving
432          *s = SerfState::Leaving;
433        }
434      }
435    }
436
437    // If we have a snapshot, mark we are leaving
438    if let Some(ref snap) = self.inner.snapshot {
439      snap.leave().await;
440    }
441
442    // Construct the message for the graceful leave
443    let msg = LeaveMessage {
444      ltime: self.inner.clock.time(),
445      id: self.inner.memberlist.local_id().cheap_clone(),
446      prune: false,
447    };
448
449    self.inner.clock.increment();
450
451    // Process the leave locally
452    self.handle_node_leave_intent(&msg).await;
453
454    // Only broadcast the leave message if there is at least one
455    // other node alive.
456    if self.has_alive_members().await {
457      let (notify_tx, notify_rx) = async_channel::bounded(1);
458      let msg = crate::types::encode_message_to_bytes(&msg)?;
459      self.broadcast(msg, Some(notify_tx)).await?;
460
461      futures::select! {
462        _ = notify_rx.recv().fuse() => {
463          // We got a response, so we are done
464        }
465        _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
466          tracing::warn!("serf: timeout while waiting for graceful leave");
467        }
468      }
469    }
470
471    // Attempt the memberlist leave
472    if let Err(e) = self
473      .inner
474      .memberlist
475      .leave(self.inner.opts.broadcast_timeout)
476      .await
477    {
478      tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
479    }
480
481    // Wait for the leave to propagate through the cluster. The broadcast
482    // timeout is how long we wait for the message to go out from our own
483    // queue, but this wait is for that message to propagate through the
484    // cluster. In particular, we want to stay up long enough to service
485    // any probes from other nodes before they learn about us leaving.
486    <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;
487
488    // Transition to Left only if we not already shutdown
489    {
490      let mut s = self.inner.state.lock();
491      match *s {
492        SerfState::Shutdown => {}
493        _ => {
494          *s = SerfState::Left;
495        }
496      }
497    }
498    Ok(())
499  }
500
501  /// Forcibly removes a failed node from the cluster
502  /// immediately, instead of waiting for the reaper to eventually reclaim it.
503  /// This also has the effect that Serf will no longer attempt to reconnect
504  /// to this node.
505  pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
506    self.force_leave(id, false).await
507  }
508
509  /// Forcibly removes a failed node from the cluster
510  /// immediately, instead of waiting for the reaper to eventually reclaim it.
511  /// This also has the effect that Serf will no longer attempt to reconnect
512  /// to this node.
513  pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
514    self.force_leave(id, true).await
515  }
516
517  /// Forcefully shuts down the Serf instance, stopping all network
518  /// activity and background maintenance associated with the instance.
519  ///
520  /// This is not a graceful shutdown, and should be preceded by a call
521  /// to Leave. Otherwise, other nodes in the cluster will detect this node's
522  /// exit as a node failure.
523  ///
524  /// It is safe to call this method multiple times.
525  pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
526    {
527      let mut s = self.inner.state.lock();
528      match *s {
529        SerfState::Shutdown => return Ok(()),
530        SerfState::Left => {}
531        _ => {
532          tracing::warn!("serf: shutdown without a leave");
533        }
534      }
535
536      // Wait to close the shutdown channel until after we've shut down the
537      // memberlist and its associated network resources, since the shutdown
538      // channel signals that we are cleaned up outside of Serf.
539      *s = SerfState::Shutdown;
540    }
541    self.inner.memberlist.shutdown().await?;
542    self.inner.shutdown_tx.close();
543
544    // Wait for the snapshoter to finish if we have one
545    if let Some(ref snap) = self.inner.snapshot {
546      snap.wait().await;
547    }
548
549    loop {
550      if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
551        let mut futs = core::mem::take(&mut *handles);
552        while futs.next().await.is_some() {}
553        break;
554      }
555    }
556
557    Ok(())
558  }
559
560  /// Returns the network coordinate of the local node.
561  pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
562    if let Some(ref coord) = self.inner.coord_core {
563      return Ok(coord.client.get_coordinate());
564    }
565
566    Err(Error::coordinates_disabled())
567  }
568
569  /// Returns the network coordinate for the node with the given
570  /// name. This will only be valid if `disable_coordinates` is set to `false`.
571  pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
572    if let Some(ref coord) = self.inner.coord_core {
573      return Ok(coord.cache.read().get(id).cloned());
574    }
575
576    Err(Error::coordinates_disabled())
577  }
578
579  /// Returns the underlying [`Memberlist`] instance
580  #[inline]
581  pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
582    &self.inner.memberlist
583  }
584}
585
#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
/// Operator debugging information returned by [`Serf::stats`].
pub struct Stats {
  /// Total number of known members, regardless of health or status.
  members: usize,
  /// Number of members currently in the failed list.
  failed: usize,
  /// Number of members that have left the cluster.
  left: usize,
  /// Health score reported by the underlying memberlist.
  health_score: usize,
  /// Current value of the member clock (Lamport time, widened to u64).
  member_time: u64,
  /// Current value of the event clock (Lamport time, widened to u64).
  event_time: u64,
  /// Current value of the query clock (Lamport time, widened to u64).
  query_time: u64,
  /// Number of messages queued in the intent broadcast queue.
  intent_queue: usize,
  /// Number of messages queued in the event broadcast queue.
  event_queue: usize,
  /// Number of messages queued in the query broadcast queue.
  query_queue: usize,
  /// Whether encryption is enabled (always false without the `encryption` feature).
  encrypted: bool,
  /// Coordinate-client reset count; `None` when coordinates are disabled.
  #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
  coordinate_resets: Option<usize>,
}