serf_core/
serf.rs

1use std::{
2  collections::HashMap,
3  sync::{Arc, Weak, atomic::AtomicBool},
4};
5
6use async_lock::{Mutex, RwLock};
7use atomic_refcell::AtomicRefCell;
8use futures::stream::FuturesUnordered;
9use memberlist_core::{
10  Memberlist,
11  agnostic_lite::{AsyncSpawner, RuntimeLite},
12  proto::MediumVec,
13  queue::TransmitLimitedQueue,
14  transport::{AddressResolver, Transport},
15};
16
17use super::{
18  Options,
19  broadcast::SerfBroadcast,
20  delegate::{CompositeDelegate, Delegate},
21  event::CrateEvent,
22  snapshot::SnapshotHandle,
23  types::{
24    LamportClock, LamportTime, Members, UserEvents,
25    coordinate::{Coordinate, CoordinateClient},
26  },
27};
28
29mod api;
30pub(crate) mod base;
31
32mod delegate;
33pub(crate) use delegate::*;
34
35mod query;
36pub use query::*;
37
38mod internal_query;
39
/// Upper bound on the snapshot size: 128 KiB (131,072 bytes).
pub(crate) const SNAPSHOT_SIZE_LIMIT: u64 = 128 << 10;

/// Upper bound on a user event's name plus payload: 9 KiB (9,216 bytes).
const USER_EVENT_SIZE_LIMIT: usize = 9 << 10;
45
/// The delegate type used when no custom delegate type is supplied.
///
/// Expands to a [`CompositeDelegate`] parameterized by the transport's `Id`
/// type and by the `ResolvedAddress` type of the transport's address resolver.
pub type DefaultDelegate<T> = CompositeDelegate<
  <T as Transport>::Id,
  <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
>;
51
/// Coordinate-related state: a coordinate client paired with a map of
/// coordinates keyed by id.
pub(crate) struct CoordCore<I> {
  /// Coordinate client for ids of type `I`.
  pub(crate) client: CoordinateClient<I>,
  /// Coordinates keyed by id, guarded by a synchronous `parking_lot` read-write lock.
  // NOTE(review): presumably holds the latest known coordinate of each node — confirm against callers.
  pub(crate) cache: parking_lot::RwLock<HashMap<I, Coordinate>>,
}
56
/// Stores all the query ids at a specific time
///
/// Pairs one Lamport time with the list of query ids recorded for it.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub(crate) struct Queries {
  /// The Lamport time these query ids belong to.
  ltime: LamportTime,
  /// The query ids recorded at `ltime`.
  query_ids: MediumVec<u32>,
}
64
65#[derive(Default)]
66pub(crate) struct QueryCore<I, A> {
67  responses: HashMap<LamportTime, QueryResponse<I, A>>,
68  min_time: LamportTime,
69  buffer: Vec<Option<Queries>>,
70}
71
/// In-memory bookkeeping for user events: a minimum Lamport time plus a
/// buffer of [`UserEvents`] entries.
#[viewit::viewit]
pub(crate) struct EventCore {
  /// Minimum Lamport time tracked for user events.
  // NOTE(review): presumably events older than this are ignored — confirm against callers.
  min_time: LamportTime,
  /// Buffer of user-event sets; a slot is `None` when it holds nothing.
  buffer: Vec<Option<UserEvents>>,
}
77
/// The state of the Serf instance.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum SerfState {
  /// Alive state
  Alive,
  /// Leaving state
  Leaving,
  /// Left state
  Left,
  /// Shutdown state
  Shutdown,
}

impl SerfState {
  /// Returns the lowercase string representation of the state
  /// (`"alive"`, `"leaving"`, `"left"` or `"shutdown"`).
  pub const fn as_str(&self) -> &'static str {
    match self {
      Self::Alive => "alive",
      Self::Leaving => "leaving",
      Self::Left => "left",
      Self::Shutdown => "shutdown",
    }
  }
}

impl std::fmt::Display for SerfState {
  /// Writes the same text as [`SerfState::as_str`].
  ///
  /// Uses `Formatter::pad` so that width, fill, alignment and precision
  /// flags supplied by the caller (e.g. `{:>10}`) are honored; forwarding
  /// through `write!(f, "{}", ..)` would silently drop them.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.pad(self.as_str())
  }
}
108
109struct NumMembers<I, A>(Arc<RwLock<Members<I, A>>>);
110
111impl<I, A> Clone for NumMembers<I, A> {
112  fn clone(&self) -> Self {
113    Self(self.0.clone())
114  }
115}
116
117impl<I, A> From<Arc<RwLock<Members<I, A>>>> for NumMembers<I, A> {
118  fn from(value: Arc<RwLock<Members<I, A>>>) -> Self {
119    Self(value)
120  }
121}
122
123impl<I, A> memberlist_core::queue::NodeCalculator for NumMembers<I, A>
124where
125  I: Send + Sync + 'static,
126  A: Send + Sync + 'static,
127{
128  async fn num_nodes(&self) -> usize {
129    self.0.read().await.states.len()
130  }
131}
132
/// The shared state behind a [`Serf`] handle.
///
/// A `Serf` holds this struct inside an `Arc`, so every clone of the handle
/// refers to the same `SerfCore`.
pub(crate) struct SerfCore<T, D = DefaultDelegate<T>>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Lamport clock.
  // NOTE(review): presumably the clock for membership (join/leave) messages — confirm.
  pub(crate) clock: LamportClock,
  /// Lamport clock dedicated to events.
  pub(crate) event_clock: LamportClock,
  /// Lamport clock dedicated to queries.
  pub(crate) query_clock: LamportClock,

  /// Transmit-limited broadcast queue; its node count comes from `NumMembers`.
  broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,
  /// Transmit-limited broadcast queue for events.
  event_broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,
  /// Transmit-limited broadcast queue for queries.
  query_broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,

  /// The underlying memberlist instance, driven through a `SerfDelegate`.
  pub(crate) memberlist: Memberlist<T, SerfDelegate<T, D>>,
  /// Shared member table behind an async read-write lock.
  pub(crate) members: Arc<RwLock<Members<T::Id, T::ResolvedAddress>>>,
  /// Sending half of the channel that carries `CrateEvent`s.
  event_tx: async_channel::Sender<CrateEvent<T, D>>,
  /// Atomic flag.
  // NOTE(review): presumably tells event handling to ignore events replayed during a join — confirm.
  pub(crate) event_join_ignore: AtomicBool,

  /// User-event bookkeeping behind an async read-write lock.
  pub(crate) event_core: RwLock<EventCore>,
  /// Query bookkeeping, shared behind an async read-write lock.
  query_core: Arc<RwLock<QueryCore<T::Id, T::ResolvedAddress>>>,
  /// Join handles (as produced by the runtime's spawner) collected in a `FuturesUnordered`.
  // NOTE(review): presumably the background tasks awaited on shutdown — confirm.
  handles: AtomicRefCell<
    FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
  >,
  /// The options this instance holds.
  pub(crate) opts: Options,

  /// Current [`SerfState`], guarded by a synchronous `parking_lot` mutex.
  state: parking_lot::Mutex<SerfState>,

  /// Async mutex that carries no data.
  // NOTE(review): presumably serializes join operations — confirm.
  join_lock: Mutex<()>,

  /// Handle to the snapshot facility, if any.
  snapshot: Option<SnapshotHandle>,
  /// Key manager; only present when the `encryption` feature is enabled.
  #[cfg(feature = "encryption")]
  key_manager: crate::key_manager::KeyManager<T, D>,
  /// Sending half of the shutdown channel.
  shutdown_tx: async_channel::Sender<()>,
  /// Receiving half of the shutdown channel.
  shutdown_rx: async_channel::Receiver<()>,

  /// Coordinate state, if any.
  pub(crate) coord_core: Option<Arc<CoordCore<T::Id>>>,
}
170
171/// Serf is a single node that is part of a single cluster that gets
172/// events about joins/leaves/failures/etc. It is created with the Create
173/// method.
174///
175/// All functions on the Serf structure are safe to call concurrently.
176#[repr(transparent)]
177pub struct Serf<T: Transport, D = DefaultDelegate<T>>
178where
179  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
180  T: Transport,
181{
182  pub(crate) inner: Arc<SerfCore<T, D>>,
183}
184
185impl<T: Transport, D: Delegate> Clone for Serf<T, D>
186where
187  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
188  T: Transport,
189{
190  fn clone(&self) -> Self {
191    Self {
192      inner: self.inner.clone(),
193    }
194  }
195}
196
197impl<T: Transport, D: Delegate> Serf<T, D>
198where
199  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
200  T: Transport,
201{
202  /// Creates a weak reference to this `Serf` instance.
203  ///
204  /// This is used to break reference cycles between `Serf` and components that hold a back-reference,
205  /// such as [`SerfDelegate`] or [`KeyManager`]. Since Rust uses reference counting for ownership,
206  /// strong references would prevent the `Serf` instance from being dropped even when no longer needed.
207  ///
208  /// A weak reference does not keep the inner `SerfCore` alive. It must be upgraded to a strong
209  /// reference using [`SerfWeakRef::upgrade()`] before use, which returns `None` if the `Serf`
210  /// has already been destroyed.
211  ///
212  /// # Returns
213  ///
214  /// A [`SerfWeakRef`] that may be safely stored without preventing cleanup.
215  fn downgrade(&self) -> SerfWeakRef<T, D> {
216    SerfWeakRef {
217      inner: Arc::downgrade(&self.inner),
218    }
219  }
220}
221
222/// A weak reference to a [`Serf`] instance.
223///
224/// This type allows holding a non-owning reference to a `Serf<T, D>` without extending its lifetime.
225/// It is primarily used to break reference cycles.
226///
227/// To access the inner `Serf`, call [`upgrade()`](Self::upgrade), which returns `Some(Serf)` if the
228/// original instance is still alive, or `None` if it has been dropped.
229///
230/// This type is analogous to [`std::sync::Weak`] in relation to [`std::sync::Arc`].
231#[repr(transparent)]
232pub(crate) struct SerfWeakRef<T: Transport, D = DefaultDelegate<T>>
233where
234  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
235  T: Transport,
236{
237  inner: Weak<SerfCore<T, D>>,
238}
239
240impl<T: Transport, D: Delegate> SerfWeakRef<T, D>
241where
242  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
243  T: Transport,
244{
245  /// Attempts to upgrade this weak reference to a strong one.
246  ///
247  /// Returns `Some(Serf<T, D>)` if the referenced `Serf` is still alive, otherwise `None`.
248  ///
249  /// This is typically called within delegate methods or background tasks that need temporary
250  /// access to the `Serf` instance. Always handle the `None` case gracefully, as it indicates
251  /// the `Serf` has already begun shutting down.
252  pub(crate) fn upgrade(&self) -> Option<Serf<T, D>> {
253    self.inner.upgrade().map(|inner| Serf { inner })
254  }
255}