memberlist_core/delegate/
event.rs

1use std::{
2  future::Future,
3  pin::Pin,
4  sync::Arc,
5  task::{Context, Poll},
6};
7
8use futures::Stream;
9use nodecraft::{CheapClone, Id};
10
11use crate::proto::NodeState;
12
13#[derive(Debug, PartialEq, Eq, Hash)]
14#[repr(u8)]
15#[non_exhaustive]
16enum EventInner<I, A> {
17  /// Join event.
18  Join(Arc<NodeState<I, A>>),
19  /// Leave event.
20  Leave(Arc<NodeState<I, A>>),
21  /// Update event.
22  Update(Arc<NodeState<I, A>>),
23}
24
25impl<I, A> Clone for EventInner<I, A> {
26  fn clone(&self) -> Self {
27    match self {
28      EventInner::Join(node) => EventInner::Join(node.clone()),
29      EventInner::Leave(node) => EventInner::Leave(node.clone()),
30      EventInner::Update(node) => EventInner::Update(node.clone()),
31    }
32  }
33}
34
35/// Represents the kind of event that has occurred.
36#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
37#[repr(u8)]
38#[non_exhaustive]
39pub enum EventKind {
40  /// A node has joined the cluster.
41  Join,
42  /// A node has left the cluster.
43  Leave,
44  /// A node has updated, usually involving the meta data.
45  Update,
46}
47
48/// A single event related to node activity in the memberlist.
49#[derive(Debug)]
50pub struct Event<I, A>(EventInner<I, A>);
51
52impl<I, A> Clone for Event<I, A> {
53  fn clone(&self) -> Self {
54    Self(self.0.clone())
55  }
56}
57
58impl<I, A> Event<I, A> {
59  /// Returns the node state associated with the event.
60  #[inline]
61  pub fn node_state(&self) -> &NodeState<I, A> {
62    match &self.0 {
63      EventInner::Join(node) => node,
64      EventInner::Leave(node) => node,
65      EventInner::Update(node) => node,
66    }
67  }
68
69  /// Returns the kind of event that has occurred.
70  #[inline]
71  pub const fn kind(&self) -> EventKind {
72    match &self.0 {
73      EventInner::Join(_) => EventKind::Join,
74      EventInner::Leave(_) => EventKind::Leave,
75      EventInner::Update(_) => EventKind::Update,
76    }
77  }
78
79  pub(crate) fn join(node: Arc<NodeState<I, A>>) -> Self {
80    Event(EventInner::Join(node))
81  }
82
83  pub(crate) fn leave(node: Arc<NodeState<I, A>>) -> Self {
84    Event(EventInner::Leave(node))
85  }
86
87  pub(crate) fn update(node: Arc<NodeState<I, A>>) -> Self {
88    Event(EventInner::Update(node))
89  }
90}
91
92/// A simpler delegate that is used only to receive
93/// notifications about members joining and leaving. The methods in this
94/// delegate may be called by multiple threads, but never concurrently.
95/// This allows you to reason about ordering.
96#[auto_impl::auto_impl(Box, Arc)]
97pub trait EventDelegate: Send + Sync + 'static {
98  /// The id type of the delegate
99  type Id: Id;
100
101  /// The address type of the delegate
102  type Address: CheapClone + Send + Sync + 'static;
103
104  /// Invoked when a node is detected to have joined the cluster
105  fn notify_join(
106    &self,
107    node: Arc<NodeState<Self::Id, Self::Address>>,
108  ) -> impl Future<Output = ()> + Send;
109
110  /// Invoked when a node is detected to have left the cluster
111  fn notify_leave(
112    &self,
113    node: Arc<NodeState<Self::Id, Self::Address>>,
114  ) -> impl Future<Output = ()> + Send;
115
116  /// Invoked when a node is detected to have
117  /// updated, usually involving the meta data.
118  fn notify_update(
119    &self,
120    node: Arc<NodeState<Self::Id, Self::Address>>,
121  ) -> impl Future<Output = ()> + Send;
122}
123
124/// Used to enable an application to receive
125/// events about joins and leaves over a subscriber instead of a direct
126/// function call.
127pub struct SubscribleEventDelegate<I, A>(async_channel::Sender<Event<I, A>>);
128
129impl<I, A> SubscribleEventDelegate<I, A> {
130  /// Creates a new `EventDelegate` and unbounded subscriber.
131  pub fn unbounded() -> (Self, EventSubscriber<I, A>) {
132    let (tx, rx) = async_channel::unbounded();
133    (Self(tx), EventSubscriber(rx))
134  }
135
136  /// Creates a new `EventDelegate` and bounded subscriber.
137  ///
138  /// Care must be taken that events are processed in a timely manner from
139  /// the channel, since this delegate will block until an event can be sent.
140  pub fn bounded(capacity: usize) -> (Self, EventSubscriber<I, A>) {
141    let (tx, rx) = async_channel::bounded(capacity);
142    (Self(tx), EventSubscriber(rx))
143  }
144}
145
146impl<I, A> EventDelegate for SubscribleEventDelegate<I, A>
147where
148  I: Id + Send + Sync + 'static,
149  A: CheapClone + Send + Sync + 'static,
150{
151  type Id = I;
152
153  /// The address type of the delegate
154  type Address = A;
155
156  /// Invoked when a node is detected to have joined the cluster
157  async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
158    let _ = self.0.send(Event::join(node)).await;
159  }
160
161  /// Invoked when a node is detected to have left the cluster
162  async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
163    let _ = self.0.send(Event::leave(node)).await;
164  }
165
166  /// Invoked when a node is detected to have
167  /// updated, usually involving the meta data.
168  async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
169    let _ = self.0.send(Event::update(node)).await;
170  }
171}
172
173/// A subscriber for receiving events about joins and leaves.
174#[pin_project::pin_project]
175pub struct EventSubscriber<I, A>(#[pin] async_channel::Receiver<Event<I, A>>);
176
177impl<I, A> EventSubscriber<I, A> {
178  /// Receives the next event from the subscriber.
179  pub async fn recv(&self) -> Result<Event<I, A>, async_channel::RecvError> {
180    self.0.recv().await
181  }
182
183  /// Tries to receive the next event from the subscriber without blocking.
184  pub fn try_recv(&self) -> Result<Event<I, A>, async_channel::TryRecvError> {
185    self.0.try_recv()
186  }
187
188  /// Returns the capacity of the subscriber.
189  pub fn capacity(&self) -> Option<usize> {
190    self.0.capacity()
191  }
192
193  /// Returns the number of events in the subscriber.
194  pub fn len(&self) -> usize {
195    self.0.len()
196  }
197
198  /// Returns `true` if the subscriber is empty.
199  pub fn is_empty(&self) -> bool {
200    self.0.is_empty()
201  }
202
203  /// Returns `true` if the subscriber is full.
204  pub fn is_full(&self) -> bool {
205    self.0.is_full()
206  }
207}
208
209impl<I, A> Stream for EventSubscriber<I, A> {
210  type Item = Event<I, A>;
211
212  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213    <async_channel::Receiver<Event<I, A>> as Stream>::poll_next(self.project().0, cx)
214  }
215}