1use std::sync::atomic::Ordering;
2
3use futures::{FutureExt, StreamExt};
4use memberlist_core::{
5 CheapClone,
6 bytes::Bytes,
7 proto::{Data, MaybeResolvedAddress, Meta, Node, OneOrMore, SmallVec},
8 tracing,
9};
10use smol_str::SmolStr;
11
12use crate::{
13 error::Error,
14 event::EventProducer,
15 types::{LeaveMessage, Member, Tags, UserEventMessage},
16};
17
18use super::*;
19
20impl<T> Serf<T>
21where
22 T: Transport,
23{
24 pub async fn new(
26 transport: T::Options,
27 opts: Options,
28 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
29 Self::new_in(
30 None,
31 None,
32 transport,
33 opts,
34 #[cfg(any(test, feature = "test"))]
35 None,
36 )
37 .await
38 }
39
40 pub async fn with_event_producer(
42 transport: T::Options,
43 opts: Options,
44 ev: EventProducer<T, DefaultDelegate<T>>,
45 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
46 Self::new_in(
47 Some(ev.tx),
48 None,
49 transport,
50 opts,
51 #[cfg(any(test, feature = "test"))]
52 None,
53 )
54 .await
55 }
56}
57
58impl<T, D> Serf<T, D>
59where
60 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
61 T: Transport,
62{
63 pub async fn with_delegate(
65 transport: T::Options,
66 opts: Options,
67 delegate: D,
68 ) -> Result<Self, Error<T, D>> {
69 Self::new_in(
70 None,
71 Some(delegate),
72 transport,
73 opts,
74 #[cfg(any(test, feature = "test"))]
75 None,
76 )
77 .await
78 }
79
80 pub async fn with_event_producer_and_delegate(
82 transport: T::Options,
83 opts: Options,
84 ev: EventProducer<T, D>,
85 delegate: D,
86 ) -> Result<Self, Error<T, D>> {
87 Self::new_in(
88 Some(ev.tx),
89 Some(delegate),
90 transport,
91 opts,
92 #[cfg(any(test, feature = "test"))]
93 None,
94 )
95 .await
96 }
97
98 #[inline]
100 pub fn local_id(&self) -> &T::Id {
101 self.inner.memberlist.local_id()
102 }
103
104 #[inline]
106 pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
107 self.inner.memberlist.advertise_node()
108 }
109
110 #[inline]
115 #[cfg(feature = "encryption")]
116 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
117 pub fn encryption_enabled(&self) -> bool {
118 self.inner.memberlist.encryption_enabled()
119 }
120
121 #[inline]
124 pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
125 self.inner.shutdown_rx.clone()
126 }
127
128 #[inline]
130 pub fn state(&self) -> SerfState {
131 *self.inner.state.lock()
132 }
133
134 #[inline]
136 pub async fn members(&self) -> OneOrMore<Member<T::Id, T::ResolvedAddress>> {
137 self
138 .inner
139 .members
140 .read()
141 .await
142 .states
143 .values()
144 .map(|s| s.member.cheap_clone())
145 .collect()
146 }
147
148 #[inline]
150 pub async fn stats(&self) -> Stats {
151 let (num_members, num_failed, num_left, health_score) = {
152 let members = self.inner.members.read().await;
153 let num_members = members.states.len();
154 let num_failed = members.failed_members.len();
155 let num_left = members.left_members.len();
156 let health_score = self.inner.memberlist.health_score();
157 (num_members, num_failed, num_left, health_score)
158 };
159
160 #[cfg(not(feature = "encryption"))]
161 let encrypted = false;
162 #[cfg(feature = "encryption")]
163 let encrypted = self.inner.memberlist.encryption_enabled();
164
165 Stats {
166 members: num_members,
167 failed: num_failed,
168 left: num_left,
169 health_score,
170 member_time: self.inner.clock.time().into(),
171 event_time: self.inner.event_clock.time().into(),
172 query_time: self.inner.query_clock.time().into(),
173 intent_queue: self.inner.broadcasts.num_queued().await,
174 event_queue: self.inner.event_broadcasts.num_queued().await,
175 query_queue: self.inner.query_broadcasts.num_queued().await,
176 encrypted,
177 coordinate_resets: self
178 .inner
179 .coord_core
180 .as_ref()
181 .map(|coord| coord.client.stats().resets),
182 }
183 }
184
185 #[inline]
188 pub async fn num_members(&self) -> usize {
189 self.inner.members.read().await.states.len()
190 }
191
192 #[cfg(feature = "encryption")]
194 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
195 #[inline]
196 pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
197 &self.inner.key_manager
198 }
199
200 #[inline]
202 pub async fn local_member(&self) -> Member<T::Id, T::ResolvedAddress> {
203 self
204 .inner
205 .members
206 .read()
207 .await
208 .states
209 .get(self.inner.memberlist.local_id())
210 .unwrap()
211 .member
212 .cheap_clone()
213 }
214
215 #[inline]
219 pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
220 let tags_encoded_len = tags.encoded_len_with_length_delimited();
222 if tags_encoded_len > Meta::MAX_SIZE {
223 return Err(Error::tags_too_large(tags_encoded_len));
224 }
225 self.inner.opts.tags.store(Arc::new(tags));
227
228 self
230 .inner
231 .memberlist
232 .update_node(self.inner.opts.broadcast_timeout)
233 .await
234 .map_err(From::from)
235 }
236
237 #[inline]
241 pub async fn user_event(
242 &self,
243 name: impl Into<SmolStr>,
244 payload: impl Into<Bytes>,
245 coalesce: bool,
246 ) -> Result<(), Error<T, D>> {
247 let name: SmolStr = name.into();
248 let payload: Bytes = payload.into();
249 let payload_size_before_encoding = name.len() + payload.len();
250
251 if payload_size_before_encoding > self.inner.opts.max_user_event_size {
253 return Err(Error::user_event_limit_too_large(
254 self.inner.opts.max_user_event_size,
255 ));
256 }
257
258 if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
259 return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
260 }
261
262 let msg = UserEventMessage {
264 ltime: self.inner.event_clock.time(),
265 name: name.clone(),
266 payload,
267 cc: coalesce,
268 };
269
270 let len = crate::types::encoded_message_len(&msg);
272
273 if len > self.inner.opts.max_user_event_size {
276 return Err(Error::raw_user_event_too_large(len));
277 }
278
279 if len > USER_EVENT_SIZE_LIMIT {
280 return Err(Error::raw_user_event_too_large(len));
281 }
282
283 let raw = crate::types::encode_message_to_bytes(&msg)?;
284
285 self.inner.event_clock.increment();
286
287 self.handle_user_event(either::Either::Right(msg)).await;
289
290 self
291 .inner
292 .event_broadcasts
293 .queue_broadcast(SerfBroadcast {
294 msg: raw,
295 notify_tx: None,
296 })
297 .await;
298 Ok(())
299 }
300
301 pub async fn query(
305 &self,
306 name: impl Into<SmolStr>,
307 payload: impl Into<Bytes>,
308 params: Option<QueryParam<T::Id>>,
309 ) -> Result<QueryResponse<T::Id, T::ResolvedAddress>, Error<T, D>> {
310 self
311 .query_in(name.into(), payload.into(), params, None)
312 .await
313 }
314
315 pub async fn join(
319 &self,
320 node: Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
321 ignore_old: bool,
322 ) -> Result<Node<T::Id, T::ResolvedAddress>, Error<T, D>> {
323 let current_state = self.state();
325 if current_state != SerfState::Alive {
326 return Err(Error::bad_join_status(current_state));
327 }
328
329 let _join_lock = self.inner.join_lock.lock().await;
331
332 if ignore_old {
335 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
336 }
337
338 match self.inner.memberlist.join(node).await {
340 Ok(node) => {
341 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
343 if ignore_old {
344 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
345 }
346 return Err(e);
347 }
348 if ignore_old {
349 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
350 }
351
352 Ok(node)
353 }
354 Err(e) => {
355 if ignore_old {
356 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
357 }
358 Err(Error::from(e))
359 }
360 }
361 }
362
363 pub async fn join_many(
367 &self,
368 existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
369 ignore_old: bool,
370 ) -> Result<
371 SmallVec<Node<T::Id, T::ResolvedAddress>>,
372 (SmallVec<Node<T::Id, T::ResolvedAddress>>, Error<T, D>),
373 > {
374 let current_state = self.state();
376 if current_state != SerfState::Alive {
377 return Err((SmallVec::new(), Error::bad_join_status(current_state)));
378 }
379
380 let _join_lock = self.inner.join_lock.lock().await;
382
383 if ignore_old {
386 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
387 }
388
389 match self.inner.memberlist.join_many(existing).await {
391 Ok(joined) => {
392 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
394 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
395 return Err((joined, e));
396 }
397 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
398 Ok(joined)
399 }
400 Err((joined, err)) => {
401 if !joined.is_empty() {
403 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
405 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
406 return Err((
407 joined,
408 Error::Multiple(std::sync::Arc::from_iter([err.into(), e])),
409 ));
410 }
411
412 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
413 Err((joined, Error::from(err)))
414 } else {
415 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
416 Err((joined, Error::from(err)))
417 }
418 }
419 }
420 }
421
422 pub async fn leave(&self) -> Result<(), Error<T, D>> {
426 {
428 let mut s = self.inner.state.lock();
429 match *s {
430 SerfState::Left => return Ok(()),
431 SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
432 SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
433 _ => {
434 *s = SerfState::Leaving;
436 }
437 }
438 }
439
440 if let Some(ref snap) = self.inner.snapshot {
442 snap.leave().await;
443 }
444
445 let msg = LeaveMessage {
447 ltime: self.inner.clock.time(),
448 id: self.inner.memberlist.local_id().cheap_clone(),
449 prune: false,
450 };
451
452 self.inner.clock.increment();
453
454 self.handle_node_leave_intent(&msg).await;
456
457 if self.has_alive_members().await {
460 let (notify_tx, notify_rx) = async_channel::bounded(1);
461 let msg = crate::types::encode_message_to_bytes(&msg)?;
462 self.broadcast(msg, Some(notify_tx)).await?;
463
464 futures::select! {
465 _ = notify_rx.recv().fuse() => {
466 }
468 _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
469 tracing::warn!("serf: timeout while waiting for graceful leave");
470 }
471 }
472 }
473
474 if let Err(e) = self
476 .inner
477 .memberlist
478 .leave(self.inner.opts.broadcast_timeout)
479 .await
480 {
481 tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
482 }
483
484 <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;
490
491 {
493 let mut s = self.inner.state.lock();
494 match *s {
495 SerfState::Shutdown => {}
496 _ => {
497 *s = SerfState::Left;
498 }
499 }
500 }
501 Ok(())
502 }
503
504 pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
509 self.force_leave(id, false).await
510 }
511
512 pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
517 self.force_leave(id, true).await
518 }
519
520 pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
529 {
530 let mut s = self.inner.state.lock();
531 match *s {
532 SerfState::Shutdown => return Ok(()),
533 SerfState::Left => {}
534 _ => {
535 tracing::warn!("serf: shutdown without a leave");
536 }
537 }
538
539 *s = SerfState::Shutdown;
543 }
544 self.inner.memberlist.shutdown().await?;
545 self.inner.shutdown_tx.close();
546
547 if let Some(ref snap) = self.inner.snapshot {
549 snap.wait().await;
550 }
551
552 loop {
553 if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
554 let mut futs = core::mem::take(&mut *handles);
555 while futs.next().await.is_some() {}
556 break;
557 }
558 }
559
560 Ok(())
561 }
562
563 pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
565 if let Some(ref coord) = self.inner.coord_core {
566 return Ok(coord.client.get_coordinate());
567 }
568
569 Err(Error::coordinates_disabled())
570 }
571
572 pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
575 if let Some(ref coord) = self.inner.coord_core {
576 return Ok(coord.cache.read().get(id).cloned());
577 }
578
579 Err(Error::coordinates_disabled())
580 }
581
582 #[inline]
584 pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
585 &self.inner.memberlist
586 }
587}
588
589#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
590#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
591pub struct Stats {
592 members: usize,
593 failed: usize,
594 left: usize,
595 health_score: usize,
596 member_time: u64,
597 event_time: u64,
598 query_time: u64,
599 intent_queue: usize,
600 event_queue: usize,
601 query_queue: usize,
602 encrypted: bool,
603 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
604 coordinate_resets: Option<usize>,
605}