1use std::{
2 collections::HashMap,
3 sync::{atomic::AtomicBool, Arc},
4};
5
6use async_lock::{Mutex, RwLock};
7use atomic_refcell::AtomicRefCell;
8use futures::stream::FuturesUnordered;
9use memberlist_core::{
10 agnostic_lite::{AsyncSpawner, RuntimeLite},
11 queue::TransmitLimitedQueue,
12 transport::{AddressResolver, Transport},
13 types::MediumVec,
14 Memberlist,
15};
16
17use super::{
18 broadcast::SerfBroadcast,
19 coordinate::{Coordinate, CoordinateClient},
20 delegate::{CompositeDelegate, Delegate},
21 event::CrateEvent,
22 snapshot::SnapshotHandle,
23 types::{LamportClock, LamportTime, Members, UserEvents},
24 Options,
25};
26
27mod api;
28pub(crate) mod base;
29
30mod delegate;
31pub(crate) use delegate::*;
32
33mod query;
34pub use query::*;
35
36mod internal_query;
37
/// Snapshot size limit: `128 * 1024` (128 KiB if the unit is bytes).
// NOTE(review): the use sites are outside this view; confirm exactly what this caps.
pub(crate) const SNAPSHOT_SIZE_LIMIT: u64 = 128 * 1024;
40
/// User event size limit: `9 * 1024` (9 KiB if the unit is bytes).
// NOTE(review): presumably bounds the encoded user event; confirm at the use sites.
const USER_EVENT_SIZE_LIMIT: usize = 9 * 1024;
43
/// [`CompositeDelegate`] instantiated with the id type and the resolved address type of the
/// transport `T`.
///
/// This alias is the default for the delegate type parameter `D` of `SerfCore` and [`Serf`].
pub type DefaultDelegate<T> = CompositeDelegate<
  <T as Transport>::Id,
  <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
>;
49
/// Bundles a [`CoordinateClient`] with a per-id table of [`Coordinate`]s.
pub(crate) struct CoordCore<I> {
  /// Coordinate client parameterised over the id type `I`.
  pub(crate) client: CoordinateClient<I>,
  /// Coordinates keyed by id, guarded by a synchronous `parking_lot` read/write lock.
  // NOTE(review): presumably the latest coordinate seen per peer; confirm at the write sites.
  pub(crate) cache: parking_lot::RwLock<HashMap<I, Coordinate>>,
}
54
/// A Lamport time paired with a list of `u32` query ids.
///
/// Serialisable with serde when the `serde` feature is enabled.
// NOTE(review): presumably the ids of the queries observed at `ltime`; confirm where
// `QueryCore::buffer` is populated.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub(crate) struct Queries {
  /// Lamport time this entry belongs to.
  ltime: LamportTime,
  /// Query ids recorded for `ltime`.
  query_ids: MediumVec<u32>,
}
62
63#[derive(Default)]
64pub(crate) struct QueryCore<I, A> {
65 responses: HashMap<LamportTime, QueryResponse<I, A>>,
66 min_time: LamportTime,
67 buffer: Vec<Option<Queries>>,
68}
69
/// User-event bookkeeping: a `min_time` Lamport time and a buffer of optional [`UserEvents`]
/// slots.
// NOTE(review): `viewit::viewit` is an attribute macro that rewrites this struct (field
// visibility and generated accessors, per that crate's documentation); confirm which defaults
// apply here before relying on specific accessor names.
#[viewit::viewit]
pub(crate) struct EventCore {
  /// Lamport time threshold for user events.
  min_time: LamportTime,
  /// Slots of recorded user events; `None` marks an empty slot.
  buffer: Vec<Option<UserEvents>>,
}
75
/// State of a Serf instance.
///
/// Each variant has a stable lowercase name, available through [`SerfState::as_str`] and the
/// [`Display`](core::fmt::Display) implementation.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum SerfState {
  /// Rendered as `"alive"`.
  Alive,
  /// Rendered as `"leaving"`.
  Leaving,
  /// Rendered as `"left"`.
  Left,
  /// Rendered as `"shutdown"`.
  Shutdown,
}

impl SerfState {
  /// Returns the lowercase name of the state as a static string slice.
  pub const fn as_str(&self) -> &'static str {
    match self {
      Self::Alive => "alive",
      Self::Leaving => "leaving",
      Self::Left => "left",
      Self::Shutdown => "shutdown",
    }
  }
}

impl core::fmt::Display for SerfState {
  /// Writes the same text as [`SerfState::as_str`].
  ///
  /// Width, fill, alignment and precision requested by the caller (for example `{:>10}`) are
  /// honoured, exactly as they would be for a plain `&str`.
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    // `pad` applies the caller's formatting flags; `write!(f, "{}", ..)` would silently drop
    // them because it starts a fresh format specification.
    f.pad(self.as_str())
  }
}
106
/// Shared handle to the member table.
///
/// It implements `memberlist_core::queue::NodeCalculator`, which lets the broadcast queues of
/// `SerfCore` obtain the current number of nodes.
struct NumMembers<I, A>(Arc<RwLock<Members<I, A>>>);
108
109impl<I, A> Clone for NumMembers<I, A> {
110 fn clone(&self) -> Self {
111 Self(self.0.clone())
112 }
113}
114
115impl<I, A> From<Arc<RwLock<Members<I, A>>>> for NumMembers<I, A> {
116 fn from(value: Arc<RwLock<Members<I, A>>>) -> Self {
117 Self(value)
118 }
119}
120
121impl<I, A> memberlist_core::queue::NodeCalculator for NumMembers<I, A>
122where
123 I: Send + Sync + 'static,
124 A: Send + Sync + 'static,
125{
126 async fn num_nodes(&self) -> usize {
127 self.0.read().await.states.len()
128 }
129}
130
/// Shared state behind a [`Serf`] handle.
///
/// `T` is the transport; `D` is the user delegate, whose id and address types must match the
/// transport's id type and resolved address type.
pub(crate) struct SerfCore<T, D = DefaultDelegate<T>>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  // Three independent Lamport clocks.
  // NOTE(review): presumably one each for membership messages, user events and queries;
  // confirm at the use sites.
  pub(crate) clock: LamportClock,
  pub(crate) event_clock: LamportClock,
  pub(crate) query_clock: LamportClock,

  // Three transmit-limited queues of `SerfBroadcast`s; each uses `NumMembers` as its node
  // calculator.
  // NOTE(review): presumably split the same way as the clocks above; confirm.
  broadcasts: Arc<
    TransmitLimitedQueue<
      SerfBroadcast,
      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    >,
  >,
  event_broadcasts: Arc<
    TransmitLimitedQueue<
      SerfBroadcast,
      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    >,
  >,
  query_broadcasts: Arc<
    TransmitLimitedQueue<
      SerfBroadcast,
      NumMembers<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    >,
  >,

  /// Underlying memberlist instance, wired to this crate through `SerfDelegate`.
  pub(crate) memberlist: Memberlist<T, SerfDelegate<T, D>>,
  /// Member table behind an async read/write lock; the same `Arc` type that `NumMembers` wraps.
  pub(crate) members:
    Arc<RwLock<Members<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>>,
  /// Sending half of the channel carrying [`CrateEvent`]s.
  event_tx: async_channel::Sender<CrateEvent<T, D>>,
  // NOTE(review): flag presumably used to suppress join events while it is set; confirm.
  pub(crate) event_join_ignore: AtomicBool,

  /// User-event bookkeeping behind an async read/write lock.
  pub(crate) event_core: RwLock<EventCore>,
  /// Query bookkeeping, shared through an `Arc` and guarded by an async read/write lock.
  query_core: Arc<RwLock<QueryCore<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>>,
  /// Join handles of tasks spawned on the transport's runtime.
  handles: AtomicRefCell<
    FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
  >,
  /// Options this instance holds.
  pub(crate) opts: Options,

  /// Current [`SerfState`], guarded by a synchronous `parking_lot` mutex.
  state: parking_lot::Mutex<SerfState>,

  // Async mutex that carries no data.
  // NOTE(review): presumably serialises join operations; confirm.
  join_lock: Mutex<()>,

  /// Snapshot handle, if any.
  snapshot: Option<SnapshotHandle>,
  /// Key manager; only present when the `encryption` feature is enabled.
  #[cfg(feature = "encryption")]
  key_manager: crate::key_manager::KeyManager<T, D>,
  // Both halves of a unit-message channel.
  // NOTE(review): presumably used to signal shutdown to background tasks; confirm.
  shutdown_tx: async_channel::Sender<()>,
  shutdown_rx: async_channel::Receiver<()>,

  /// Coordinate state, if any, shared through an `Arc`.
  pub(crate) coord_core: Option<Arc<CoordCore<T::Id>>>,
}
184
185#[repr(transparent)]
191pub struct Serf<T: Transport, D = DefaultDelegate<T>>
192where
193 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
194 T: Transport,
195{
196 pub(crate) inner: Arc<SerfCore<T, D>>,
197}
198
199impl<T: Transport, D: Delegate> Clone for Serf<T, D>
200where
201 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
202 T: Transport,
203{
204 fn clone(&self) -> Self {
205 Self {
206 inner: self.inner.clone(),
207 }
208 }
209}