1use std::sync::atomic::Ordering;
2
3use futures::{FutureExt, StreamExt};
4use memberlist_core::{
5 bytes::{BufMut, Bytes, BytesMut},
6 tracing,
7 transport::{MaybeResolvedAddress, Node},
8 types::{Meta, OneOrMore, SmallVec},
9 CheapClone,
10};
11use smol_str::SmolStr;
12
13use crate::{
14 delegate::TransformDelegate,
15 error::{Error, JoinError},
16 event::EventProducer,
17 types::{LeaveMessage, Member, MessageType, SerfMessage, Tags, UserEventMessage},
18};
19
20use super::*;
21
22impl<T> Serf<T>
23where
24 T: Transport,
25{
26 pub async fn new(
28 transport: T::Options,
29 opts: Options,
30 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
31 Self::new_in(
32 None,
33 None,
34 transport,
35 opts,
36 #[cfg(any(test, feature = "test"))]
37 None,
38 )
39 .await
40 }
41
42 pub async fn with_event_producer(
44 transport: T::Options,
45 opts: Options,
46 ev: EventProducer<T, DefaultDelegate<T>>,
47 ) -> Result<Self, Error<T, DefaultDelegate<T>>> {
48 Self::new_in(
49 Some(ev.tx),
50 None,
51 transport,
52 opts,
53 #[cfg(any(test, feature = "test"))]
54 None,
55 )
56 .await
57 }
58}
59
60impl<T, D> Serf<T, D>
61where
62 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
63 T: Transport,
64{
65 pub async fn with_delegate(
67 transport: T::Options,
68 opts: Options,
69 delegate: D,
70 ) -> Result<Self, Error<T, D>> {
71 Self::new_in(
72 None,
73 Some(delegate),
74 transport,
75 opts,
76 #[cfg(any(test, feature = "test"))]
77 None,
78 )
79 .await
80 }
81
82 pub async fn with_event_producer_and_delegate(
84 transport: T::Options,
85 opts: Options,
86 ev: EventProducer<T, D>,
87 delegate: D,
88 ) -> Result<Self, Error<T, D>> {
89 Self::new_in(
90 Some(ev.tx),
91 Some(delegate),
92 transport,
93 opts,
94 #[cfg(any(test, feature = "test"))]
95 None,
96 )
97 .await
98 }
99
100 #[inline]
102 pub fn local_id(&self) -> &T::Id {
103 self.inner.memberlist.local_id()
104 }
105
106 #[inline]
108 pub fn advertise_node(&self) -> Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
109 self.inner.memberlist.advertise_node()
110 }
111
/// Reports whether the underlying memberlist has encryption enabled.
///
/// Only available when the `encryption` feature is active.
#[inline]
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
pub fn encryption_enabled(&self) -> bool {
  let memberlist = &self.inner.memberlist;
  memberlist.encryption_enabled()
}
122
123 #[inline]
126 pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
127 self.inner.shutdown_rx.clone()
128 }
129
130 #[inline]
132 pub fn state(&self) -> SerfState {
133 *self.inner.state.lock()
134 }
135
136 #[inline]
138 pub async fn members(
139 &self,
140 ) -> OneOrMore<Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
141 self
142 .inner
143 .members
144 .read()
145 .await
146 .states
147 .values()
148 .map(|s| s.member.cheap_clone())
149 .collect()
150 }
151
152 #[inline]
154 pub async fn stats(&self) -> Stats {
155 let (num_members, num_failed, num_left, health_score) = {
156 let members = self.inner.members.read().await;
157 let num_members = members.states.len();
158 let num_failed = members.failed_members.len();
159 let num_left = members.left_members.len();
160 let health_score = self.inner.memberlist.health_score();
161 (num_members, num_failed, num_left, health_score)
162 };
163
164 #[cfg(not(feature = "encryption"))]
165 let encrypted = false;
166 #[cfg(feature = "encryption")]
167 let encrypted = self.inner.memberlist.encryption_enabled();
168
169 Stats {
170 members: num_members,
171 failed: num_failed,
172 left: num_left,
173 health_score,
174 member_time: self.inner.clock.time().into(),
175 event_time: self.inner.event_clock.time().into(),
176 query_time: self.inner.query_clock.time().into(),
177 intent_queue: self.inner.broadcasts.num_queued().await,
178 event_queue: self.inner.event_broadcasts.num_queued().await,
179 query_queue: self.inner.query_broadcasts.num_queued().await,
180 encrypted,
181 coordinate_resets: self
182 .inner
183 .coord_core
184 .as_ref()
185 .map(|coord| coord.client.stats().resets),
186 }
187 }
188
189 #[inline]
192 pub async fn num_members(&self) -> usize {
193 self.inner.members.read().await.states.len()
194 }
195
/// Returns a reference to the key manager owned by this instance.
///
/// Only available when the `encryption` feature is active.
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
#[inline]
pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
  let inner = &self.inner;
  &inner.key_manager
}
203
204 #[inline]
206 pub async fn local_member(
207 &self,
208 ) -> Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
209 self
210 .inner
211 .members
212 .read()
213 .await
214 .states
215 .get(self.inner.memberlist.local_id())
216 .unwrap()
217 .member
218 .cheap_clone()
219 }
220
221 #[inline]
225 pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
226 let tags_encoded_len = <D as TransformDelegate>::tags_encoded_len(&tags);
228 if tags_encoded_len > Meta::MAX_SIZE {
229 return Err(Error::tags_too_large(tags_encoded_len));
230 }
231 self.inner.opts.tags.store(Arc::new(tags));
233
234 self
236 .inner
237 .memberlist
238 .update_node(self.inner.opts.broadcast_timeout)
239 .await
240 .map_err(From::from)
241 }
242
243 #[inline]
247 pub async fn user_event(
248 &self,
249 name: impl Into<SmolStr>,
250 payload: impl Into<Bytes>,
251 coalesce: bool,
252 ) -> Result<(), Error<T, D>> {
253 let name: SmolStr = name.into();
254 let payload: Bytes = payload.into();
255 let payload_size_before_encoding = name.len() + payload.len();
256
257 if payload_size_before_encoding > self.inner.opts.max_user_event_size {
259 return Err(Error::user_event_limit_too_large(
260 self.inner.opts.max_user_event_size,
261 ));
262 }
263
264 if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
265 return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
266 }
267
268 let msg = UserEventMessage {
270 ltime: self.inner.event_clock.time(),
271 name: name.clone(),
272 payload,
273 cc: coalesce,
274 };
275
276 let len = <D as TransformDelegate>::message_encoded_len(&msg);
278
279 if len > self.inner.opts.max_user_event_size {
282 return Err(Error::raw_user_event_too_large(len));
283 }
284
285 if len > USER_EVENT_SIZE_LIMIT {
286 return Err(Error::raw_user_event_too_large(len));
287 }
288
289 let mut raw = BytesMut::with_capacity(len + 1); raw.put_u8(MessageType::UserEvent as u8);
291 raw.resize(len + 1, 0);
292
293 let actual_encoded_len = <D as TransformDelegate>::encode_message(&msg, &mut raw[1..])
294 .map_err(Error::transform_delegate)?;
295 debug_assert_eq!(
296 actual_encoded_len, len,
297 "expected encoded len {} mismatch the actual encoded len {}",
298 len, actual_encoded_len
299 );
300
301 self.inner.event_clock.increment();
302
303 self.handle_user_event(msg).await;
305
306 self
307 .inner
308 .event_broadcasts
309 .queue_broadcast(SerfBroadcast {
310 msg: raw.freeze(),
311 notify_tx: None,
312 })
313 .await;
314 Ok(())
315 }
316
317 pub async fn query(
322 &self,
323 name: impl Into<SmolStr>,
324 payload: impl Into<Bytes>,
325 params: Option<QueryParam<T::Id>>,
326 ) -> Result<QueryResponse<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>, Error<T, D>>
327 {
328 self
329 .query_in(name.into(), payload.into(), params, None)
330 .await
331 }
332
333 pub async fn join(
337 &self,
338 node: Node<T::Id, MaybeResolvedAddress<T>>,
339 ignore_old: bool,
340 ) -> Result<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>, Error<T, D>> {
341 let current_state = self.state();
343 if current_state != SerfState::Alive {
344 return Err(Error::bad_join_status(current_state));
345 }
346
347 let _join_lock = self.inner.join_lock.lock().await;
349
350 if ignore_old {
353 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
354 }
355
356 match self.inner.memberlist.join(node).await {
358 Ok(node) => {
359 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
361 if ignore_old {
362 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
363 }
364 return Err(e);
365 }
366 if ignore_old {
367 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
368 }
369
370 Ok(node)
371 }
372 Err(e) => {
373 if ignore_old {
374 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
375 }
376 Err(Error::from(e))
377 }
378 }
379 }
380
381 pub async fn join_many(
385 &self,
386 existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T>>>,
387 ignore_old: bool,
388 ) -> Result<
389 SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
390 JoinError<T, D>,
391 > {
392 let current_state = self.state();
394 if current_state != SerfState::Alive {
395 return Err(JoinError {
396 joined: SmallVec::new(),
397 errors: existing
398 .into_iter()
399 .map(|node| (node, Error::bad_join_status(current_state)))
400 .collect(),
401 broadcast_error: None,
402 });
403 }
404
405 let _join_lock = self.inner.join_lock.lock().await;
407
408 if ignore_old {
411 self.inner.event_join_ignore.store(true, Ordering::SeqCst);
412 }
413
414 match self.inner.memberlist.join_many(existing).await {
416 Ok(joined) => {
417 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
419 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
420 return Err(JoinError {
421 joined,
422 errors: Default::default(),
423 broadcast_error: Some(e),
424 });
425 }
426 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
427 Ok(joined)
428 }
429 Err(e) => {
430 let (joined, errors) = e.into();
431 if !joined.is_empty() {
433 if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
435 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
436 return Err(JoinError {
437 joined,
438 errors: errors
439 .into_iter()
440 .map(|(addr, err)| (addr, err.into()))
441 .collect(),
442 broadcast_error: Some(e),
443 });
444 }
445
446 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
447 Err(JoinError {
448 joined,
449 errors: errors
450 .into_iter()
451 .map(|(addr, err)| (addr, err.into()))
452 .collect(),
453 broadcast_error: None,
454 })
455 } else {
456 self.inner.event_join_ignore.store(false, Ordering::SeqCst);
457 Err(JoinError {
458 joined,
459 errors: errors
460 .into_iter()
461 .map(|(addr, err)| (addr, err.into()))
462 .collect(),
463 broadcast_error: None,
464 })
465 }
466 }
467 }
468 }
469
470 pub async fn leave(&self) -> Result<(), Error<T, D>> {
474 {
476 let mut s = self.inner.state.lock();
477 match *s {
478 SerfState::Left => return Ok(()),
479 SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
480 SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
481 _ => {
482 *s = SerfState::Leaving;
484 }
485 }
486 }
487
488 if let Some(ref snap) = self.inner.snapshot {
490 snap.leave().await;
491 }
492
493 let msg = LeaveMessage {
495 ltime: self.inner.clock.time(),
496 id: self.inner.memberlist.local_id().cheap_clone(),
497 prune: false,
498 };
499
500 self.inner.clock.increment();
501
502 self.handle_node_leave_intent(&msg).await;
504
505 let msg = SerfMessage::Leave(msg);
506
507 if self.has_alive_members().await {
510 let (notify_tx, notify_rx) = async_channel::bounded(1);
511 self.broadcast(msg, Some(notify_tx)).await?;
512
513 futures::select! {
514 _ = notify_rx.recv().fuse() => {
515 }
517 _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
518 tracing::warn!("serf: timeout while waiting for graceful leave");
519 }
520 }
521 }
522
523 if let Err(e) = self
525 .inner
526 .memberlist
527 .leave(self.inner.opts.broadcast_timeout)
528 .await
529 {
530 tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
531 }
532
533 <T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;
539
540 {
542 let mut s = self.inner.state.lock();
543 match *s {
544 SerfState::Shutdown => {}
545 _ => {
546 *s = SerfState::Left;
547 }
548 }
549 }
550 Ok(())
551 }
552
553 pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
558 self.force_leave(id, false).await
559 }
560
561 pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
566 self.force_leave(id, true).await
567 }
568
569 pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
578 {
579 let mut s = self.inner.state.lock();
580 match *s {
581 SerfState::Shutdown => return Ok(()),
582 SerfState::Left => {}
583 _ => {
584 tracing::warn!("serf: shutdown without a leave");
585 }
586 }
587
588 *s = SerfState::Shutdown;
592 }
593 self.inner.memberlist.shutdown().await?;
594 self.inner.shutdown_tx.close();
595
596 if let Some(ref snap) = self.inner.snapshot {
598 snap.wait().await;
599 }
600
601 loop {
602 if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
603 let mut futs = core::mem::take(&mut *handles);
604 while futs.next().await.is_some() {}
605 break;
606 }
607 }
608
609 Ok(())
610 }
611
612 pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
614 if let Some(ref coord) = self.inner.coord_core {
615 return Ok(coord.client.get_coordinate());
616 }
617
618 Err(Error::coordinates_disabled())
619 }
620
621 pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
624 if let Some(ref coord) = self.inner.coord_core {
625 return Ok(coord.cache.read().get(id).cloned());
626 }
627
628 Err(Error::coordinates_disabled())
629 }
630
631 #[inline]
633 pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
634 &self.inner.memberlist
635 }
636}
637
638#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
639#[cfg_attr(feature = "async-graphql", derive(async_graphql::SimpleObject))]
640#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
641pub struct Stats {
642 members: usize,
643 failed: usize,
644 left: usize,
645 health_score: usize,
646 member_time: u64,
647 event_time: u64,
648 query_time: u64,
649 intent_queue: usize,
650 event_queue: usize,
651 query_queue: usize,
652 encrypted: bool,
653 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
654 coordinate_resets: Option<usize>,
655}