serf_core/delegate/
composite.rs

1use memberlist_core::{
2  transport::{Id, Node},
3  types::TinyVec,
4  CheapClone,
5};
6use serf_types::MessageType;
7
8use crate::{
9  coordinate::Coordinate,
10  types::{AsMessageRef, Filter, Member, SerfMessage, Tags},
11};
12
13use super::{
14  DefaultMergeDelegate, Delegate, LpeTransfromDelegate, MergeDelegate, NoopReconnectDelegate,
15  ReconnectDelegate, TransformDelegate,
16};
17
18/// `CompositeDelegate` is a helpful struct to split the [`Delegate`] into multiple small delegates,
19/// so that users do not need to implement full [`Delegate`] when they only want to custom some methods
20/// in the [`Delegate`].
21pub struct CompositeDelegate<
22  I,
23  A,
24  M = DefaultMergeDelegate<I, A>,
25  R = NoopReconnectDelegate<I, A>,
26  T = LpeTransfromDelegate<I, A>,
27> {
28  merge: M,
29  reconnect: R,
30  transform: T,
31  _m: std::marker::PhantomData<(I, A)>,
32}
33
34impl<I, A> Default for CompositeDelegate<I, A> {
35  fn default() -> Self {
36    Self::new()
37  }
38}
39
40impl<I, A> CompositeDelegate<I, A> {
41  /// Returns a new `CompositeDelegate`.
42  pub fn new() -> Self {
43    Self {
44      merge: Default::default(),
45      reconnect: Default::default(),
46      transform: Default::default(),
47      _m: std::marker::PhantomData,
48    }
49  }
50}
51
52impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T>
53where
54  M: MergeDelegate<Id = I, Address = A>,
55{
56  /// Set the [`MergeDelegate`] for the `CompositeDelegate`.
57  pub fn with_merge_delegate<NM>(self, merge: NM) -> CompositeDelegate<I, A, NM, R, T> {
58    CompositeDelegate {
59      merge,
60      reconnect: self.reconnect,
61      transform: self.transform,
62      _m: std::marker::PhantomData,
63    }
64  }
65}
66
67impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T> {
68  /// Set the [`ReconnectDelegate`] for the `CompositeDelegate`.
69  pub fn with_reconnect_delegate<NR>(self, reconnect: NR) -> CompositeDelegate<I, A, M, NR, T> {
70    CompositeDelegate {
71      reconnect,
72      merge: self.merge,
73      transform: self.transform,
74      _m: std::marker::PhantomData,
75    }
76  }
77}
78
79impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T> {
80  /// Set the [`TransformDelegate`] for the `CompositeDelegate`.
81  pub fn with_transform_delegate<NT>(self, transform: NT) -> CompositeDelegate<I, A, M, R, NT> {
82    CompositeDelegate {
83      transform,
84      merge: self.merge,
85      reconnect: self.reconnect,
86      _m: std::marker::PhantomData,
87    }
88  }
89}
90
91impl<I, A, M, R, T> MergeDelegate for CompositeDelegate<I, A, M, R, T>
92where
93  I: Id,
94  A: CheapClone + Send + Sync + 'static,
95  M: MergeDelegate<Id = I, Address = A>,
96  R: Send + Sync + 'static,
97  T: Send + Sync + 'static,
98{
99  type Error = M::Error;
100
101  type Id = M::Id;
102
103  type Address = M::Address;
104
105  async fn notify_merge(
106    &self,
107    members: TinyVec<Member<Self::Id, Self::Address>>,
108  ) -> Result<(), Self::Error> {
109    self.merge.notify_merge(members).await
110  }
111}
112
113impl<I, A, M, R, T> ReconnectDelegate for CompositeDelegate<I, A, M, R, T>
114where
115  I: Id,
116  A: CheapClone + Send + Sync + 'static,
117  M: Send + Sync + 'static,
118  R: ReconnectDelegate<Id = I, Address = A>,
119  T: Send + Sync + 'static,
120{
121  type Id = R::Id;
122
123  type Address = R::Address;
124
125  fn reconnect_timeout(
126    &self,
127    member: &Member<Self::Id, Self::Address>,
128    timeout: std::time::Duration,
129  ) -> std::time::Duration {
130    self.reconnect.reconnect_timeout(member, timeout)
131  }
132}
133
134impl<I, A, M, R, T> TransformDelegate for CompositeDelegate<I, A, M, R, T>
135where
136  I: Id,
137  A: CheapClone + Send + Sync + 'static,
138  M: Send + Sync + 'static,
139  R: Send + Sync + 'static,
140  T: TransformDelegate<Id = I, Address = A>,
141{
142  type Error = T::Error;
143
144  type Id = T::Id;
145
146  type Address = T::Address;
147
148  fn encode_filter(
149    filter: &Filter<Self::Id>,
150  ) -> Result<memberlist_core::bytes::Bytes, Self::Error> {
151    T::encode_filter(filter)
152  }
153
154  fn decode_filter(bytes: &[u8]) -> Result<(usize, Filter<Self::Id>), Self::Error> {
155    T::decode_filter(bytes)
156  }
157
158  fn node_encoded_len(node: &Node<Self::Id, Self::Address>) -> usize {
159    T::node_encoded_len(node)
160  }
161
162  fn encode_node(
163    node: &Node<Self::Id, Self::Address>,
164    dst: &mut [u8],
165  ) -> Result<usize, Self::Error> {
166    T::encode_node(node, dst)
167  }
168
169  fn decode_node(
170    bytes: impl AsRef<[u8]>,
171  ) -> Result<(usize, Node<Self::Id, Self::Address>), Self::Error> {
172    T::decode_node(bytes)
173  }
174
175  fn id_encoded_len(id: &Self::Id) -> usize {
176    T::id_encoded_len(id)
177  }
178
179  fn encode_id(id: &Self::Id, dst: &mut [u8]) -> Result<usize, Self::Error> {
180    T::encode_id(id, dst)
181  }
182
183  fn decode_id(bytes: &[u8]) -> Result<(usize, Self::Id), Self::Error> {
184    T::decode_id(bytes)
185  }
186
187  fn address_encoded_len(address: &Self::Address) -> usize {
188    T::address_encoded_len(address)
189  }
190
191  fn encode_address(address: &Self::Address, dst: &mut [u8]) -> Result<usize, Self::Error> {
192    T::encode_address(address, dst)
193  }
194
195  fn decode_address(bytes: &[u8]) -> Result<(usize, Self::Address), Self::Error> {
196    T::decode_address(bytes)
197  }
198
199  fn coordinate_encoded_len(coordinate: &Coordinate) -> usize {
200    T::coordinate_encoded_len(coordinate)
201  }
202
203  fn encode_coordinate(coordinate: &Coordinate, dst: &mut [u8]) -> Result<usize, Self::Error> {
204    T::encode_coordinate(coordinate, dst)
205  }
206
207  fn decode_coordinate(bytes: &[u8]) -> Result<(usize, Coordinate), Self::Error> {
208    T::decode_coordinate(bytes)
209  }
210
211  fn tags_encoded_len(tags: &Tags) -> usize {
212    T::tags_encoded_len(tags)
213  }
214
215  fn encode_tags(tags: &Tags, dst: &mut [u8]) -> Result<usize, Self::Error> {
216    T::encode_tags(tags, dst)
217  }
218
219  fn decode_tags(bytes: &[u8]) -> Result<(usize, Tags), Self::Error> {
220    T::decode_tags(bytes)
221  }
222
223  fn message_encoded_len(msg: impl AsMessageRef<Self::Id, Self::Address>) -> usize {
224    T::message_encoded_len(msg)
225  }
226
227  fn encode_message(
228    msg: impl AsMessageRef<Self::Id, Self::Address>,
229    dst: impl AsMut<[u8]>,
230  ) -> Result<usize, Self::Error> {
231    T::encode_message(msg, dst)
232  }
233
234  fn decode_message(
235    ty: MessageType,
236    bytes: impl AsRef<[u8]>,
237  ) -> Result<(usize, SerfMessage<Self::Id, Self::Address>), Self::Error> {
238    T::decode_message(ty, bytes)
239  }
240}
241
242impl<I, A, M, R, T> Delegate for CompositeDelegate<I, A, M, R, T>
243where
244  I: Id,
245  A: CheapClone + Send + Sync + 'static,
246  M: MergeDelegate<Id = I, Address = A>,
247  R: ReconnectDelegate<Id = I, Address = A>,
248  T: TransformDelegate<Id = I, Address = A>,
249{
250  type Id = I;
251
252  type Address = A;
253}