1use std::sync::atomic::Ordering;
2
3use futures::{FutureExt, StreamExt};
4use memberlist_core::{
5 CheapClone,
6 bytes::Bytes,
7 proto::{Data, MaybeResolvedAddress, Meta, Node, OneOrMore, SmallVec},
8 tracing,
9};
10use smol_str::SmolStr;
11
12use crate::{
13 error::Error,
14 event::EventProducer,
15 types::{LeaveMessage, Member, Tags, UserEventMessage},
16};
17
18use super::*;
19
20impl<T> Serf<T>
21where
22 T: Transport,
23{
24 pub async fn new(
26 transport: T::Options,
27 opts: Options,
28 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
29 Self::new_in(
30 None,
31 None,
32 transport,
33 opts,
34 #[cfg(any(test, feature = "test"))]
35 None,
36 )
37 .await
38 }
39
40 pub async fn with_event_producer(
42 transport: T::Options,
43 opts: Options,
44 ev: EventProducer<T, DefaultDelegate<T>>,
45 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
46 Self::new_in(
47 Some(ev.tx),
48 None,
49 transport,
50 opts,
51 #[cfg(any(test, feature = "test"))]
52 None,
53 )
54 .await
55 }
56}
57
58impl<T, D> Serf<T, D>
59where
60 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
61 T: Transport,
62{
63 pub async fn with_delegate(
65 transport: T::Options,
66 opts: Options,
67 delegate: D,
68 ) -> Result<Self, Error<T, D>> {
69 Self::new_in(
70 None,
71 Some(delegate),
72 transport,
73 opts,
74 #[cfg(any(test, feature = "test"))]
75 None,
76 )
77 .await
78 }
79
80 pub async fn with_event_producer_and_delegate(
82 transport: T::Options,
83 opts: Options,
84 ev: EventProducer<T, D>,
85 delegate: D,
86 ) -> Result<Self, Error<T, D>> {
87 Self::new_in(
88 Some(ev.tx),
89 Some(delegate),
90 transport,
91 opts,
92 #[cfg(any(test, feature = "test"))]
93 None,
94 )
95 .await
96 }
97
98 #[inline]
100 pub fn local_id(&self) -> &T::Id {
101 self.inner.memberlist.local_id()
102 }
103
104 #[inline]
106 pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
107 self.inner.memberlist.advertise_node()
108 }
109
110 #[inline]
115 #[cfg(feature = "encryption")]
116 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
117 pub fn encryption_enabled(&self) -> bool {
118 self.inner.memberlist.encryption_enabled()
119 }
120
121 #[inline]
124 pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
125 self.inner.shutdown_rx.clone()
126 }
127
128 #[inline]
130 pub fn state(&self) -> SerfState {
131 *self.inner.state.lock()
132 }
133
134 #[inline]
136 pub async fn members(&self) -> OneOrMore<Member<T::Id, T::ResolvedAddress>> {
137 self
138 .inner
139 .members
140 .read()
141 .await
142 .states
143 .values()
144 .map(|s| s.member.cheap_clone())
145 .collect()
146 }
147
148 #[inline]
150 pub async fn stats(&self) -> Stats {
151 let (num_members, num_failed, num_left, health_score) = {
152 let members = self.inner.members.read().await;
153 let num_members = members.states.len();
154 let num_failed = members.failed_members.len();
155 let num_left = members.left_members.len();
156 let health_score = self.inner.memberlist.health_score();
157 (num_members, num_failed, num_left, health_score)
158 };
159
160 #[cfg(not(feature = "encryption"))]
161 let encrypted = false;
162 #[cfg(feature = "encryption")]
163 let encrypted = self.inner.memberlist.encryption_enabled();
164
165 Stats {
166 members: num_members,
167 failed: num_failed,
168 left: num_left,
169 health_score,
170 member_time: self.inner.clock.time().into(),
171 event_time: self.inner.event_clock.time().into(),
172 query_time: self.inner.query_clock.time().into(),
173 intent_queue: self.inner.broadcasts.num_queued().await,
174 event_queue: self.inner.event_broadcasts.num_queued().await,
175 query_queue: self.inner.query_broadcasts.num_queued().await,
176 encrypted,
177 coordinate_resets: self
178 .inner
179 .coord_core
180 .as_ref()
181 .map(|coord| coord.client.stats().resets),
182 }
183 }
184
185 #[inline]
188 pub async fn num_members(&self) -> usize {
189 self.inner.members.read().await.states.len()
190 }
191
192 #[cfg(feature = "encryption")]
194 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
195 #[inline]
196 pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
197 &self.inner.key_manager
198 }
199
200 #[inline]
202 pub async fn local_member(&self) -> Member<T::Id, T::ResolvedAddress> {
203 self
204 .inner
205 .members
206 .read()
207 .await
208 .states
209 .get(self.inner.memberlist.local_id())
210 .unwrap()
211 .member
212 .cheap_clone()
213 }
214
215 #[inline]
219 pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
220 let tags_encoded_len = tags.encoded_len_with_length_delimited();
222 if tags_encoded_len > Meta::MAX_SIZE {
223 return Err(Error::tags_too_large(tags_encoded_len));
224 }
225 self.inner.opts.tags.store(Arc::new(tags));
227
228 self
230 .inner
231 .memberlist
232 .update_node(self.inner.opts.broadcast_timeout)
233 .await
234 .map_err(From::from)
235 }
236
237 #[inline]
241 pub async fn user_event(
242 &self,
243 name: impl Into<SmolStr>,
244 payload: impl Into<Bytes>,
245 coalesce: bool,
246 ) -> Result<(), Error<T, D>> {
247 let name: SmolStr = name.into();
248 let payload: Bytes = payload.into();
249 let payload_size_before_encoding = name.len() + payload.len();
250
251 if payload_size_before_encoding > self.inner.opts.max_user_event_size {
253 return Err(Error::user_event_limit_too_large(
254 self.inner.opts.max_user_event_size,
255 ));
256 }
257
258 if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
259 return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
260 }
261
262 let msg = UserEventMessage {
264 ltime: self.inner.event_clock.time(),
265 name: name.clone(),
266 payload,
267 cc: coalesce,
268 };
269
270 let len = crate::types::encoded_message_len(&msg);
272
273 if len > self.inner.opts.max_user_event_size {
276 return Err(Error::raw_user_event_too_large(len));
277 }
278
279 if len > USER_EVENT_SIZE_LIMIT {
280 return Err(Error::raw_user_event_too_large(len));
281 }
282
283 let raw = crate::types::encode_message_to_bytes(&msg)?;
284
285 self.inner.event_clock.increment();
286
287 self.handle_user_event(either::Either::Right(msg)).await;
289
290 self
291 .inner
292 .event_broadcasts
293 .queue_broadcast(SerfBroadcast {
294 msg: raw,
295 notify_tx: None,
296 })
297 .await;
298 Ok(())
299 }
300
301 pub async fn query(
305 &self,
306 name: impl Into<SmolStr>,
307 payload: impl Into<Bytes>,
308 params: Option<QueryParam<T::Id>>,
309 ) -> Result<QueryResponse<T::Id, T::ResolvedAddress>, Error<T, D>> {
310 self
311 .query_in(name.into(), payload.into(), params, None)
312 .await
313 }
314
315 pub async fn join(
319 &self,
320 node: MaybeResolvedAddress<T::Address, T::ResolvedAddress>,
321 ignore_old: bool,
322 ) -> Result<T::ResolvedAddress, Error<T, D>> {
323 let current_state = self.state();
325 if current_state != SerfState::Alive {
326 return Err(Error::bad_join_status(current_state));
327 }
328
329 let _join_lock = self.inner.join_lock.lock().await;
331
332 if ignore_old {
335 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
336 }
337
338 match self.inner.memberlist.join(node).await {
340 Ok(node) => {
341 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
343 if ignore_old {
344 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
345 }
346 return Err(e);
347 }
348 if ignore_old {
349 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
350 }
351
352 Ok(node)
353 }
354 Err(e) => {
355 if ignore_old {
356 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
357 }
358 Err(Error::from(e))
359 }
360 }
361 }
362
363 pub async fn join_many(
367 &self,
368 existing: impl Iterator<Item = MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
369 ignore_old: bool,
370 ) -> Result<SmallVec<T::ResolvedAddress>, (SmallVec<T::ResolvedAddress>, Error<T, D>)> {
371 let current_state = self.state();
373 if current_state != SerfState::Alive {
374 return Err((SmallVec::new(), Error::bad_join_status(current_state)));
375 }
376
377 let _join_lock = self.inner.join_lock.lock().await;
379
380 if ignore_old {
383 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
384 }
385
386 match self.inner.memberlist.join_many(existing).await {
388 Ok(joined) => {
389 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
391 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
392 return Err((joined, e));
393 }
394 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
395 Ok(joined)
396 }
397 Err((joined, err)) => {
398 if !joined.is_empty() {
400 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
402 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
403 return Err((
404 joined,
405 Error::Multiple(std::sync::Arc::from_iter([err.into(), e])),
406 ));
407 }
408
409 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
410 Err((joined, Error::from(err)))
411 } else {
412 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
413 Err((joined, Error::from(err)))
414 }
415 }
416 }
417 }
418
419 pub async fn leave(&self) -> Result<(), Error<T, D>> {
423 {
425 let mut s = self.inner.state.lock();
426 match *s {
427 SerfState::Left => return Ok(()),
428 SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
429 SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
430 _ => {
431 *s = SerfState::Leaving;
433 }
434 }
435 }
436
437 if let Some(ref snap) = self.inner.snapshot {
439 snap.leave().await;
440 }
441
442 let msg = LeaveMessage {
444 ltime: self.inner.clock.time(),
445 id: self.inner.memberlist.local_id().cheap_clone(),
446 prune: false,
447 };
448
449 self.inner.clock.increment();
450
451 self.handle_node_leave_intent(&msg).await;
453
454 if self.has_alive_members().await {
457 let (notify_tx, notify_rx) = async_channel::bounded(1);
458 let msg = crate::types::encode_message_to_bytes(&msg)?;
459 self.broadcast(msg, Some(notify_tx)).await?;
460
461 futures::select! {
462 _ = notify_rx.recv().fuse() => {
463 }
465 _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
466 tracing::warn!("serf: timeout while waiting for graceful leave");
467 }
468 }
469 }
470
471 if let Err(e) = self
473 .inner
474 .memberlist
475 .leave(self.inner.opts.broadcast_timeout)
476 .await
477 {
478 tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
479 }
480
481 <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;
487
488 {
490 let mut s = self.inner.state.lock();
491 match *s {
492 SerfState::Shutdown => {}
493 _ => {
494 *s = SerfState::Left;
495 }
496 }
497 }
498 Ok(())
499 }
500
501 pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
506 self.force_leave(id, false).await
507 }
508
509 pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
514 self.force_leave(id, true).await
515 }
516
517 pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
526 {
527 let mut s = self.inner.state.lock();
528 match *s {
529 SerfState::Shutdown => return Ok(()),
530 SerfState::Left => {}
531 _ => {
532 tracing::warn!("serf: shutdown without a leave");
533 }
534 }
535
536 *s = SerfState::Shutdown;
540 }
541 self.inner.memberlist.shutdown().await?;
542 self.inner.shutdown_tx.close();
543
544 if let Some(ref snap) = self.inner.snapshot {
546 snap.wait().await;
547 }
548
549 loop {
550 if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
551 let mut futs = core::mem::take(&mut *handles);
552 while futs.next().await.is_some() {}
553 break;
554 }
555 }
556
557 Ok(())
558 }
559
560 pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
562 if let Some(ref coord) = self.inner.coord_core {
563 return Ok(coord.client.get_coordinate());
564 }
565
566 Err(Error::coordinates_disabled())
567 }
568
569 pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
572 if let Some(ref coord) = self.inner.coord_core {
573 return Ok(coord.cache.read().get(id).cloned());
574 }
575
576 Err(Error::coordinates_disabled())
577 }
578
579 #[inline]
581 pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
582 &self.inner.memberlist
583 }
584}
585
/// Operator-facing statistics about a Serf instance, as produced by
/// `Serf::stats`.
///
/// Fields are private; `viewit` generates public getters with a `get` prefix
/// and no setters.
#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Stats {
  /// Number of entries in the member states map.
  members: usize,
  /// Number of entries in the failed-members list.
  failed: usize,
  /// Number of entries in the left-members list.
  left: usize,
  /// Health score reported by the underlying memberlist.
  health_score: usize,
  /// Current value of the member clock.
  member_time: u64,
  /// Current value of the event clock.
  event_time: u64,
  /// Current value of the query clock.
  query_time: u64,
  /// Number of messages queued on the intent broadcast queue.
  intent_queue: usize,
  /// Number of messages queued on the event broadcast queue.
  event_queue: usize,
  /// Number of messages queued on the query broadcast queue.
  query_queue: usize,
  /// Whether memberlist reports encryption as enabled; always `false` when
  /// the `encryption` feature is off.
  encrypted: bool,
  /// Reset count reported by the coordinate client; `None` when there is no
  /// coordinate core. Omitted from serialized output when `None`.
  #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
  coordinate_resets: Option<usize>,
}