1use std::{
2 sync::{Arc, atomic::Ordering},
3 time::Duration,
4};
5
6use agnostic_lite::{RuntimeLite, time::Instant};
7use bytes::Bytes;
8use futures::{FutureExt, StreamExt};
9use smallvec_wrapper::OneOrMore;
10
11use super::{
12 Options,
13 base::Memberlist,
14 delegate::{Delegate, VoidDelegate},
15 error::Error,
16 network::META_MAX_SIZE,
17 proto::{Alive, Dead, MaybeResolvedAddress, Message, Meta, NodeState, Ping, SmallVec},
18 state::AckMessage,
19 transport::{AddressResolver, CheapClone, Node, Transport},
20};
21
22impl<T, D> Memberlist<T, D>
23where
24 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
25 T: Transport,
26{
27 #[inline]
29 pub fn local_id(&self) -> &T::Id {
30 &self.inner.id
31 }
32
33 #[inline]
35 pub fn local_address(&self) -> &<T::Resolver as AddressResolver>::Address {
36 self.inner.transport.local_address()
37 }
38
39 #[inline]
41 pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
42 Node::new(self.inner.id.clone(), self.inner.advertise.clone())
43 }
44
45 #[inline]
47 pub fn advertise_address(&self) -> &T::ResolvedAddress {
48 &self.inner.advertise
49 }
50
51 #[cfg(feature = "encryption")]
53 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
54 #[inline]
55 pub fn keyring(&self) -> Option<&super::keyring::Keyring> {
56 self.inner.keyring.as_ref()
57 }
58
59 #[cfg(feature = "encryption")]
61 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
62 #[inline]
63 pub fn encryption_enabled(&self) -> bool {
64 self.inner.keyring.is_some()
65 && self.inner.opts.encryption_algo.is_some()
66 && self.inner.opts.gossip_verify_outgoing
67 }
68
69 #[inline]
71 pub fn delegate(&self) -> Option<&D> {
72 self.delegate.as_deref()
73 }
74
75 #[inline]
77 pub async fn local_state(&self) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
78 let nodes = self.inner.nodes.read().await;
79 nodes
80 .node_map
81 .get(&self.inner.id)
82 .map(|&idx| nodes.nodes[idx].state.server.clone())
83 }
84
85 pub async fn by_id(&self, id: &T::Id) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
87 let members = self.inner.nodes.read().await;
88
89 members
90 .node_map
91 .get(id)
92 .map(|&idx| members.nodes[idx].state.server.clone())
93 }
94
95 #[inline]
97 pub async fn members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
98 self
99 .inner
100 .nodes
101 .read()
102 .await
103 .nodes
104 .iter()
105 .map(|n| n.state.server.clone())
106 .collect()
107 }
108
109 #[inline]
111 pub async fn num_members(&self) -> usize {
112 self.inner.nodes.read().await.nodes.len()
113 }
114
115 pub async fn online_members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
117 self
118 .inner
119 .nodes
120 .read()
121 .await
122 .nodes
123 .iter()
124 .filter(|n| !n.dead_or_left())
125 .map(|n| n.state.server.clone())
126 .collect()
127 }
128
129 pub async fn num_online_members(&self) -> usize {
131 self
132 .inner
133 .nodes
134 .read()
135 .await
136 .nodes
137 .iter()
138 .filter(|n| !n.dead_or_left())
139 .count()
140 }
141
142 pub async fn members_by(
144 &self,
145 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
146 ) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
147 self
148 .inner
149 .nodes
150 .read()
151 .await
152 .nodes
153 .iter()
154 .filter(|n| f(&n.state))
155 .map(|n| n.state.server.clone())
156 .collect()
157 }
158
159 pub async fn num_members_by(
161 &self,
162 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
163 ) -> usize {
164 self
165 .inner
166 .nodes
167 .read()
168 .await
169 .nodes
170 .iter()
171 .filter(|n| f(&n.state))
172 .count()
173 }
174
175 pub async fn members_map_by<O>(
177 &self,
178 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> Option<O>,
179 ) -> SmallVec<O> {
180 self
181 .inner
182 .nodes
183 .read()
184 .await
185 .nodes
186 .iter()
187 .filter_map(|n| f(&n.state))
188 .collect()
189 }
190}
191
192impl<T> Memberlist<T>
193where
194 T: Transport,
195{
196 #[inline]
198 pub async fn new(
199 transport_options: T::Options,
200 opts: Options,
201 ) -> Result<Self, Error<T, VoidDelegate<T::Id, T::ResolvedAddress>>> {
202 Self::create(None, transport_options, opts).await
203 }
204}
205
206impl<T, D> Memberlist<T, D>
207where
208 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
209 T: Transport,
210{
211 #[inline]
213 pub async fn with_delegate(
214 delegate: D,
215 transport_options: T::Options,
216 opts: Options,
217 ) -> Result<Self, Error<T, D>> {
218 Self::create(Some(delegate), transport_options, opts).await
219 }
220
221 pub(crate) async fn create(
222 delegate: Option<D>,
223 transport_options: T::Options,
224 opts: Options,
225 ) -> Result<Self, Error<T, D>> {
226 let transport = T::new(transport_options).await.map_err(Error::Transport)?;
227 let (shutdown_rx, advertise, this) = Self::new_in(transport, delegate, opts).await?;
228 let meta = if let Some(d) = &this.delegate {
229 d.node_meta(META_MAX_SIZE).await
230 } else {
231 Meta::empty()
232 };
233
234 if meta.len() > META_MAX_SIZE {
235 panic!("NodeState meta data provided is longer than the limit");
236 }
237
238 let alive = Alive::new(
239 this.next_incarnation(),
240 Node::new(this.inner.id.clone(), this.inner.advertise.clone()),
241 )
242 .with_meta(meta)
243 .with_protocol_version(this.inner.opts.protocol_version)
244 .with_delegate_version(this.inner.opts.delegate_version);
245 this.alive_node(alive, None, true).await;
246 this.schedule(shutdown_rx).await;
247 tracing::debug!(local = %this.inner.id, advertise_addr = %advertise, "memberlist: node is living");
248 Ok(this)
249 }
250
251 pub async fn leave(&self, timeout: Duration) -> Result<bool, Error<T, D>> {
264 if self.has_shutdown() {
265 return Ok(false);
266 }
267
268 if self.has_left() {
269 return Ok(false);
270 }
271
272 if !self.has_left() {
273 self.inner.hot.leave.store(true, Ordering::Release);
274
275 let mut memberlist = self.inner.nodes.write().await;
276 if let Some(&idx) = memberlist.node_map.get(&self.inner.id) {
277 let state = &memberlist.nodes[idx];
283 let d = Dead::new(
284 state.state.incarnation.load(Ordering::Acquire),
285 state.id().cheap_clone(),
286 state.id().cheap_clone(),
287 );
288
289 self.dead_node(&mut memberlist, d).await?;
290 let any_alive = memberlist.any_alive();
291 drop(memberlist);
292
293 if any_alive {
295 if timeout > Duration::ZERO {
296 futures::select! {
297 _ = self.inner.leave_broadcast_rx.recv().fuse() => {},
298 _ = <T::Runtime as RuntimeLite>::sleep(timeout).fuse() => {
299 return Err(Error::LeaveTimeout);
300 }
301 }
302 } else if let Err(e) = self.inner.leave_broadcast_rx.recv().await {
303 tracing::error!("memberlist: failed to receive leave broadcast: {}", e);
304 }
305 }
306 } else {
307 tracing::warn!("memberlist: leave but we're not a member");
308 }
309 }
310
311 Ok(true)
312 }
313
314 pub async fn join(
317 &self,
318 addr: MaybeResolvedAddress<T::Address, T::ResolvedAddress>,
319 ) -> Result<T::ResolvedAddress, Error<T, D>> {
320 if self.has_left() || self.has_shutdown() {
321 return Err(Error::NotRunning);
322 }
323
324 let addr = match addr {
325 MaybeResolvedAddress::Resolved(addr) => addr,
326 MaybeResolvedAddress::Unresolved(addr) => self
327 .inner
328 .transport
329 .resolve(&addr)
330 .await
331 .map_err(Error::Transport)?,
332 };
333
334 self.push_pull_node(&addr, true).await.map(|_| addr)
335 }
336
337 pub async fn join_many(
346 &self,
347 existing: impl Iterator<Item = MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
348 ) -> Result<SmallVec<T::ResolvedAddress>, (SmallVec<T::ResolvedAddress>, Error<T, D>)> {
349 if self.has_left() || self.has_shutdown() {
350 return Err((Default::default(), Error::NotRunning));
351 }
352
353 let estimated_total = existing.size_hint().0;
354
355 let futs = existing
356 .into_iter()
357 .map(|addr| {
358 async move {
359 let resolved_addr = match addr {
360 MaybeResolvedAddress::Resolved(addr) => addr,
361 MaybeResolvedAddress::Unresolved(addr) => {
362 match self.inner.transport.resolve(&addr).await {
363 Ok(addr) => addr,
364 Err(e) => {
365 tracing::debug!(
366 err = %e,
367 "memberlist: failed to resolve address {}",
368 addr,
369 );
370 return Err((MaybeResolvedAddress::<T::Address, T::ResolvedAddress>::unresolved(addr), Error::<T, D>::transport(e)))
371 }
372 }
373 }
374 };
375 tracing::info!(local = %self.inner.transport.local_id(), peer = %resolved_addr, "memberlist: start join...");
376 if let Err(e) = self.push_pull_node(&resolved_addr, true).await {
377 tracing::debug!(
378 local = %self.inner.id,
379 err = %e,
380 "memberlist: failed to join {}",
381 resolved_addr,
382 );
383 Err((MaybeResolvedAddress::Resolved(resolved_addr), e))
384 } else {
385 Ok(resolved_addr)
386 }
387 }
388 }).collect::<futures::stream::FuturesUnordered<_>>();
389
390 let successes = std::cell::RefCell::new(SmallVec::with_capacity(estimated_total));
391 let errors = futs
392 .filter_map(|rst| async {
393 match rst {
394 Ok(addr) => {
395 successes.borrow_mut().push(addr);
396 None
397 }
398 Err((_, e)) => Some(e),
399 }
400 })
401 .collect::<OneOrMore<_>>()
402 .await;
403
404 match Error::try_from_one_or_more(errors) {
405 Ok(()) => Ok(successes.into_inner()),
406 Err(e) => Err((successes.into_inner(), e)),
407 }
408 }
409
410 #[inline]
414 pub fn health_score(&self) -> usize {
415 self.inner.awareness.get_health_score() as usize
416 }
417
418 pub async fn update_node(&self, timeout: Duration) -> Result<(), Error<T, D>> {
424 if self.has_left() || self.has_shutdown() {
425 return Err(Error::NotRunning);
426 }
427
428 let meta = if let Some(delegate) = &self.delegate {
430 let meta = delegate.node_meta(META_MAX_SIZE).await;
431 if meta.len() > META_MAX_SIZE {
432 panic!("node meta data provided is longer than the limit");
433 }
434 meta
435 } else {
436 Meta::empty()
437 };
438
439 let node = {
442 let members = self.inner.nodes.read().await;
443
444 let idx = *members.node_map.get(&self.inner.id).unwrap();
445
446 let state = &members.nodes[idx].state;
447 Node::new(state.id().cheap_clone(), state.address().cheap_clone())
448 };
449
450 let alive = Alive::new(self.next_incarnation(), node)
452 .with_meta(meta)
453 .with_protocol_version(self.inner.opts.protocol_version)
454 .with_delegate_version(self.inner.opts.delegate_version);
455 let (notify_tx, notify_rx) = async_channel::bounded(1);
456 self.alive_node(alive, Some(notify_tx), true).await;
457
458 if self.any_alive().await {
460 if timeout > Duration::ZERO {
461 let _ = <T::Runtime as RuntimeLite>::timeout(timeout, notify_rx.recv())
462 .await
463 .map_err(|_| Error::UpdateTimeout)?;
464 } else {
465 let _ = notify_rx.recv().await;
466 }
467 }
468
469 Ok(())
470 }
471
472 #[inline]
479 pub async fn send(&self, to: &T::ResolvedAddress, msg: Bytes) -> Result<(), Error<T, D>> {
480 self.send_many(to, std::iter::once(msg)).await
481 }
482
483 #[inline]
488 pub async fn send_many(
489 &self,
490 to: &T::ResolvedAddress,
491 msgs: impl Iterator<Item = Bytes>,
492 ) -> Result<(), Error<T, D>> {
493 if self.has_left() || self.has_shutdown() {
494 return Err(Error::NotRunning);
495 }
496
497 let stream = self
498 .transport_send_packets(to, msgs.map(Message::UserData).collect::<OneOrMore<_>>())
499 .await;
500 futures::pin_mut!(stream);
501 match stream.next().await {
502 None => Ok(()),
503 Some(Ok(_)) => Ok(()),
504 Some(Err(e)) => Err(e),
505 }
506 }
507
508 #[inline]
515 pub async fn send_reliable(
516 &self,
517 to: &T::ResolvedAddress,
518 msg: Bytes,
519 ) -> Result<(), Error<T, D>> {
520 self.send_many_reliable(to, std::iter::once(msg)).await
521 }
522
523 #[inline]
528 pub async fn send_many_reliable(
529 &self,
530 to: &T::ResolvedAddress,
531 msgs: impl Iterator<Item = Bytes>,
532 ) -> Result<(), Error<T, D>> {
533 if self.has_left() || self.has_shutdown() {
534 return Err(Error::NotRunning);
535 }
536 self
537 .send_user_msg(to, msgs.map(Message::UserData).collect())
538 .await
539 }
540
541 pub async fn ping(&self, node: Node<T::Id, T::ResolvedAddress>) -> Result<Duration, Error<T, D>> {
543 let self_addr = self.get_advertise();
545 let ping = Ping::new(
546 self.next_sequence_number(),
547 Node::new(self.inner.transport.local_id().clone(), self_addr.clone()),
548 node.clone(),
549 );
550
551 let (ack_tx, ack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
552 self.inner.ack_manager.set_probe_channels(
553 ping.sequence_number(),
554 ack_tx,
555 None,
556 <T::Runtime as RuntimeLite>::now(),
557 self.inner.opts.probe_interval,
558 );
559
560 match <T::Runtime as RuntimeLite>::timeout(self.inner.opts.probe_timeout, async {
563 let stream = self.send_packets(node.address(), ping.into()).await;
564 futures::pin_mut!(stream);
565 let errs = stream.collect::<OneOrMore<_>>().await;
566 let num_errs = errs.len();
567
568 match num_errs {
569 0 => Ok(()),
570 _ => match errs.into_either() {
571 either::Either::Left([e]) => Err(e),
572 either::Either::Right(e) => Err(Error::Multiple(e.into_vec().into())),
573 },
574 }
575 })
576 .await
577 {
578 Ok(Ok(())) => {}
579 Ok(Err(e)) => return Err(e),
580 Err(_) => {
581 tracing::debug!(
583 "memberlist: failed ping {} by packet (timeout reached)",
584 node
585 );
586 return Err(Error::Lost(node));
587 }
588 }
589
590 let sent = <T::Runtime as RuntimeLite>::now();
594
595 futures::select! {
597 v = ack_rx.recv().fuse() => {
598 if let Ok(AckMessage { complete, .. }) = v {
600 if complete {
601 return Ok(sent.elapsed());
602 }
603 }
604 }
605 _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.probe_timeout).fuse() => {}
606 }
607
608 tracing::debug!(
610 "memberlist: failed ping {} by packet (timeout reached)",
611 node
612 );
613 Err(Error::Lost(node))
614 }
615
616 pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
624 self.inner.shutdown().await.map_err(Error::Transport)
625 }
626}