1use std::{
2 collections::{HashMap, VecDeque},
3 sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU32, AtomicUsize},
6 },
7};
8
9use agnostic_lite::{AsyncSpawner, RuntimeLite};
10use async_channel::{Receiver, Sender};
11use async_lock::{Mutex, RwLock};
12
13use atomic_refcell::AtomicRefCell;
14use futures::stream::FuturesUnordered;
15use nodecraft::{CheapClone, Node, resolver::AddressResolver};
16
17use super::{
18 Options,
19 awareness::Awareness,
20 broadcast::MemberlistBroadcast,
21 delegate::{Delegate, VoidDelegate},
22 error::Error,
23 proto::{Message, PushNodeState, TinyVec},
24 queue::TransmitLimitedQueue,
25 state::{AckManager, LocalNodeState},
26 suspicion::Suspicion,
27 transport::Transport,
28};
29
30#[cfg(feature = "encryption")]
31use super::keyring::Keyring;
32
33#[cfg(any(test, feature = "test"))]
34pub(crate) mod tests;
35
36#[viewit::viewit]
37pub(crate) struct HotData {
38 sequence_num: AtomicU32,
39 incarnation: AtomicU32,
40 push_pull_req: AtomicU32,
41 leave: AtomicBool,
42 num_nodes: Arc<AtomicU32>,
43}
44
45impl HotData {
46 fn new() -> Self {
47 Self {
48 sequence_num: AtomicU32::new(0),
49 incarnation: AtomicU32::new(0),
50 num_nodes: Arc::new(AtomicU32::new(0)),
51 push_pull_req: AtomicU32::new(0),
52 leave: AtomicBool::new(false),
53 }
54 }
55}
56
57#[viewit::viewit]
58pub(crate) struct MessageHandoff<I, A> {
59 msg: Message<I, A>,
60 from: A,
61}
62
63#[viewit::viewit]
64pub(crate) struct MessageQueue<I, A> {
65 high: VecDeque<MessageHandoff<I, A>>,
67 low: VecDeque<MessageHandoff<I, A>>,
69}
70
71impl<I, A> MessageQueue<I, A> {
72 const fn new() -> Self {
73 Self {
74 high: VecDeque::new(),
75 low: VecDeque::new(),
76 }
77 }
78}
79
80pub(crate) struct Member<T, D>
82where
83 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
84 T: Transport,
85{
86 pub(crate) state: LocalNodeState<T::Id, T::ResolvedAddress>,
87 pub(crate) suspicion: Option<Suspicion<T, D>>,
88}
89
90impl<T, D> core::fmt::Debug for Member<T, D>
91where
92 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
93 T: Transport,
94{
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("Member")
97 .field("state", &self.state)
98 .finish()
99 }
100}
101
102impl<T, D> core::ops::Deref for Member<T, D>
103where
104 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
105 T: Transport,
106{
107 type Target = LocalNodeState<T::Id, T::ResolvedAddress>;
108
109 fn deref(&self) -> &Self::Target {
110 &self.state
111 }
112}
113
114pub(crate) struct Members<T, D>
115where
116 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
117 T: Transport,
118{
119 pub(crate) local: Node<T::Id, T::ResolvedAddress>,
120 pub(crate) nodes: TinyVec<Member<T, D>>,
121 pub(crate) node_map: HashMap<T::Id, usize>,
122}
123
124impl<T, D> core::ops::Index<usize> for Members<T, D>
125where
126 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
127 T: Transport,
128{
129 type Output = Member<T, D>;
130
131 fn index(&self, index: usize) -> &Self::Output {
132 &self.nodes[index]
133 }
134}
135
136impl<T, D> core::ops::IndexMut<usize> for Members<T, D>
137where
138 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
139 T: Transport,
140{
141 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
142 &mut self.nodes[index]
143 }
144}
145
146impl<T, D> rand::seq::IndexedRandom for Members<T, D>
147where
148 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
149 T: Transport,
150{
151 fn len(&self) -> usize {
152 self.nodes.len()
153 }
154}
155
156impl<T, D> rand::seq::SliceRandom for Members<T, D>
157where
158 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
159 T: Transport,
160{
161 fn shuffle<R>(&mut self, rng: &mut R)
162 where
163 R: rand::Rng + ?Sized,
164 {
165 #[inline]
169 fn gen_index<R: rand::Rng + ?Sized>(rng: &mut R, ubound: usize) -> usize {
170 if ubound <= (u32::MAX as usize) {
171 rng.random_range(0..ubound as u32) as usize
172 } else {
173 rng.random_range(0..ubound)
174 }
175 }
176
177 for i in (1..self.nodes.len()).rev() {
178 let ridx = gen_index(rng, i + 1);
180 let curr = self.node_map.get_mut(self.nodes[i].state.id()).unwrap();
181 *curr = ridx;
182 let target = self.node_map.get_mut(self.nodes[ridx].state.id()).unwrap();
183 *target = i;
184 self.nodes.swap(i, ridx);
185 }
186 }
187
188 fn partial_shuffle<R>(
189 &mut self,
190 _rng: &mut R,
191 _amount: usize,
192 ) -> (&mut [Self::Output], &mut [Self::Output])
193 where
194 Self::Output: Sized,
195 R: rand::Rng + ?Sized,
196 {
197 unreachable!()
198 }
199}
200
201impl<T, D> Members<T, D>
202where
203 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
204 T: Transport,
205{
206 fn new(local: Node<T::Id, T::ResolvedAddress>) -> Self {
207 Self {
208 nodes: TinyVec::new(),
209 node_map: HashMap::new(),
210 local,
211 }
212 }
213}
214
215impl<T, D> Members<T, D>
216where
217 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
218 T: Transport,
219{
220 pub(crate) fn any_alive(&self) -> bool {
221 for m in self.nodes.iter() {
222 if !m.dead_or_left() && m.id().ne(self.local.id()) {
223 return true;
224 }
225 }
226
227 false
228 }
229}
230
/// Shared state behind every [`Memberlist`] handle.
///
/// NOTE: struct fields are dropped in declaration order (after `Drop::drop`
/// runs), so reordering these fields changes teardown order. For example,
/// `handles` is currently dropped before `transport`.
pub(crate) struct MemberlistCore<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Id of the local node.
  pub(crate) id: T::Id,
  /// Lock-free counters and flags.
  pub(crate) hot: HotData,
  /// Awareness tracker (built from `awareness_max_multiplier` in `new_in`).
  pub(crate) awareness: Awareness,
  /// Outgoing broadcast queue. In `new_in` it is given a clone of
  /// `hot.num_nodes`.
  pub(crate) broadcast:
    TransmitLimitedQueue<MemberlistBroadcast<T::Id, T::ResolvedAddress>, Arc<AtomicU32>>,
  /// Sender half of a capacity-1 channel. Presumably signals that the leave
  /// broadcast went out; confirm at the use sites.
  pub(crate) leave_broadcast_tx: Sender<()>,
  /// Receiver half of the leave-broadcast channel.
  pub(crate) leave_broadcast_rx: Receiver<()>,
  /// Join handles of the background tasks spawned in `new_in`.
  pub(crate) handles: AtomicRefCell<
    FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
  >,
  /// Index into the member list, starting at 0. Presumably the probe cursor;
  /// confirm at the use sites.
  pub(crate) probe_index: AtomicUsize,
  /// Sender half of a capacity-1 channel. Presumably wakes the message handler
  /// when `queue` has work; confirm at the use sites.
  pub(crate) handoff_tx: Sender<()>,
  /// Receiver half of the handoff channel.
  pub(crate) handoff_rx: Receiver<()>,
  /// Pending message handoffs, split into `high` and `low` lanes.
  pub(crate) queue: Mutex<MessageQueue<T::Id, T::ResolvedAddress>>,
  /// Known members, behind an async `RwLock`.
  pub(crate) nodes: Arc<RwLock<Members<T, D>>>,
  /// Ack bookkeeping. Its exact contents are defined in `state::AckManager`.
  pub(crate) ack_manager: AckManager<T::Runtime>,
  /// The underlying transport.
  pub(crate) transport: Arc<T>,
  /// Closed by `shutdown` and on drop. Background tasks are handed clones of
  /// the matching receiver.
  pub(crate) shutdown_tx: Sender<()>,
  /// Address advertised to peers (taken from `Transport::advertise_address`).
  pub(crate) advertise: T::ResolvedAddress,
  /// Options this instance was created with.
  pub(crate) opts: Arc<Options>,
  /// Encryption keyring; `None` when no keys were configured.
  #[cfg(feature = "encryption")]
  pub(crate) keyring: Option<Keyring>,
}
260
261impl<T, D> MemberlistCore<T, D>
262where
263 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
264 T: Transport,
265{
266 pub(crate) async fn shutdown(&self) -> Result<(), T::Error> {
267 if !self.shutdown_tx.close() {
268 return Ok(());
269 }
270
271 if let Err(e) = self.transport.shutdown().await {
275 tracing::error!(err=%e, "memberlist: failed to shutdown transport");
276 return Err(e);
277 }
278
279 Ok(())
280 }
281}
282
283impl<T, D> Drop for MemberlistCore<T, D>
284where
285 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
286 T: Transport,
287{
288 fn drop(&mut self) {
289 self.shutdown_tx.close();
290 }
291}
292
293pub struct Memberlist<
305 T,
306 D = VoidDelegate<
307 <T as Transport>::Id,
308 <<T as Transport>::Resolver as AddressResolver>::ResolvedAddress,
309 >,
310> where
311 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
312 T: Transport,
313{
314 pub(crate) inner: Arc<MemberlistCore<T, D>>,
315 pub(crate) delegate: Option<Arc<D>>,
316}
317
318impl<T, D> Clone for Memberlist<T, D>
319where
320 T: Transport,
321 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
322{
323 fn clone(&self) -> Self {
324 Self {
325 inner: self.inner.clone(),
326 delegate: self.delegate.clone(),
327 }
328 }
329}
330
331impl<T, D> Memberlist<T, D>
332where
333 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
334 T: Transport,
335{
336 pub(crate) async fn new_in(
337 transport: T,
338 delegate: Option<D>,
339 opts: Options,
340 ) -> Result<(Receiver<()>, T::ResolvedAddress, Self), Error<T, D>> {
341 let (handoff_tx, handoff_rx) = async_channel::bounded(1);
342 let (leave_broadcast_tx, leave_broadcast_rx) = async_channel::bounded(1);
343
344 let advertise = transport.advertise_address();
348 let id = transport.local_id();
349 let node = Node::new(id.clone(), advertise.clone());
350 let awareness = Awareness::new(
351 opts.awareness_max_multiplier as isize,
352 #[cfg(feature = "metrics")]
353 Arc::new(vec![]),
354 );
355 let hot = HotData::new();
356 let num_nodes = hot.num_nodes.clone();
357 let broadcast = TransmitLimitedQueue::new(opts.retransmit_mult, num_nodes);
358
359 let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);
360
361 #[cfg(feature = "encryption")]
362 let keyring = match (opts.primary_key, opts.secret_keys.is_empty()) {
363 (None, false) => {
364 tracing::warn!("memberlist: using first key in keyring as primary key");
365 let mut iter = opts.secret_keys.iter().copied();
366 let pk = iter.next().unwrap();
367 let keyring = Keyring::with_keys(pk, iter);
368 Some(keyring)
369 }
370 (Some(pk), true) => Some(Keyring::new(pk)),
371 (Some(pk), false) => Some(Keyring::with_keys(pk, opts.secret_keys.iter().copied())),
372 (None, true) => None,
373 };
374
375 let this = Memberlist {
376 inner: Arc::new(MemberlistCore {
377 id: id.cheap_clone(),
378 hot,
379 awareness,
380 broadcast,
381 leave_broadcast_tx,
382 leave_broadcast_rx,
383 probe_index: AtomicUsize::new(0),
384 handles: AtomicRefCell::new(FuturesUnordered::new()),
385 handoff_tx,
386 handoff_rx,
387 queue: Mutex::new(MessageQueue::new()),
388 nodes: Arc::new(RwLock::new(Members::new(node))),
389 ack_manager: AckManager::new(),
390 shutdown_tx,
391 advertise: advertise.cheap_clone(),
392 transport: Arc::new(transport),
393 opts: Arc::new(opts),
394 #[cfg(feature = "encryption")]
395 keyring,
396 }),
397 delegate: delegate.map(Arc::new),
398 };
399
400 {
401 let handles = this.inner.handles.borrow();
402 handles.push(this.stream_listener(shutdown_rx.clone()));
403 handles.push(this.packet_handler(shutdown_rx.clone()));
404 handles.push(this.packet_listener(shutdown_rx.clone()));
405 #[cfg(feature = "metrics")]
406 handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone()));
407 }
408
409 Ok((shutdown_rx, this.inner.advertise.cheap_clone(), this))
410 }
411}
412
413impl<T, D> Memberlist<T, D>
415where
416 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
417 T: Transport,
418{
419 #[inline]
420 pub(crate) fn get_advertise(&self) -> &T::ResolvedAddress {
421 &self.inner.advertise
422 }
423
424 #[inline]
426 pub(crate) async fn any_alive(&self) -> bool {
427 self
428 .inner
429 .nodes
430 .read()
431 .await
432 .nodes
433 .iter()
434 .any(|n| !n.state.dead_or_left() && n.state.server.id().ne(&self.inner.id))
435 }
436
437 #[cfg(feature = "metrics")]
438 fn check_broadcast_queue_depth(
439 &self,
440 shutdown_rx: Receiver<()>,
441 ) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
442 use futures::{FutureExt, StreamExt};
443
444 let queue_check_interval = self.inner.opts.queue_check_interval;
445 let this = self.clone();
446
447 <T::Runtime as RuntimeLite>::spawn(async move {
448 let tick = <T::Runtime as RuntimeLite>::interval(queue_check_interval);
449 futures::pin_mut!(tick);
450 loop {
451 futures::select! {
452 _ = shutdown_rx.recv().fuse() => {
453 tracing::debug!("memberlist: broadcast queue checker exits");
454 return;
455 },
456 _ = tick.next().fuse() => {
457 let numq = this.inner.broadcast.num_queued().await;
458 metrics::histogram!("memberlist.queue.broadcasts").record(numq as f64);
459 }
460 }
461 }
462 })
463 }
464
465 pub(crate) async fn verify_protocol(
466 &self,
467 _remote: &[PushNodeState<T::Id, T::ResolvedAddress>],
468 ) -> Result<(), Error<T, D>> {
469 Ok(())
472 }
473}