1use std::{
2 sync::{Arc, atomic::Ordering},
3 time::Duration,
4};
5
6use agnostic_lite::{RuntimeLite, time::Instant};
7use bytes::Bytes;
8use futures::{FutureExt, StreamExt};
9use smallvec_wrapper::OneOrMore;
10
11use super::{
12 Options,
13 base::Memberlist,
14 delegate::{Delegate, VoidDelegate},
15 error::Error,
16 network::META_MAX_SIZE,
17 proto::{Alive, Dead, MaybeResolvedAddress, Message, Meta, NodeState, Ping, SmallVec},
18 state::AckMessage,
19 transport::{AddressResolver, CheapClone, Node, Transport},
20};
21
22impl<T, D> Memberlist<T, D>
23where
24 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
25 T: Transport,
26{
27 #[inline]
29 pub fn local_id(&self) -> &T::Id {
30 &self.inner.id
31 }
32
33 #[inline]
35 pub fn local_address(&self) -> &<T::Resolver as AddressResolver>::Address {
36 self.inner.transport.local_address()
37 }
38
39 #[inline]
41 pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
42 Node::new(self.inner.id.clone(), self.inner.advertise.clone())
43 }
44
45 #[inline]
47 pub fn advertise_address(&self) -> &T::ResolvedAddress {
48 &self.inner.advertise
49 }
50
51 #[cfg(feature = "encryption")]
53 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
54 #[inline]
55 pub fn keyring(&self) -> Option<&super::keyring::Keyring> {
56 self.inner.keyring.as_ref()
57 }
58
59 #[cfg(feature = "encryption")]
61 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
62 #[inline]
63 pub fn encryption_enabled(&self) -> bool {
64 self.inner.keyring.is_some()
65 && self.inner.opts.encryption_algo.is_some()
66 && self.inner.opts.gossip_verify_outgoing
67 }
68
69 #[inline]
71 pub fn delegate(&self) -> Option<&D> {
72 self.delegate.as_deref()
73 }
74
75 #[inline]
77 pub async fn local_state(&self) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
78 let nodes = self.inner.nodes.read().await;
79 nodes
80 .node_map
81 .get(&self.inner.id)
82 .map(|&idx| nodes.nodes[idx].state.server.clone())
83 }
84
85 pub async fn by_id(&self, id: &T::Id) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
87 let members = self.inner.nodes.read().await;
88
89 members
90 .node_map
91 .get(id)
92 .map(|&idx| members.nodes[idx].state.server.clone())
93 }
94
95 #[inline]
97 pub async fn members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
98 self
99 .inner
100 .nodes
101 .read()
102 .await
103 .nodes
104 .iter()
105 .map(|n| n.state.server.clone())
106 .collect()
107 }
108
109 #[inline]
111 pub async fn num_members(&self) -> usize {
112 self.inner.nodes.read().await.nodes.len()
113 }
114
115 pub async fn online_members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
117 self
118 .inner
119 .nodes
120 .read()
121 .await
122 .nodes
123 .iter()
124 .filter(|n| !n.dead_or_left())
125 .map(|n| n.state.server.clone())
126 .collect()
127 }
128
129 pub async fn num_online_members(&self) -> usize {
131 self
132 .inner
133 .nodes
134 .read()
135 .await
136 .nodes
137 .iter()
138 .filter(|n| !n.dead_or_left())
139 .count()
140 }
141
142 pub async fn members_by(
144 &self,
145 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
146 ) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
147 self
148 .inner
149 .nodes
150 .read()
151 .await
152 .nodes
153 .iter()
154 .filter(|n| f(&n.state))
155 .map(|n| n.state.server.clone())
156 .collect()
157 }
158
159 pub async fn num_members_by(
161 &self,
162 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
163 ) -> usize {
164 self
165 .inner
166 .nodes
167 .read()
168 .await
169 .nodes
170 .iter()
171 .filter(|n| f(&n.state))
172 .count()
173 }
174
175 pub async fn members_map_by<O>(
177 &self,
178 mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> Option<O>,
179 ) -> SmallVec<O> {
180 self
181 .inner
182 .nodes
183 .read()
184 .await
185 .nodes
186 .iter()
187 .filter_map(|n| f(&n.state))
188 .collect()
189 }
190}
191
192impl<T> Memberlist<T>
193where
194 T: Transport,
195{
196 #[inline]
198 pub async fn new(
199 transport_options: T::Options,
200 opts: Options,
201 ) -> Result<Self, Error<T, VoidDelegate<T::Id, T::ResolvedAddress>>> {
202 Self::create(None, transport_options, opts).await
203 }
204}
205
206impl<T, D> Memberlist<T, D>
207where
208 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
209 T: Transport,
210{
211 #[inline]
213 pub async fn with_delegate(
214 delegate: D,
215 transport_options: T::Options,
216 opts: Options,
217 ) -> Result<Self, Error<T, D>> {
218 Self::create(Some(delegate), transport_options, opts).await
219 }
220
221 pub(crate) async fn create(
222 delegate: Option<D>,
223 transport_options: T::Options,
224 opts: Options,
225 ) -> Result<Self, Error<T, D>> {
226 let transport = T::new(transport_options).await.map_err(Error::Transport)?;
227 let (shutdown_rx, advertise, this) = Self::new_in(transport, delegate, opts).await?;
228 let meta = if let Some(d) = &this.delegate {
229 d.node_meta(META_MAX_SIZE).await
230 } else {
231 Meta::empty()
232 };
233
234 if meta.len() > META_MAX_SIZE {
235 panic!("NodeState meta data provided is longer than the limit");
236 }
237
238 let alive = Alive::new(
239 this.next_incarnation(),
240 Node::new(this.inner.id.clone(), this.inner.advertise.clone()),
241 )
242 .with_meta(meta)
243 .with_protocol_version(this.inner.opts.protocol_version)
244 .with_delegate_version(this.inner.opts.delegate_version);
245 this.alive_node(alive, None, true).await;
246 this.schedule(shutdown_rx).await;
247 tracing::debug!(local = %this.inner.id, advertise_addr = %advertise, "memberlist: node is living");
248 Ok(this)
249 }
250
251 pub async fn leave(&self, timeout: Duration) -> Result<bool, Error<T, D>> {
264 if self.has_shutdown() {
265 return Ok(false);
266 }
267
268 if self.has_left() {
269 return Ok(false);
270 }
271
272 if !self.has_left() {
273 self.inner.hot.leave.store(true, Ordering::Release);
274
275 let mut memberlist = self.inner.nodes.write().await;
276 if let Some(&idx) = memberlist.node_map.get(&self.inner.id) {
277 let state = &memberlist.nodes[idx];
283 let d = Dead::new(
284 state.state.incarnation.load(Ordering::Acquire),
285 state.id().cheap_clone(),
286 state.id().cheap_clone(),
287 );
288
289 self.dead_node(&mut memberlist, d).await?;
290 let any_alive = memberlist.any_alive();
291 drop(memberlist);
292
293 if any_alive {
295 if timeout > Duration::ZERO {
296 futures::select! {
297 _ = self.inner.leave_broadcast_rx.recv().fuse() => {},
298 _ = <T::Runtime as RuntimeLite>::sleep(timeout).fuse() => {
299 return Err(Error::LeaveTimeout);
300 }
301 }
302 } else if let Err(e) = self.inner.leave_broadcast_rx.recv().await {
303 tracing::error!("memberlist: failed to receive leave broadcast: {}", e);
304 }
305 }
306 } else {
307 tracing::warn!("memberlist: leave but we're not a member");
308 }
309 }
310
311 Ok(true)
312 }
313
314 pub async fn join(
317 &self,
318 node: Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
319 ) -> Result<Node<T::Id, T::ResolvedAddress>, Error<T, D>> {
320 if self.has_left() || self.has_shutdown() {
321 return Err(Error::NotRunning);
322 }
323
324 let (id, addr) = node.into_components();
325 let addr = match addr {
326 MaybeResolvedAddress::Resolved(addr) => addr,
327 MaybeResolvedAddress::Unresolved(addr) => self
328 .inner
329 .transport
330 .resolve(&addr)
331 .await
332 .map_err(Error::Transport)?,
333 };
334 let n = Node::new(id, addr);
335 self.push_pull_node(n.cheap_clone(), true).await.map(|_| n)
336 }
337
  /// Attempts to join the cluster through every node yielded by `existing`.
  ///
  /// One join future is built per node: an unresolved address is resolved
  /// through the transport, then `push_pull_node` is called with the join
  /// flag set. The futures are gathered in a `FuturesUnordered` and drained
  /// as a stream.
  ///
  /// # Returns
  ///
  /// `Ok` with the nodes that were joined successfully when no attempt
  /// failed.
  ///
  /// # Errors
  ///
  /// - `Err((empty, Error::NotRunning))` when the memberlist has left or shut
  ///   down.
  /// - Otherwise `Err((joined, error))`, where `joined` holds the nodes that
  ///   were joined successfully and `error` is built from all collected
  ///   failures by `Error::try_from_one_or_more`.
  pub async fn join_many(
    &self,
    existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
  ) -> Result<
    SmallVec<Node<T::Id, T::ResolvedAddress>>,
    (SmallVec<Node<T::Id, T::ResolvedAddress>>, Error<T, D>),
  > {
    if self.has_left() || self.has_shutdown() {
      return Err((Default::default(), Error::NotRunning));
    }

    // Lower bound of the iterator's size hint; used only to pre-size the
    // success list.
    let estimated_total = existing.size_hint().0;

    let futs = existing
      .into_iter()
      .map(|node| {
        async move {
          let (id, addr) = node.into_components();
          let resolved_addr = match addr {
            MaybeResolvedAddress::Resolved(addr) => addr,
            MaybeResolvedAddress::Unresolved(addr) => {
              match self.inner.transport.resolve(&addr).await {
                Ok(addr) => addr,
                Err(e) => {
                  tracing::debug!(
                    err = %e,
                    "memberlist: failed to resolve address {}",
                    addr,
                  );
                  // Resolution failed: report the node with its still-unresolved address.
                  return Err((Node::new(id, MaybeResolvedAddress::<T::Address, T::ResolvedAddress>::unresolved(addr)), Error::<T, D>::transport(e)))
                }
              }
            }
          };
          let node = Node::new(id, resolved_addr);
          tracing::info!(local = %self.inner.transport.local_id(), peer = %node, "memberlist: start join...");
          if let Err(e) = self.push_pull_node(node.cheap_clone(), true).await {
            tracing::debug!(
              local = %self.inner.id,
              err = %e,
              "memberlist: failed to join {}",
              node,
            );
            // Join failed: report the node with its resolved address.
            let (id, addr) = node.into_components();
            Err((Node::new(id, MaybeResolvedAddress::Resolved(addr)), e))
          } else {
            Ok(node)
          }
        }
      }).collect::<futures::stream::FuturesUnordered<_>>();

    // Successes are pushed from inside the stream closure, so they live in a
    // `RefCell` that the closure can borrow mutably through a shared reference.
    let successes = std::cell::RefCell::new(SmallVec::with_capacity(estimated_total));
    let errors = futs
      .filter_map(|rst| async {
        match rst {
          Ok(node) => {
            successes.borrow_mut().push(node);
            None
          }
          // Only the error is kept; the failed node carried alongside it is dropped.
          Err((_, e)) => Some(e),
        }
      })
      .collect::<OneOrMore<_>>()
      .await;

    match Error::try_from_one_or_more(errors) {
      Ok(()) => Ok(successes.into_inner()),
      Err(e) => Err((successes.into_inner(), e)),
    }
  }
416
417 #[inline]
421 pub fn health_score(&self) -> usize {
422 self.inner.awareness.get_health_score() as usize
423 }
424
425 pub async fn update_node(&self, timeout: Duration) -> Result<(), Error<T, D>> {
431 if self.has_left() || self.has_shutdown() {
432 return Err(Error::NotRunning);
433 }
434
435 let meta = if let Some(delegate) = &self.delegate {
437 let meta = delegate.node_meta(META_MAX_SIZE).await;
438 if meta.len() > META_MAX_SIZE {
439 panic!("node meta data provided is longer than the limit");
440 }
441 meta
442 } else {
443 Meta::empty()
444 };
445
446 let node = {
449 let members = self.inner.nodes.read().await;
450
451 let idx = *members.node_map.get(&self.inner.id).unwrap();
452
453 let state = &members.nodes[idx].state;
454 Node::new(state.id().cheap_clone(), state.address().cheap_clone())
455 };
456
457 let alive = Alive::new(self.next_incarnation(), node)
459 .with_meta(meta)
460 .with_protocol_version(self.inner.opts.protocol_version)
461 .with_delegate_version(self.inner.opts.delegate_version);
462 let (notify_tx, notify_rx) = async_channel::bounded(1);
463 self.alive_node(alive, Some(notify_tx), true).await;
464
465 if self.any_alive().await {
467 if timeout > Duration::ZERO {
468 let _ = <T::Runtime as RuntimeLite>::timeout(timeout, notify_rx.recv())
469 .await
470 .map_err(|_| Error::UpdateTimeout)?;
471 } else {
472 let _ = notify_rx.recv().await;
473 }
474 }
475
476 Ok(())
477 }
478
479 #[inline]
486 pub async fn send(&self, to: &T::ResolvedAddress, msg: Bytes) -> Result<(), Error<T, D>> {
487 self.send_many(to, std::iter::once(msg)).await
488 }
489
490 #[inline]
495 pub async fn send_many(
496 &self,
497 to: &T::ResolvedAddress,
498 msgs: impl Iterator<Item = Bytes>,
499 ) -> Result<(), Error<T, D>> {
500 if self.has_left() || self.has_shutdown() {
501 return Err(Error::NotRunning);
502 }
503
504 let stream = self
505 .transport_send_packets(to, msgs.map(Message::UserData).collect::<OneOrMore<_>>())
506 .await;
507 futures::pin_mut!(stream);
508 match stream.next().await {
509 None => Ok(()),
510 Some(Ok(_)) => Ok(()),
511 Some(Err(e)) => Err(e),
512 }
513 }
514
515 #[inline]
522 pub async fn send_reliable(
523 &self,
524 to: &T::ResolvedAddress,
525 msg: Bytes,
526 ) -> Result<(), Error<T, D>> {
527 self.send_many_reliable(to, std::iter::once(msg)).await
528 }
529
530 #[inline]
535 pub async fn send_many_reliable(
536 &self,
537 to: &T::ResolvedAddress,
538 msgs: impl Iterator<Item = Bytes>,
539 ) -> Result<(), Error<T, D>> {
540 if self.has_left() || self.has_shutdown() {
541 return Err(Error::NotRunning);
542 }
543 self
544 .send_user_msg(to, msgs.map(Message::UserData).collect())
545 .await
546 }
547
  /// Sends a ping to `node` and measures how long the acknowledgement takes.
  ///
  /// # Returns
  ///
  /// The time elapsed between the completion of the send and the arrival of
  /// a complete acknowledgement.
  ///
  /// # Errors
  ///
  /// - The send error(s) when sending the ping fails (several errors are
  ///   wrapped in [`Error::Multiple`]).
  /// - [`Error::Lost`] when the send does not finish within `probe_timeout`,
  ///   or when no complete acknowledgement is received within a further
  ///   `probe_timeout`.
  pub async fn ping(&self, node: Node<T::Id, T::ResolvedAddress>) -> Result<Duration, Error<T, D>> {
    // Build the ping message: a fresh sequence number, ourselves as the
    // source and `node` as the target.
    let self_addr = self.get_advertise();
    let ping = Ping::new(
      self.next_sequence_number(),
      Node::new(self.inner.transport.local_id().clone(), self_addr.clone()),
      node.clone(),
    );

    // Register the ack channel under the ping's sequence number before the
    // ping is sent.
    let (ack_tx, ack_rx) = async_channel::bounded(self.inner.opts.indirect_checks + 1);
    self.inner.ack_manager.set_probe_channels(
      ping.sequence_number(),
      ack_tx,
      None,
      <T::Runtime as RuntimeLite>::now(),
      self.inner.opts.probe_interval,
    );

    // Send the ping, bounded by `probe_timeout`.
    match <T::Runtime as RuntimeLite>::timeout(self.inner.opts.probe_timeout, async {
      let stream = self.send_packets(node.address(), ping.into()).await;
      futures::pin_mut!(stream);
      // Drain the whole stream, gathering every item as an error.
      let errs = stream.collect::<OneOrMore<_>>().await;
      let num_errs = errs.len();

      match num_errs {
        0 => Ok(()),
        _ => match errs.into_either() {
          // Exactly one error: return it as is.
          either::Either::Left([e]) => Err(e),
          // Several errors: bundle them into a single error.
          either::Either::Right(e) => Err(Error::Multiple(e.into_vec().into())),
        },
      }
    })
    .await
    {
      Ok(Ok(())) => {}
      Ok(Err(e)) => return Err(e),
      // The send itself did not finish in time.
      Err(_) => {
        tracing::debug!(
          "memberlist: failed ping {} by packet (timeout reached)",
          node
        );
        return Err(Error::Lost(node));
      }
    }

    // Timestamp taken after the send has completed; the reported duration is
    // measured from here.
    let sent = <T::Runtime as RuntimeLite>::now();

    // Wait for an ack or for `probe_timeout` to pass, whichever comes first.
    futures::select! {
      v = ack_rx.recv().fuse() => {
        // A receive error or an ack that is not complete falls through to
        // the `Lost` path below.
        if let Ok(AckMessage { complete, .. }) = v {
          if complete {
            return Ok(sent.elapsed());
          }
        }
      }
      _ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.probe_timeout).fuse() => {}
    }

    tracing::debug!(
      "memberlist: failed ping {} by packet (timeout reached)",
      node
    );
    Err(Error::Lost(node))
  }
622
623 pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
631 self.inner.shutdown().await.map_err(Error::Transport)
632 }
633}