memberlist_core/delegate/
event.rs1use std::{
2 future::Future,
3 pin::Pin,
4 sync::Arc,
5 task::{Context, Poll},
6};
7
8use futures::Stream;
9use nodecraft::{CheapClone, Id};
10
11use crate::proto::NodeState;
12
13#[derive(Debug, PartialEq, Eq, Hash)]
14#[repr(u8)]
15#[non_exhaustive]
16enum EventInner<I, A> {
17 Join(Arc<NodeState<I, A>>),
19 Leave(Arc<NodeState<I, A>>),
21 Update(Arc<NodeState<I, A>>),
23}
24
25impl<I, A> Clone for EventInner<I, A> {
26 fn clone(&self) -> Self {
27 match self {
28 EventInner::Join(node) => EventInner::Join(node.clone()),
29 EventInner::Leave(node) => EventInner::Leave(node.clone()),
30 EventInner::Update(node) => EventInner::Update(node.clone()),
31 }
32 }
33}
34
/// The kind of an [`Event`], as reported by [`Event::kind`].
///
/// The enum is `#[non_exhaustive]`, so code in other crates has to include a
/// wildcard arm when matching on it.
#[repr(u8)]
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub enum EventKind {
    /// The event was built as a join event.
    Join,
    /// The event was built as a leave event.
    Leave,
    /// The event was built as an update event.
    Update,
}
47
48#[derive(Debug)]
50pub struct Event<I, A>(EventInner<I, A>);
51
52impl<I, A> Clone for Event<I, A> {
53 fn clone(&self) -> Self {
54 Self(self.0.clone())
55 }
56}
57
58impl<I, A> Event<I, A> {
59 #[inline]
61 pub fn node_state(&self) -> &NodeState<I, A> {
62 match &self.0 {
63 EventInner::Join(node) => node,
64 EventInner::Leave(node) => node,
65 EventInner::Update(node) => node,
66 }
67 }
68
69 #[inline]
71 pub const fn kind(&self) -> EventKind {
72 match &self.0 {
73 EventInner::Join(_) => EventKind::Join,
74 EventInner::Leave(_) => EventKind::Leave,
75 EventInner::Update(_) => EventKind::Update,
76 }
77 }
78
79 pub(crate) fn join(node: Arc<NodeState<I, A>>) -> Self {
80 Event(EventInner::Join(node))
81 }
82
83 pub(crate) fn leave(node: Arc<NodeState<I, A>>) -> Self {
84 Event(EventInner::Leave(node))
85 }
86
87 pub(crate) fn update(node: Arc<NodeState<I, A>>) -> Self {
88 Event(EventInner::Update(node))
89 }
90}
91
92#[auto_impl::auto_impl(Box, Arc)]
97pub trait EventDelegate: Send + Sync + 'static {
98 type Id: Id;
100
101 type Address: CheapClone + Send + Sync + 'static;
103
104 fn notify_join(
106 &self,
107 node: Arc<NodeState<Self::Id, Self::Address>>,
108 ) -> impl Future<Output = ()> + Send;
109
110 fn notify_leave(
112 &self,
113 node: Arc<NodeState<Self::Id, Self::Address>>,
114 ) -> impl Future<Output = ()> + Send;
115
116 fn notify_update(
119 &self,
120 node: Arc<NodeState<Self::Id, Self::Address>>,
121 ) -> impl Future<Output = ()> + Send;
122}
123
124pub struct SubscribleEventDelegate<I, A>(async_channel::Sender<Event<I, A>>);
128
129impl<I, A> SubscribleEventDelegate<I, A> {
130 pub fn unbounded() -> (Self, EventSubscriber<I, A>) {
132 let (tx, rx) = async_channel::unbounded();
133 (Self(tx), EventSubscriber(rx))
134 }
135
136 pub fn bounded(capacity: usize) -> (Self, EventSubscriber<I, A>) {
141 let (tx, rx) = async_channel::bounded(capacity);
142 (Self(tx), EventSubscriber(rx))
143 }
144}
145
146impl<I, A> EventDelegate for SubscribleEventDelegate<I, A>
147where
148 I: Id + Send + Sync + 'static,
149 A: CheapClone + Send + Sync + 'static,
150{
151 type Id = I;
152
153 type Address = A;
155
156 async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
158 let _ = self.0.send(Event::join(node)).await;
159 }
160
161 async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
163 let _ = self.0.send(Event::leave(node)).await;
164 }
165
166 async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
169 let _ = self.0.send(Event::update(node)).await;
170 }
171}
172
173#[pin_project::pin_project]
175pub struct EventSubscriber<I, A>(#[pin] async_channel::Receiver<Event<I, A>>);
176
177impl<I, A> EventSubscriber<I, A> {
178 pub async fn recv(&self) -> Result<Event<I, A>, async_channel::RecvError> {
180 self.0.recv().await
181 }
182
183 pub fn try_recv(&self) -> Result<Event<I, A>, async_channel::TryRecvError> {
185 self.0.try_recv()
186 }
187
188 pub fn capacity(&self) -> Option<usize> {
190 self.0.capacity()
191 }
192
193 pub fn len(&self) -> usize {
195 self.0.len()
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.0.is_empty()
201 }
202
203 pub fn is_full(&self) -> bool {
205 self.0.is_full()
206 }
207}
208
209impl<I, A> Stream for EventSubscriber<I, A> {
210 type Item = Event<I, A>;
211
212 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213 <async_channel::Receiver<Event<I, A>> as Stream>::poll_next(self.project().0, cx)
214 }
215}