serf_core/
options.rs

1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use arc_swap::ArcSwap;
4pub use memberlist_core::Options as MemberlistOptions;
5use smol_str::SmolStr;
6
7use super::types::{DelegateVersion, ProtocolVersion, Tags};
8
9fn tags(tags: &Arc<ArcSwap<Tags>>) -> Arc<Tags> {
10  tags.load().clone()
11}
12
13/// The configuration for creating a Serf instance.
14#[viewit::viewit(getters(vis_all = "pub"), setters(vis_all = "pub", prefix = "with"))]
15#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
16pub struct Options {
17  /// The tags for this role, if any. This is used to provide arbitrary
18  /// key/value metadata per-node. For example, a "role" tag may be used to
19  /// differentiate "load-balancer" from a "web" role as parts of the same cluster.
20  /// Tags are deprecating 'Role', and instead it acts as a special key in this
21  /// map.
22  #[viewit(
23    vis = "pub(crate)",
24    getter(
25      vis = "pub",
26      style = "ref",
27      result(converter(style = "ref", fn = "tags",), type = "Arc<Tags>",),
28      attrs(
29        doc = "Returns the tags for this role, if any. This is used to provide arbitrary key/value metadata per-node. For example, a \"role\" tag may be used to differentiate \"load-balancer\" from a \"web\" role as parts of the same cluster."
30      )
31    ),
32    setter(skip)
33  )]
34  #[cfg_attr(feature = "serde", serde(with = "tags_serde"))]
35  tags: Arc<ArcSwap<Tags>>,
36
37  /// The protocol version to speak
38  #[viewit(
39    getter(const, attrs(doc = "Returns the protocol version to speak")),
40    setter(attrs(doc = "Sets the protocol version to speak"))
41  )]
42  protocol_version: ProtocolVersion,
43
44  /// The delegate version to speak
45  #[viewit(
46    getter(const, attrs(doc = "Returns the delegate version to speak")),
47    setter(attrs(doc = "Sets the delegate version to speak"))
48  )]
49  delegate_version: DelegateVersion,
50
51  /// The amount of time to wait for a broadcast
52  /// message to be sent to the cluster. Broadcast messages are used for
53  /// things like leave messages and force remove messages. If this is not
54  /// set, a timeout of 5 seconds will be set.
55  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
56  #[viewit(
57    getter(
58      const,
59      attrs(
60        doc = "Returns the amount of time to wait for a broadcast message to be sent to the cluster."
61      )
62    ),
63    setter(attrs(
64      doc = "Sets the amount of time to wait for a broadcast message to be sent to the cluster."
65    ))
66  )]
67  broadcast_timeout: Duration,
68
69  /// For our leave (node dead) message to propagate
70  /// through the cluster. In particular, we want to stay up long enough to
71  /// service any probes from other nodes before they learn about us
72  /// leaving and stop probing. Otherwise, we risk getting node failures as
73  /// we leave.
74  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
75  #[viewit(
76    getter(const, attrs(doc = "Returns the leave propagate delay.")),
77    setter(attrs(doc = "Sets the leave propagate delay."))
78  )]
79  leave_propagate_delay: Duration,
80
81  /// The settings below relate to Serf's event coalescence feature. Serf
82  /// is able to coalesce multiple events into single events in order to
83  /// reduce the amount of noise that is sent along the event channel. For example
84  /// if five nodes quickly join, the event channel will be sent one EventMemberJoin
85  /// containing the five nodes rather than five individual EventMemberJoin
86  /// events. Coalescence can mitigate potential flapping behavior.
87  ///
88  /// Coalescence is disabled by default and can be enabled by setting
89  /// `coalesce_period`.
90  ///
91  /// `coalesce_period` specifies the time duration to coalesce events.
92  /// For example, if this is set to 5 seconds, then all events received
93  /// within 5 seconds that can be coalesced will be.
94  ///
95  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
96  #[viewit(
97    getter(const, attrs(doc = "Returns the coalesce period.")),
98    setter(attrs(doc = "Sets the coalesce period."))
99  )]
100  coalesce_period: Duration,
101
102  /// specifies the duration of time where if no events
103  /// are received, coalescence immediately happens. For example, if
104  /// `coalesce_period` is set to 10 seconds but `quiescent_period` is set to 2
105  /// seconds, then the events will be coalesced and dispatched if no
106  /// new events are received within 2 seconds of the last event. Otherwise,
107  /// every event will always be delayed by at least 10 seconds.
108  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
109  #[viewit(
110    getter(
111      const,
112      attrs(
113        doc = "Returns the specifies the duration of time where if no events are received, coalescence immediately happens."
114      )
115    ),
116    setter(attrs(
117      doc = "Sets specifies the duration of time where if no events are received, coalescence immediately happens."
118    ))
119  )]
120  quiescent_period: Duration,
121
122  /// The settings below relate to Serf's user event coalescing feature.
123  /// The settings operate like above but only affect user messages and
124  /// not the Member* messages that Serf generates.
125  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
126  #[viewit(
127    getter(const, attrs(doc = "Returns the user event coalesce period.")),
128    setter(attrs(doc = "Sets the user event coalesce period."))
129  )]
130  user_coalesce_period: Duration,
131
132  /// The settings below relate to Serf's user event coalescing feature.
133  /// The settings operate like above but only affect user messages and
134  /// not the Member* messages that Serf generates.
135  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
136  #[viewit(
137    getter(const, attrs(doc = "Returns the user quiescent period.")),
138    setter(attrs(doc = "Sets the user quiescent period."))
139  )]
140  user_quiescent_period: Duration,
141
142  /// The interval when the reaper runs. If this is not
143  /// set (it is zero), it will be set to a reasonable default.
144  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
145  #[viewit(
146    getter(const, attrs(doc = "Returns the interval when the reaper runs.")),
147    setter(attrs(doc = "Sets the interval when the reaper runs."))
148  )]
149  reap_interval: Duration,
150
151  /// The interval when we attempt to reconnect
152  /// to failed nodes. If this is not set (it is zero), it will be set
153  /// to a reasonable default.
154  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
155  #[viewit(
156    getter(
157      const,
158      attrs(doc = "Returns the interval when we attempt to reconnect to failed nodes.")
159    ),
160    setter(attrs(doc = "Sets the interval when we attempt to reconnect to failed nodes."))
161  )]
162  reconnect_interval: Duration,
163
164  /// The amount of time to attempt to reconnect to
165  /// a failed node before giving up and considering it completely gone.
166  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
167  #[viewit(
168    getter(
169      const,
170      attrs(
171        doc = "Returns the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone."
172      )
173    ),
174    setter(attrs(
175      doc = "Sets the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone."
176    ))
177  )]
178  reconnect_timeout: Duration,
179
180  /// The amount of time to keep around nodes
181  /// that gracefully left as tombstones for syncing state with other
182  /// Serf nodes.
183  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
184  #[viewit(
185    getter(
186      const,
187      attrs(
188        doc = "Returns the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes."
189      )
190    ),
191    setter(attrs(
192      doc = "Sets the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes."
193    ))
194  )]
195  tombstone_timeout: Duration,
196
197  /// The amount of time less than which we consider a node
198  /// being failed and rejoining looks like a flap for telemetry purposes.
199  /// This should be set less than a typical reboot time, but large enough
200  /// to see actual events, given our expected detection times for a failed
201  /// node.
202  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
203  #[viewit(
204    getter(
205      const,
206      attrs(
207        doc = "Returns the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes."
208      )
209    ),
210    setter(attrs(
211      doc = "Sets the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes."
212    ))
213  )]
214  flap_timeout: Duration,
215
216  /// The interval at which we check the message
217  /// queue to apply the warning and max depth.
218  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
219  #[viewit(
220    getter(
221      const,
222      attrs(
223        doc = "Returns the interval at which we check the message queue to apply the warning and max depth."
224      )
225    ),
226    setter(attrs(
227      doc = "Sets the interval at which we check the message queue to apply the warning and max depth."
228    ))
229  )]
230  queue_check_interval: Duration,
231
232  /// Used to generate warning message if the
233  /// number of queued messages to broadcast exceeds this number. This
234  /// is to provide the user feedback if events are being triggered
235  /// faster than they can be disseminated
236  #[viewit(
237    getter(const, attrs(doc = "Returns the queue depth warning.")),
238    setter(attrs(doc = "Sets the queue depth warning."))
239  )]
240  queue_depth_warning: usize,
241
242  /// Used to start dropping messages if the number
243  /// of queued messages to broadcast exceeds this number. This is to
244  /// prevent an unbounded growth of memory utilization
245  #[viewit(
246    getter(const, attrs(doc = "Returns the max queue depth.")),
247    setter(attrs(doc = "Sets the max queue depth."))
248  )]
249  max_queue_depth: usize,
250
251  /// if >0 will enforce a lower limit for dropping messages
252  /// and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This
253  /// defaults to 0 which disables this dynamic sizing feature. If this is
254  /// >0 then `max_queue_depth` will be ignored.
255  #[viewit(
256    getter(
257      const,
258      attrs(
259        doc = "Returns if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored."
260      )
261    ),
262    setter(attrs(
263      doc = "Sets if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored."
264    ))
265  )]
266  min_queue_depth: usize,
267
268  /// Used to determine how long we store recent
269  /// join and leave intents. This is used to guard against the case where
270  /// Serf broadcasts an intent that arrives before the Memberlist event.
271  /// It is important that this not be too short to avoid continuous
272  /// rebroadcasting of dead events.
273  #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
274  #[viewit(
275    getter(
276      const,
277      attrs(doc = "Returns how long we store recent join and leave intents.")
278    ),
279    setter(attrs(doc = "Sets how long we store recent join and leave intents."))
280  )]
281  recent_intent_timeout: Duration,
282
283  /// Used to control how many events are buffered.
284  /// This is used to prevent re-delivery of events to a client. The buffer
285  /// must be large enough to handle all "recent" events, since Serf will
286  /// not deliver messages that are older than the oldest entry in the buffer.
287  /// Thus if a client is generating too many events, it's possible that the
288  /// buffer gets overrun and messages are not delivered.
289  #[viewit(
290    getter(const, attrs(doc = "Returns how many events are buffered.")),
291    setter(attrs(doc = "Sets how many events are buffered."))
292  )]
293  event_buffer_size: usize,
294
295  /// used to control how many queries are buffered.
296  /// This is used to prevent re-delivery of queries to a client. The buffer
297  /// must be large enough to handle all "recent" events, since Serf will not
298  /// deliver queries older than the oldest entry in the buffer.
299  /// Thus if a client is generating too many queries, it's possible that the
300  /// buffer gets overrun and messages are not delivered.
301  #[viewit(
302    getter(const, attrs(doc = "Returns how many queries are buffered.")),
303    setter(attrs(doc = "Sets how many queries are buffered."))
304  )]
305  query_buffer_size: usize,
306
307  /// Configures the default timeout multipler for a query to run if no
308  /// specific value is provided. Queries are real-time by nature, where the
309  /// reply is time sensitive. As a result, results are collected in an async
310  /// fashion, however the query must have a bounded duration. We want the timeout
311  /// to be long enough that all nodes have time to receive the message, run a handler,
312  /// and generate a reply. Once the timeout is exceeded, any further replies are ignored.
313  /// The default value is
314  ///
315  /// ```text
316  /// timeout = gossip_interval * query_timeout_mult * log(N+1)
317  /// ```
318  #[viewit(
319    getter(
320      const,
321      attrs(
322        doc = "Returns the default timeout multipler for a query to run if no specific value is provided."
323      )
324    ),
325    setter(attrs(
326      doc = "Sets the default timeout multipler for a query to run if no specific value is provided."
327    ))
328  )]
329  query_timeout_mult: usize,
330
331  /// Limit the outbound payload sizes for queries, respectively. These must fit
332  /// in a UDP packet with some additional overhead, so tuning these
333  /// past the default values of 1024 will depend on your network
334  /// configuration.
335  #[viewit(
336    getter(
337      const,
338      attrs(doc = "Returns the limit of the outbound payload sizes for queries.")
339    ),
340    setter(attrs(doc = "Sets the limit of the outbound payload sizes for queries."))
341  )]
342  query_response_size_limit: usize,
343
344  /// Limit the inbound payload sizes for queries, respectively. These must fit
345  /// in a UDP packet with some additional overhead, so tuning these
346  /// past the default values of 1024 will depend on your network
347  /// configuration.
348  #[viewit(
349    getter(
350      const,
351      attrs(doc = "Returns the limit of the inbound payload sizes for queries.")
352    ),
353    setter(attrs(doc = "Sets the limit of the inbound payload sizes for queries."))
354  )]
355  query_size_limit: usize,
356
357  /// The memberlist configuration that Serf will
358  /// use to do the underlying membership management and gossip.
359  #[viewit(
360    getter(
361      const,
362      style = "ref",
363      attrs(
364        doc = "Returns the memberlist configuration that Serf will use to do the underlying membership management and gossip."
365      )
366    ),
367    setter(attrs(
368      doc = "Sets the memberlist configuration that Serf will use to do the underlying membership management and gossip."
369    ))
370  )]
371  memberlist_options: MemberlistOptions,
372
373  /// If provided is used to snapshot live nodes as well
374  /// as lamport clock values. When Serf is started with a snapshot,
375  /// it will attempt to join all the previously known nodes until one
376  /// succeeds and will also avoid replaying old user events.
377  #[viewit(
378    getter(
379      const,
380      style = "ref",
381      result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"),
382      attrs(doc = "Returns the path to the snapshot file.")
383    ),
384    setter(attrs(doc = "Sets the path to the snapshot file."))
385  )]
386  snapshot_path: Option<PathBuf>,
387
388  /// Controls our interaction with the snapshot file.
389  /// When set to false (default), a leave causes a Serf to not rejoin
390  /// the cluster until an explicit join is received. If this is set to
391  /// true, we ignore the leave, and rejoin the cluster on start.
392  #[viewit(
393    getter(
394      const,
395      attrs(doc = "Returns if Serf will rejoin the cluster after a leave.")
396    ),
397    setter(attrs(doc = "Sets if Serf will rejoin the cluster after a leave."))
398  )]
399  rejoin_after_leave: bool,
400
401  /// Controls if Serf will actively attempt
402  /// to resolve a name conflict. Since each Serf member must have a unique
403  /// name, a cluster can run into issues if multiple nodes claim the same
404  /// name. Without automatic resolution, Serf merely logs some warnings, but
405  /// otherwise does not take any action. Automatic resolution detects the
406  /// conflict and issues a special query which asks the cluster for the
407  /// Name -> IP:Port mapping. If there is a simple majority of votes, that
408  /// node stays while the other node will leave the cluster and exit.
409  #[viewit(
410    getter(
411      const,
412      attrs(doc = "Returns if Serf will attempt to resolve a name conflict.")
413    ),
414    setter(attrs(doc = "Sets if Serf will attempt to resolve a name conflict."))
415  )]
416  enable_id_conflict_resolution: bool,
417
418  /// Controls if Serf will maintain an estimate of this
419  /// node's network coordinate internally. A network coordinate is useful
420  /// for estimating the network distance (i.e. round trip time) between
421  /// two nodes. Enabling this option adds some overhead to ping messages.
422  #[viewit(
423    getter(
424      const,
425      attrs(
426        doc = "Returns if Serf will maintain an estimate of this node's network coordinate internally."
427      )
428    ),
429    setter(attrs(
430      doc = "Sets if Serf will maintain an estimate of this node's network coordinate internally."
431    ))
432  )]
433  disable_coordinates: bool,
434
435  /// Provides the location of a writable file where Serf can
436  /// persist changes to the encryption keyring.
437  #[cfg(feature = "encryption")]
438  #[viewit(
439    getter(
440      const,
441      style = "ref",
442      result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"),
443      attrs(
444        doc = "Returns the location of a writable file where Serf can persist changes to the encryption keyring.",
445        cfg(feature = "encryption")
446      )
447    ),
448    setter(attrs(
449      doc = "Sets the location of a writable file where Serf can persist changes to the encryption keyring.",
450      cfg(feature = "encryption")
451    ))
452  )]
453  keyring_file: Option<PathBuf>,
454
455  /// Maximum byte size limit of user event `name` + `payload` in bytes.
456  /// It's optimal to be relatively small, since it's going to be gossiped through the cluster.
457  #[viewit(
458    getter(
459      const,
460      attrs(
461        doc = "Returns the maximum byte size limit of user event `name` + `payload` in bytes."
462      )
463    ),
464    setter(attrs(
465      doc = "Sets the maximum byte size limit of user event `name` + `payload` in bytes."
466    ))
467  )]
468  max_user_event_size: usize,
469}
470
471impl Default for Options {
472  #[inline]
473  fn default() -> Self {
474    Self::new()
475  }
476}
477
478impl Clone for Options {
479  #[inline]
480  fn clone(&self) -> Self {
481    Self {
482      memberlist_options: self.memberlist_options.clone(),
483      #[cfg(feature = "encryption")]
484      keyring_file: self.keyring_file.clone(),
485      snapshot_path: self.snapshot_path.clone(),
486      tags: self.tags.clone(),
487      ..*self
488    }
489  }
490}
491
492impl Options {
493  /// Returns a new instance of `Options` with default configurations.
494  #[inline]
495  pub fn new() -> Self {
496    Self {
497      tags: Arc::new(ArcSwap::from_pointee(Tags::default())),
498      protocol_version: ProtocolVersion::V1,
499      delegate_version: DelegateVersion::V1,
500      broadcast_timeout: Duration::from_secs(5),
501      leave_propagate_delay: Duration::from_secs(1),
502      coalesce_period: Duration::ZERO,
503      quiescent_period: Duration::ZERO,
504      user_coalesce_period: Duration::ZERO,
505      user_quiescent_period: Duration::ZERO,
506      reap_interval: Duration::from_secs(15),
507      reconnect_interval: Duration::from_secs(30),
508      reconnect_timeout: Duration::from_secs(3600 * 24),
509      tombstone_timeout: Duration::from_secs(3600 * 24),
510      flap_timeout: Duration::from_secs(60),
511      queue_check_interval: Duration::from_secs(30),
512      queue_depth_warning: 128,
513      max_queue_depth: 4096,
514      min_queue_depth: 0,
515      recent_intent_timeout: Duration::from_secs(60 * 5),
516      event_buffer_size: 512,
517      query_buffer_size: 512,
518      query_timeout_mult: 16,
519      query_response_size_limit: 1024,
520      query_size_limit: 1024,
521      memberlist_options: MemberlistOptions::lan(),
522      snapshot_path: None,
523      rejoin_after_leave: false,
524      enable_id_conflict_resolution: true,
525      disable_coordinates: false,
526      #[cfg(feature = "encryption")]
527      keyring_file: None,
528      max_user_event_size: 512,
529    }
530  }
531
532  /// Sets the tags for this node.
533  #[inline]
534  pub fn with_tags<K: Into<SmolStr>, V: Into<SmolStr>>(
535    self,
536    tags: impl Iterator<Item = (K, V)>,
537  ) -> Self {
538    self
539      .tags
540      .store(Arc::new(tags.map(|(k, v)| (k.into(), v.into())).collect()));
541    self
542  }
543
544  #[inline]
545  pub(crate) fn queue_opts(&self) -> QueueOptions {
546    QueueOptions {
547      max_queue_depth: self.max_queue_depth,
548      min_queue_depth: self.min_queue_depth,
549      check_interval: self.queue_check_interval,
550      depth_warning: self.queue_depth_warning,
551      #[cfg(feature = "metrics")]
552      metric_labels: self.memberlist_options.metric_labels().clone(),
553    }
554  }
555}
556
/// Broadcast-queue settings extracted from [`Options`] by `Options::queue_opts`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct QueueOptions {
  /// Taken from `Options::max_queue_depth`.
  pub(crate) max_queue_depth: usize,
  /// Taken from `Options::min_queue_depth`; 0 disables dynamic sizing.
  pub(crate) min_queue_depth: usize,
  /// Taken from `Options::queue_check_interval`.
  pub(crate) check_interval: Duration,
  /// Taken from `Options::queue_depth_warning`.
  pub(crate) depth_warning: usize,
  /// Metric labels copied from the memberlist options (only with the `metrics` feature).
  #[cfg(feature = "metrics")]
  pub(crate) metric_labels: Arc<memberlist_core::proto::MetricLabels>,
}
566
/// Serde adapter for `Options::tags`.
///
/// On the wire the tags are a plain [`Tags`] value. The `Arc<ArcSwap<_>>`
/// wrapper exists only in memory and is never serialized.
#[cfg(feature = "serde")]
mod tags_serde {
  use std::sync::Arc;

  use arc_swap::ArcSwap;
  use serde::{Deserialize, Deserializer, Serialize, Serializer};

  use crate::types::Tags;

  /// Serializes whatever tags the store currently holds.
  pub fn serialize<S>(tags: &Arc<ArcSwap<Tags>>, serializer: S) -> Result<S::Ok, S::Error>
  where
    S: Serializer,
  {
    let guard = tags.load();
    let current: &Tags = &guard;
    Serialize::serialize(current, serializer)
  }

  /// Decodes a [`Tags`] value and wraps it in a fresh, unshared store.
  pub fn deserialize<'de, D>(deserializer: D) -> Result<Arc<ArcSwap<Tags>>, D::Error>
  where
    D: Deserializer<'de>,
  {
    let decoded = Tags::deserialize(deserializer)?;
    Ok(Arc::new(ArcSwap::from_pointee(decoded)))
  }
}
590}