memberlist_core/
delegate.rs

1use std::{borrow::Cow, sync::Arc};
2
3use bytes::Bytes;
4use nodecraft::{CheapClone, Id};
5
6use crate::proto::{Meta, NodeState};
7
8#[cfg(any(test, feature = "test"))]
9#[doc(hidden)]
10pub mod mock;
11
12mod alive;
13pub use alive::*;
14
15mod conflict;
16pub use conflict::*;
17
18mod composite;
19pub use composite::*;
20
21mod event;
22pub use event::*;
23
24mod node;
25pub use node::*;
26
27mod merge;
28pub use merge::*;
29
30mod ping;
31pub use ping::*;
32
33/// Error trait for [`Delegate`]
34pub enum DelegateError<D: Delegate> {
35  /// [`AliveDelegate`] error
36  AliveDelegate(<D as AliveDelegate>::Error),
37  /// [`MergeDelegate`] error
38  MergeDelegate(<D as MergeDelegate>::Error),
39}
40
41impl<D: Delegate> core::fmt::Debug for DelegateError<D> {
42  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43    match self {
44      Self::AliveDelegate(err) => write!(f, "{err:?}"),
45      Self::MergeDelegate(err) => write!(f, "{err:?}"),
46    }
47  }
48}
49
50impl<D: Delegate> core::fmt::Display for DelegateError<D> {
51  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52    match self {
53      Self::AliveDelegate(err) => write!(f, "{err}"),
54      Self::MergeDelegate(err) => write!(f, "{err}"),
55    }
56  }
57}
58
59impl<D: Delegate> std::error::Error for DelegateError<D> {}
60
61impl<D: Delegate> DelegateError<D> {
62  /// Create a delegate error from an alive delegate error.
63  #[inline]
64  pub const fn alive(err: <D as AliveDelegate>::Error) -> Self {
65    Self::AliveDelegate(err)
66  }
67
68  /// Create a delegate error from a merge delegate error.
69  #[inline]
70  pub const fn merge(err: <D as MergeDelegate>::Error) -> Self {
71    Self::MergeDelegate(err)
72  }
73}
74
75/// [`Delegate`] is the trait that clients must implement if they want to hook
76/// into the gossip layer of [`Memberlist`](crate::Memberlist). All the methods must be thread-safe,
77/// as they can and generally will be called concurrently.
78pub trait Delegate:
79  NodeDelegate
80  + PingDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
81  + EventDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
82  + ConflictDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
83  + AliveDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
84  + MergeDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
85{
86  /// The id type of the delegate
87  type Id: Id;
88
89  /// The address type of the delegate
90  type Address: CheapClone + Send + Sync + 'static;
91}
92
/// The error type named by [`VoidDelegate`]'s fallible delegate impls.
///
/// It is a unit marker: the `VoidDelegate` methods in this file always return
/// `Ok(())`, so they never actually produce a value of this type.
#[derive(Clone, Copy, Debug)]
pub struct VoidDelegateError;

impl core::fmt::Display for VoidDelegateError {
  /// Writes the fixed message `void delegate error`.
  fn fmt(&self, out: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    out.write_str("void delegate error")
  }
}

// No underlying cause to expose: all provided methods keep their defaults.
impl std::error::Error for VoidDelegateError {}
104
/// A delegate that does nothing.
///
/// `I` and `A` are the node id and address types; they are only recorded at
/// the type level through [`PhantomData`](core::marker::PhantomData), so a
/// `VoidDelegate` holds no data.
pub struct VoidDelegate<I, A>(core::marker::PhantomData<(I, A)>);

// `Clone`, `Copy` and `Debug` are implemented by hand rather than derived:
// `#[derive]` would add `I: Clone`, `I: Copy`, `I: Debug` (and the same for
// `A`) bounds, although the only field is a `PhantomData`, which supports all
// three for every type parameter.

impl<I, A> Clone for VoidDelegate<I, A> {
  #[inline]
  fn clone(&self) -> Self {
    *self
  }
}

impl<I, A> Copy for VoidDelegate<I, A> {}

impl<I, A> core::fmt::Debug for VoidDelegate<I, A> {
  /// Produces the same output the derived implementation would:
  /// a `VoidDelegate(..)` tuple wrapping the `PhantomData` field.
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    f.debug_tuple("VoidDelegate").field(&self.0).finish()
  }
}
108
109impl<I, A> Default for VoidDelegate<I, A> {
110  fn default() -> Self {
111    Self::new()
112  }
113}
114
115impl<I, A> VoidDelegate<I, A> {
116  /// Creates a new [`VoidDelegate`].
117  #[inline]
118  pub const fn new() -> Self {
119    Self(core::marker::PhantomData)
120  }
121}
122
123impl<I, A> AliveDelegate for VoidDelegate<I, A>
124where
125  I: Id + Send + Sync + 'static,
126  A: CheapClone + Send + Sync + 'static,
127{
128  type Error = VoidDelegateError;
129  type Id = I;
130  type Address = A;
131
132  async fn notify_alive(
133    &self,
134    _peer: Arc<NodeState<Self::Id, Self::Address>>,
135  ) -> Result<(), Self::Error> {
136    Ok(())
137  }
138}
139
140impl<I, A> MergeDelegate for VoidDelegate<I, A>
141where
142  I: Id + Send + Sync + 'static,
143  A: CheapClone + Send + Sync + 'static,
144{
145  type Error = VoidDelegateError;
146  type Id = I;
147  type Address = A;
148
149  async fn notify_merge(
150    &self,
151    _peers: Arc<[NodeState<Self::Id, Self::Address>]>,
152  ) -> Result<(), Self::Error> {
153    Ok(())
154  }
155}
156
157impl<I, A> ConflictDelegate for VoidDelegate<I, A>
158where
159  I: Id + Send + Sync + 'static,
160  A: CheapClone + Send + Sync + 'static,
161{
162  type Id = I;
163  type Address = A;
164
165  async fn notify_conflict(
166    &self,
167    _existing: Arc<NodeState<Self::Id, Self::Address>>,
168    _other: Arc<NodeState<Self::Id, Self::Address>>,
169  ) {
170  }
171}
172
173impl<I, A> PingDelegate for VoidDelegate<I, A>
174where
175  I: Id + Send + Sync + 'static,
176  A: CheapClone + Send + Sync + 'static,
177{
178  type Id = I;
179  type Address = A;
180
181  async fn ack_payload(&self) -> Bytes {
182    Bytes::new()
183  }
184
185  async fn notify_ping_complete(
186    &self,
187    _node: Arc<NodeState<Self::Id, Self::Address>>,
188    _rtt: std::time::Duration,
189    _payload: Bytes,
190  ) {
191  }
192
193  fn disable_reliable_pings(&self, _target: &Self::Id) -> bool {
194    false
195  }
196}
197
198impl<I, A> EventDelegate for VoidDelegate<I, A>
199where
200  I: Id + Send + Sync + 'static,
201  A: CheapClone + Send + Sync + 'static,
202{
203  type Id = I;
204  type Address = A;
205
206  async fn notify_join(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
207
208  async fn notify_leave(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
209
210  async fn notify_update(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
211}
212
213impl<I, A> NodeDelegate for VoidDelegate<I, A>
214where
215  I: Id + Send + Sync + 'static,
216  A: CheapClone + Send + Sync + 'static,
217{
218  async fn node_meta(&self, _limit: usize) -> Meta {
219    Meta::empty()
220  }
221
222  async fn notify_message(&self, _msg: Cow<'_, [u8]>) {}
223
224  async fn broadcast_messages<F>(
225    &self,
226    _limit: usize,
227    _encoded_len: F,
228  ) -> impl Iterator<Item = Bytes> + Send
229  where
230    F: Fn(Bytes) -> (usize, Bytes) + Send,
231  {
232    core::iter::empty()
233  }
234
235  async fn local_state(&self, _join: bool) -> Bytes {
236    Bytes::new()
237  }
238
239  async fn merge_remote_state(&self, _buf: &[u8], _join: bool) {}
240}
241
242impl<I, A> Delegate for VoidDelegate<I, A>
243where
244  I: Id + Send + Sync + 'static,
245  A: CheapClone + Send + Sync + 'static,
246{
247  type Id = I;
248  type Address = A;
249}