1use std::{
2 collections::HashMap,
3 sync::{Arc, Weak, atomic::AtomicBool},
4};
5
6use async_lock::{Mutex, RwLock};
7use atomic_refcell::AtomicRefCell;
8use futures::stream::FuturesUnordered;
9use memberlist_core::{
10 Memberlist,
11 agnostic_lite::{AsyncSpawner, RuntimeLite},
12 proto::MediumVec,
13 queue::TransmitLimitedQueue,
14 transport::{AddressResolver, Transport},
15};
16
17use super::{
18 Options,
19 broadcast::SerfBroadcast,
20 delegate::{CompositeDelegate, Delegate},
21 event::CrateEvent,
22 snapshot::SnapshotHandle,
23 types::{
24 LamportClock, LamportTime, Members, UserEvents,
25 coordinate::{Coordinate, CoordinateClient},
26 },
27};
28
29mod api;
30pub(crate) mod base;
31
32mod delegate;
33pub(crate) use delegate::*;
34
35mod query;
36pub use query::*;
37
38mod internal_query;
39
40pub(crate) const SNAPSHOT_SIZE_LIMIT: u64 = 128 * 1024;
42
43const USER_EVENT_SIZE_LIMIT: usize = 9 * 1024;
45
46pub type DefaultDelegate<T> = CompositeDelegate<
48 <T as Transport>::Id,
49 <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
50>;
51
52pub(crate) struct CoordCore<I> {
53 pub(crate) client: CoordinateClient<I>,
54 pub(crate) cache: parking_lot::RwLock<HashMap<I, Coordinate>>,
55}
56
57#[derive(Debug, Clone, Eq, PartialEq)]
59#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
60pub(crate) struct Queries {
61 ltime: LamportTime,
62 query_ids: MediumVec<u32>,
63}
64
65#[derive(Default)]
66pub(crate) struct QueryCore<I, A> {
67 responses: HashMap<LamportTime, QueryResponse<I, A>>,
68 min_time: LamportTime,
69 buffer: Vec<Option<Queries>>,
70}
71
72#[viewit::viewit]
73pub(crate) struct EventCore {
74 min_time: LamportTime,
75 buffer: Vec<Option<UserEvents>>,
76}
77
78#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
80pub enum SerfState {
81 Alive,
83 Leaving,
85 Left,
87 Shutdown,
89}
90
91impl SerfState {
92 pub const fn as_str(&self) -> &'static str {
94 match self {
95 Self::Alive => "alive",
96 Self::Leaving => "leaving",
97 Self::Left => "left",
98 Self::Shutdown => "shutdown",
99 }
100 }
101}
102
103impl core::fmt::Display for SerfState {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 write!(f, "{}", self.as_str())
106 }
107}
108
109struct NumMembers<I, A>(Arc<RwLock<Members<I, A>>>);
110
111impl<I, A> Clone for NumMembers<I, A> {
112 fn clone(&self) -> Self {
113 Self(self.0.clone())
114 }
115}
116
117impl<I, A> From<Arc<RwLock<Members<I, A>>>> for NumMembers<I, A> {
118 fn from(value: Arc<RwLock<Members<I, A>>>) -> Self {
119 Self(value)
120 }
121}
122
123impl<I, A> memberlist_core::queue::NodeCalculator for NumMembers<I, A>
124where
125 I: Send + Sync + 'static,
126 A: Send + Sync + 'static,
127{
128 async fn num_nodes(&self) -> usize {
129 self.0.read().await.states.len()
130 }
131}
132
133pub(crate) struct SerfCore<T, D = DefaultDelegate<T>>
134where
135 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
136 T: Transport,
137{
138 pub(crate) clock: LamportClock,
139 pub(crate) event_clock: LamportClock,
140 pub(crate) query_clock: LamportClock,
141
142 broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,
143 event_broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,
144 query_broadcasts: Arc<TransmitLimitedQueue<SerfBroadcast, NumMembers<T::Id, T::ResolvedAddress>>>,
145
146 pub(crate) memberlist: Memberlist<T, SerfDelegate<T, D>>,
147 pub(crate) members: Arc<RwLock<Members<T::Id, T::ResolvedAddress>>>,
148 event_tx: async_channel::Sender<CrateEvent<T, D>>,
149 pub(crate) event_join_ignore: AtomicBool,
150
151 pub(crate) event_core: RwLock<EventCore>,
152 query_core: Arc<RwLock<QueryCore<T::Id, T::ResolvedAddress>>>,
153 handles: AtomicRefCell<
154 FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
155 >,
156 pub(crate) opts: Options,
157
158 state: parking_lot::Mutex<SerfState>,
159
160 join_lock: Mutex<()>,
161
162 snapshot: Option<SnapshotHandle>,
163 #[cfg(feature = "encryption")]
164 key_manager: crate::key_manager::KeyManager<T, D>,
165 shutdown_tx: async_channel::Sender<()>,
166 shutdown_rx: async_channel::Receiver<()>,
167
168 pub(crate) coord_core: Option<Arc<CoordCore<T::Id>>>,
169}
170
171#[repr(transparent)]
177pub struct Serf<T: Transport, D = DefaultDelegate<T>>
178where
179 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
180 T: Transport,
181{
182 pub(crate) inner: Arc<SerfCore<T, D>>,
183}
184
185impl<T: Transport, D: Delegate> Clone for Serf<T, D>
186where
187 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
188 T: Transport,
189{
190 fn clone(&self) -> Self {
191 Self {
192 inner: self.inner.clone(),
193 }
194 }
195}
196
197impl<T: Transport, D: Delegate> Serf<T, D>
198where
199 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
200 T: Transport,
201{
202 fn downgrade(&self) -> SerfWeakRef<T, D> {
216 SerfWeakRef {
217 inner: Arc::downgrade(&self.inner),
218 }
219 }
220}
221
222#[repr(transparent)]
232pub(crate) struct SerfWeakRef<T: Transport, D = DefaultDelegate<T>>
233where
234 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
235 T: Transport,
236{
237 inner: Weak<SerfCore<T, D>>,
238}
239
240impl<T: Transport, D: Delegate> SerfWeakRef<T, D>
241where
242 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
243 T: Transport,
244{
245 pub(crate) fn upgrade(&self) -> Option<Serf<T, D>> {
253 self.inner.upgrade().map(|inner| Serf { inner })
254 }
255}