1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use arc_swap::ArcSwap;
4pub use memberlist_core::Options as MemberlistOptions;
5use smol_str::SmolStr;
6
7use super::types::{DelegateVersion, ProtocolVersion, Tags};
8
9fn tags(tags: &Arc<ArcSwap<Tags>>) -> Arc<Tags> {
10 tags.load().clone()
11}
12
13#[viewit::viewit(getters(vis_all = "pub"), setters(vis_all = "pub", prefix = "with"))]
15#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
16pub struct Options {
17 #[viewit(
23 vis = "pub(crate)",
24 getter(
25 vis = "pub",
26 style = "ref",
27 result(converter(style = "ref", fn = "tags",), type = "Arc<Tags>",),
28 attrs(
29 doc = "Returns the tags for this role, if any. This is used to provide arbitrary key/value metadata per-node. For example, a \"role\" tag may be used to differentiate \"load-balancer\" from a \"web\" role as parts of the same cluster."
30 )
31 ),
32 setter(skip)
33 )]
34 #[cfg_attr(feature = "serde", serde(with = "tags_serde"))]
35 tags: Arc<ArcSwap<Tags>>,
36
37 #[viewit(
39 getter(const, attrs(doc = "Returns the protocol version to speak")),
40 setter(attrs(doc = "Sets the protocol version to speak"))
41 )]
42 protocol_version: ProtocolVersion,
43
44 #[viewit(
46 getter(const, attrs(doc = "Returns the delegate version to speak")),
47 setter(attrs(doc = "Sets the delegate version to speak"))
48 )]
49 delegate_version: DelegateVersion,
50
51 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
56 #[viewit(
57 getter(
58 const,
59 attrs(
60 doc = "Returns the amount of time to wait for a broadcast message to be sent to the cluster."
61 )
62 ),
63 setter(attrs(
64 doc = "Sets the amount of time to wait for a broadcast message to be sent to the cluster."
65 ))
66 )]
67 broadcast_timeout: Duration,
68
69 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
75 #[viewit(
76 getter(const, attrs(doc = "Returns the leave propagate delay.")),
77 setter(attrs(doc = "Sets the leave propagate delay."))
78 )]
79 leave_propagate_delay: Duration,
80
81 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
96 #[viewit(
97 getter(const, attrs(doc = "Returns the coalesce period.")),
98 setter(attrs(doc = "Sets the coalesce period."))
99 )]
100 coalesce_period: Duration,
101
102 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
109 #[viewit(
110 getter(
111 const,
112 attrs(
113 doc = "Returns the specifies the duration of time where if no events are received, coalescence immediately happens."
114 )
115 ),
116 setter(attrs(
117 doc = "Sets specifies the duration of time where if no events are received, coalescence immediately happens."
118 ))
119 )]
120 quiescent_period: Duration,
121
122 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
126 #[viewit(
127 getter(const, attrs(doc = "Returns the user event coalesce period.")),
128 setter(attrs(doc = "Sets the user event coalesce period."))
129 )]
130 user_coalesce_period: Duration,
131
132 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
136 #[viewit(
137 getter(const, attrs(doc = "Returns the user quiescent period.")),
138 setter(attrs(doc = "Sets the user quiescent period."))
139 )]
140 user_quiescent_period: Duration,
141
142 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
145 #[viewit(
146 getter(const, attrs(doc = "Returns the interval when the reaper runs.")),
147 setter(attrs(doc = "Sets the interval when the reaper runs."))
148 )]
149 reap_interval: Duration,
150
151 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
155 #[viewit(
156 getter(
157 const,
158 attrs(doc = "Returns the interval when we attempt to reconnect to failed nodes.")
159 ),
160 setter(attrs(doc = "Sets the interval when we attempt to reconnect to failed nodes."))
161 )]
162 reconnect_interval: Duration,
163
164 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
167 #[viewit(
168 getter(
169 const,
170 attrs(
171 doc = "Returns the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone."
172 )
173 ),
174 setter(attrs(
175 doc = "Sets the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone."
176 ))
177 )]
178 reconnect_timeout: Duration,
179
180 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
184 #[viewit(
185 getter(
186 const,
187 attrs(
188 doc = "Returns the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes."
189 )
190 ),
191 setter(attrs(
192 doc = "Sets the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes."
193 ))
194 )]
195 tombstone_timeout: Duration,
196
197 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
203 #[viewit(
204 getter(
205 const,
206 attrs(
207 doc = "Returns the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes."
208 )
209 ),
210 setter(attrs(
211 doc = "Sets the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes."
212 ))
213 )]
214 flap_timeout: Duration,
215
216 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
219 #[viewit(
220 getter(
221 const,
222 attrs(
223 doc = "Returns the interval at which we check the message queue to apply the warning and max depth."
224 )
225 ),
226 setter(attrs(
227 doc = "Sets the interval at which we check the message queue to apply the warning and max depth."
228 ))
229 )]
230 queue_check_interval: Duration,
231
232 #[viewit(
237 getter(const, attrs(doc = "Returns the queue depth warning.")),
238 setter(attrs(doc = "Sets the queue depth warning."))
239 )]
240 queue_depth_warning: usize,
241
242 #[viewit(
246 getter(const, attrs(doc = "Returns the max queue depth.")),
247 setter(attrs(doc = "Sets the max queue depth."))
248 )]
249 max_queue_depth: usize,
250
251 #[viewit(
256 getter(
257 const,
258 attrs(
259 doc = "Returns if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored."
260 )
261 ),
262 setter(attrs(
263 doc = "Sets if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored."
264 ))
265 )]
266 min_queue_depth: usize,
267
268 #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
274 #[viewit(
275 getter(
276 const,
277 attrs(doc = "Returns how long we store recent join and leave intents.")
278 ),
279 setter(attrs(doc = "Sets how long we store recent join and leave intents."))
280 )]
281 recent_intent_timeout: Duration,
282
283 #[viewit(
290 getter(const, attrs(doc = "Returns how many events are buffered.")),
291 setter(attrs(doc = "Sets how many events are buffered."))
292 )]
293 event_buffer_size: usize,
294
295 #[viewit(
302 getter(const, attrs(doc = "Returns how many queries are buffered.")),
303 setter(attrs(doc = "Sets how many queries are buffered."))
304 )]
305 query_buffer_size: usize,
306
307 #[viewit(
319 getter(
320 const,
321 attrs(
322 doc = "Returns the default timeout multipler for a query to run if no specific value is provided."
323 )
324 ),
325 setter(attrs(
326 doc = "Sets the default timeout multipler for a query to run if no specific value is provided."
327 ))
328 )]
329 query_timeout_mult: usize,
330
331 #[viewit(
336 getter(
337 const,
338 attrs(doc = "Returns the limit of the outbound payload sizes for queries.")
339 ),
340 setter(attrs(doc = "Sets the limit of the outbound payload sizes for queries."))
341 )]
342 query_response_size_limit: usize,
343
344 #[viewit(
349 getter(
350 const,
351 attrs(doc = "Returns the limit of the inbound payload sizes for queries.")
352 ),
353 setter(attrs(doc = "Sets the limit of the inbound payload sizes for queries."))
354 )]
355 query_size_limit: usize,
356
357 #[viewit(
360 getter(
361 const,
362 style = "ref",
363 attrs(
364 doc = "Returns the memberlist configuration that Serf will use to do the underlying membership management and gossip."
365 )
366 ),
367 setter(attrs(
368 doc = "Sets the memberlist configuration that Serf will use to do the underlying membership management and gossip."
369 ))
370 )]
371 memberlist_options: MemberlistOptions,
372
373 #[viewit(
378 getter(
379 const,
380 style = "ref",
381 result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"),
382 attrs(doc = "Returns the path to the snapshot file.")
383 ),
384 setter(attrs(doc = "Sets the path to the snapshot file."))
385 )]
386 snapshot_path: Option<PathBuf>,
387
388 #[viewit(
393 getter(
394 const,
395 attrs(doc = "Returns if Serf will rejoin the cluster after a leave.")
396 ),
397 setter(attrs(doc = "Sets if Serf will rejoin the cluster after a leave."))
398 )]
399 rejoin_after_leave: bool,
400
401 #[viewit(
410 getter(
411 const,
412 attrs(doc = "Returns if Serf will attempt to resolve a name conflict.")
413 ),
414 setter(attrs(doc = "Sets if Serf will attempt to resolve a name conflict."))
415 )]
416 enable_id_conflict_resolution: bool,
417
418 #[viewit(
423 getter(
424 const,
425 attrs(
426 doc = "Returns if Serf will maintain an estimate of this node's network coordinate internally."
427 )
428 ),
429 setter(attrs(
430 doc = "Sets if Serf will maintain an estimate of this node's network coordinate internally."
431 ))
432 )]
433 disable_coordinates: bool,
434
435 #[cfg(feature = "encryption")]
438 #[viewit(
439 getter(
440 const,
441 style = "ref",
442 result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"),
443 attrs(
444 doc = "Returns the location of a writable file where Serf can persist changes to the encryption keyring.",
445 cfg(feature = "encryption")
446 )
447 ),
448 setter(attrs(
449 doc = "Sets the location of a writable file where Serf can persist changes to the encryption keyring.",
450 cfg(feature = "encryption")
451 ))
452 )]
453 keyring_file: Option<PathBuf>,
454
455 #[viewit(
458 getter(
459 const,
460 attrs(
461 doc = "Returns the maximum byte size limit of user event `name` + `payload` in bytes."
462 )
463 ),
464 setter(attrs(
465 doc = "Sets the maximum byte size limit of user event `name` + `payload` in bytes."
466 ))
467 )]
468 max_user_event_size: usize,
469}
470
471impl Default for Options {
472 #[inline]
473 fn default() -> Self {
474 Self::new()
475 }
476}
477
478impl Clone for Options {
479 #[inline]
480 fn clone(&self) -> Self {
481 Self {
482 memberlist_options: self.memberlist_options.clone(),
483 #[cfg(feature = "encryption")]
484 keyring_file: self.keyring_file.clone(),
485 snapshot_path: self.snapshot_path.clone(),
486 tags: self.tags.clone(),
487 ..*self
488 }
489 }
490}
491
492impl Options {
493 #[inline]
495 pub fn new() -> Self {
496 Self {
497 tags: Arc::new(ArcSwap::from_pointee(Tags::default())),
498 protocol_version: ProtocolVersion::V1,
499 delegate_version: DelegateVersion::V1,
500 broadcast_timeout: Duration::from_secs(5),
501 leave_propagate_delay: Duration::from_secs(1),
502 coalesce_period: Duration::ZERO,
503 quiescent_period: Duration::ZERO,
504 user_coalesce_period: Duration::ZERO,
505 user_quiescent_period: Duration::ZERO,
506 reap_interval: Duration::from_secs(15),
507 reconnect_interval: Duration::from_secs(30),
508 reconnect_timeout: Duration::from_secs(3600 * 24),
509 tombstone_timeout: Duration::from_secs(3600 * 24),
510 flap_timeout: Duration::from_secs(60),
511 queue_check_interval: Duration::from_secs(30),
512 queue_depth_warning: 128,
513 max_queue_depth: 4096,
514 min_queue_depth: 0,
515 recent_intent_timeout: Duration::from_secs(60 * 5),
516 event_buffer_size: 512,
517 query_buffer_size: 512,
518 query_timeout_mult: 16,
519 query_response_size_limit: 1024,
520 query_size_limit: 1024,
521 memberlist_options: MemberlistOptions::lan(),
522 snapshot_path: None,
523 rejoin_after_leave: false,
524 enable_id_conflict_resolution: true,
525 disable_coordinates: false,
526 #[cfg(feature = "encryption")]
527 keyring_file: None,
528 max_user_event_size: 512,
529 }
530 }
531
532 #[inline]
534 pub fn with_tags<K: Into<SmolStr>, V: Into<SmolStr>>(
535 self,
536 tags: impl Iterator<Item = (K, V)>,
537 ) -> Self {
538 self
539 .tags
540 .store(Arc::new(tags.map(|(k, v)| (k.into(), v.into())).collect()));
541 self
542 }
543
544 #[inline]
545 pub(crate) fn queue_opts(&self) -> QueueOptions {
546 QueueOptions {
547 max_queue_depth: self.max_queue_depth,
548 min_queue_depth: self.min_queue_depth,
549 check_interval: self.queue_check_interval,
550 depth_warning: self.queue_depth_warning,
551 #[cfg(feature = "metrics")]
552 metric_labels: self.memberlist_options.metric_labels().clone(),
553 }
554 }
555}
556
557#[derive(Debug, Clone, PartialEq, Eq, Hash)]
558pub(crate) struct QueueOptions {
559 pub(crate) max_queue_depth: usize,
560 pub(crate) min_queue_depth: usize,
561 pub(crate) check_interval: Duration,
562 pub(crate) depth_warning: usize,
563 #[cfg(feature = "metrics")]
564 pub(crate) metric_labels: Arc<memberlist_core::proto::MetricLabels>,
565}
566
567#[cfg(feature = "serde")]
568mod tags_serde {
569 use std::sync::Arc;
570
571 use arc_swap::ArcSwap;
572 use serde::{Deserialize, Deserializer, Serialize, Serializer};
573
574 use crate::types::Tags;
575
576 pub fn serialize<S>(tags: &Arc<ArcSwap<Tags>>, serializer: S) -> Result<S::Ok, S::Error>
577 where
578 S: Serializer,
579 {
580 let tags = tags.load();
581 Tags::serialize(&**tags, serializer)
582 }
583
584 pub fn deserialize<'de, D>(deserializer: D) -> Result<Arc<ArcSwap<Tags>>, D::Error>
585 where
586 D: Deserializer<'de>,
587 {
588 Tags::deserialize(deserializer).map(|map| Arc::new(ArcSwap::from_pointee(map)))
589 }
590}