serf_core/
serf.rs

1use std::{
2  collections::HashMap,
3  sync::{atomic::AtomicBool, Arc},
4};
5
6use async_lock::{Mutex, RwLock};
7use atomic_refcell::AtomicRefCell;
8use futures::stream::FuturesUnordered;
9use memberlist_core::{
10  agnostic_lite::{AsyncSpawner, RuntimeLite},
11  queue::TransmitLimitedQueue,
12  transport::{AddressResolver, Transport},
13  types::MediumVec,
14  Memberlist,
15};
16
17use super::{
18  broadcast::SerfBroadcast,
19  coordinate::{Coordinate, CoordinateClient},
20  delegate::{CompositeDelegate, Delegate},
21  event::CrateEvent,
22  snapshot::SnapshotHandle,
23  types::{LamportClock, LamportTime, Members, UserEvents},
24  Options,
25};
26
27mod api;
28pub(crate) mod base;
29
30mod delegate;
31pub(crate) use delegate::*;
32
33mod query;
34pub use query::*;
35
36mod internal_query;
37
38/// Maximum 128 KB snapshot
39pub(crate) const SNAPSHOT_SIZE_LIMIT: u64 = 128 * 1024;
40
41/// Maximum 9KB for event name and payload
42const USER_EVENT_SIZE_LIMIT: usize = 9 * 1024;
43
44/// Exports the default delegate type
45pub type DefaultDelegate<T> = CompositeDelegate<
46  <T as Transport>::Id,
47  <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
48>;
49
50pub(crate) struct CoordCore<I> {
51  pub(crate) client: CoordinateClient<I>,
52  pub(crate) cache: parking_lot::RwLock<HashMap<I, Coordinate>>,
53}
54
55/// Stores all the query ids at a specific time
56#[derive(Debug, Clone, Eq, PartialEq)]
57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58pub(crate) struct Queries {
59  ltime: LamportTime,
60  query_ids: MediumVec<u32>,
61}
62
63#[derive(Default)]
64pub(crate) struct QueryCore<I, A> {
65  responses: HashMap<LamportTime, QueryResponse<I, A>>,
66  min_time: LamportTime,
67  buffer: Vec<Option<Queries>>,
68}
69
70#[viewit::viewit]
71pub(crate) struct EventCore {
72  min_time: LamportTime,
73  buffer: Vec<Option<UserEvents>>,
74}
75
76/// The state of the Serf instance.
77#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
78pub enum SerfState {
79  /// Alive state
80  Alive,
81  /// Leaving state
82  Leaving,
83  /// Left state
84  Left,
85  /// Shutdown state
86  Shutdown,
87}
88
89impl SerfState {
90  /// Returns the string representation of the state.
91  pub const fn as_str(&self) -> &'static str {
92    match self {
93      Self::Alive => "alive",
94      Self::Leaving => "leaving",
95      Self::Left => "left",
96      Self::Shutdown => "shutdown",
97    }
98  }
99}
100
101impl core::fmt::Display for SerfState {
102  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103    write!(f, "{}", self.as_str())
104  }
105}
106
107struct NumMembers<I, A>(Arc<RwLock<Members<I, A>>>);
108
109impl<I, A> Clone for NumMembers<I, A> {
110  fn clone(&self) -> Self {
111    Self(self.0.clone())
112  }
113}
114
115impl<I, A> From<Arc<RwLock<Members<I, A>>>> for NumMembers<I, A> {
116  fn from(value: Arc<RwLock<Members<I, A>>>) -> Self {
117    Self(value)
118  }
119}
120
121impl<I, A> memberlist_core::queue::NodeCalculator for NumMembers<I, A>
122where
123  I: Send + Sync + 'static,
124  A: Send + Sync + 'static,
125{
126  async fn num_nodes(&self) -> usize {
127    self.0.read().await.states.len()
128  }
129}
130
131pub(crate) struct SerfCore<T, D = DefaultDelegate<T>>
132where
133  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
134  T: Transport,
135{
136  pub(crate) clock: LamportClock,
137  pub(crate) event_clock: LamportClock,
138  pub(crate) query_clock: LamportClock,
139
140  broadcasts: Arc<
141    TransmitLimitedQueue<
142      SerfBroadcast,
143      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
144    >,
145  >,
146  event_broadcasts: Arc<
147    TransmitLimitedQueue<
148      SerfBroadcast,
149      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
150    >,
151  >,
152  query_broadcasts: Arc<
153    TransmitLimitedQueue<
154      SerfBroadcast,
155      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
156    >,
157  >,
158
159  pub(crate) memberlist: Memberlist<T, SerfDelegate<T, D>>,
160  pub(crate) members:
161    Arc<RwLock<Members<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>>,
162  event_tx: async_channel::Sender<CrateEvent<T, D>>,
163  pub(crate) event_join_ignore: AtomicBool,
164
165  pub(crate) event_core: RwLock<EventCore>,
166  query_core: Arc<RwLock<QueryCore<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>>,
167  handles: AtomicRefCell<
168    FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
169  >,
170  pub(crate) opts: Options,
171
172  state: parking_lot::Mutex<SerfState>,
173
174  join_lock: Mutex<()>,
175
176  snapshot: Option<SnapshotHandle>,
177  #[cfg(feature = "encryption")]
178  key_manager: crate::key_manager::KeyManager<T, D>,
179  shutdown_tx: async_channel::Sender<()>,
180  shutdown_rx: async_channel::Receiver<()>,
181
182  pub(crate) coord_core: Option<Arc<CoordCore<T::Id>>>,
183}
184
185/// Serf is a single node that is part of a single cluster that gets
186/// events about joins/leaves/failures/etc. It is created with the Create
187/// method.
188///
189/// All functions on the Serf structure are safe to call concurrently.
190#[repr(transparent)]
191pub struct Serf<T: Transport, D = DefaultDelegate<T>>
192where
193  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
194  T: Transport,
195{
196  pub(crate) inner: Arc<SerfCore<T, D>>,
197}
198
199impl<T: Transport, D: Delegate> Clone for Serf<T, D>
200where
201  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
202  T: Transport,
203{
204  fn clone(&self) -> Self {
205    Self {
206      inner: self.inner.clone(),
207    }
208  }
209}