1use std::sync::Arc;
2
3use byteorder::{ByteOrder, NetworkEndian};
4use memberlist_types::CheapClone;
5
6use super::{
7 DelegateVersion, MemberlistDelegateVersion, MemberlistProtocolVersion, Node, NodeTransformError,
8 ProtocolVersion, Tags, TagsTransformError, Transformable, UnknownDelegateVersion,
9 UnknownMemberlistDelegateVersion, UnknownMemberlistProtocolVersion, UnknownProtocolVersion,
10};
11
12#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, bytemuck::NoUninit)]
14#[repr(u8)]
15#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
16pub enum MemberStatus {
17 None = 0,
19 Alive = 1,
21 Leaving = 2,
23 Left = 3,
25 Failed = 4,
27}
28
29impl core::fmt::Display for MemberStatus {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "{}", self.as_str())
32 }
33}
34
35impl TryFrom<u8> for MemberStatus {
36 type Error = UnknownMemberStatus;
37
38 fn try_from(value: u8) -> Result<Self, Self::Error> {
39 match value {
40 0 => Ok(Self::None),
41 1 => Ok(Self::Alive),
42 2 => Ok(Self::Leaving),
43 3 => Ok(Self::Left),
44 4 => Ok(Self::Failed),
45 _ => Err(UnknownMemberStatus(value)),
46 }
47 }
48}
49
50impl MemberStatus {
51 #[inline]
53 pub const fn as_str(&self) -> &'static str {
54 match self {
55 Self::None => "none",
56 Self::Alive => "alive",
57 Self::Leaving => "leaving",
58 Self::Left => "left",
59 Self::Failed => "failed",
60 }
61 }
62}
63
64#[derive(Debug, Copy, Clone, thiserror::Error)]
66#[error("Unknown member status: {0}")]
67pub struct UnknownMemberStatus(u8);
68
69#[viewit::viewit(setters(prefix = "with"))]
71#[derive(Debug, PartialEq)]
72#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
73pub struct Member<I, A> {
74 #[viewit(
76 getter(const, style = "ref", attrs(doc = "Returns the node")),
77 setter(attrs(doc = "Sets the node (Builder pattern)"))
78 )]
79 node: Node<I, A>,
80 #[viewit(
82 getter(const, style = "ref", attrs(doc = "Returns the tags")),
83 setter(attrs(doc = "Sets the tags (Builder pattern)"))
84 )]
85 tags: Arc<Tags>,
86 #[viewit(
88 getter(const, style = "ref", attrs(doc = "Returns the status")),
89 setter(attrs(doc = "Sets the status (Builder pattern)"))
90 )]
91 status: MemberStatus,
92 #[viewit(
94 getter(const, attrs(doc = "Returns the memberlist protocol version")),
95 setter(
96 const,
97 attrs(doc = "Sets the memberlist protocol version (Builder pattern)")
98 )
99 )]
100 memberlist_protocol_version: MemberlistProtocolVersion,
101 #[viewit(
103 getter(const, attrs(doc = "Returns the memberlist delegate version")),
104 setter(
105 const,
106 attrs(doc = "Sets the memberlist delegate version (Builder pattern)")
107 )
108 )]
109 memberlist_delegate_version: MemberlistDelegateVersion,
110
111 #[viewit(
113 getter(const, attrs(doc = "Returns the serf protocol version")),
114 setter(
115 const,
116 attrs(doc = "Sets the serf protocol version (Builder pattern)")
117 )
118 )]
119 protocol_version: ProtocolVersion,
120 #[viewit(
122 getter(const, attrs(doc = "Returns the serf delegate version")),
123 setter(
124 const,
125 attrs(doc = "Sets the serf delegate version (Builder pattern)")
126 )
127 )]
128 delegate_version: DelegateVersion,
129}
130
131impl<I, A> Member<I, A> {
132 #[inline]
135 pub fn new(node: Node<I, A>, tags: Tags, status: MemberStatus) -> Self {
136 Self {
137 node,
138 tags: Arc::new(tags),
139 status,
140 memberlist_protocol_version: MemberlistProtocolVersion::V1,
141 memberlist_delegate_version: MemberlistDelegateVersion::V1,
142 protocol_version: ProtocolVersion::V1,
143 delegate_version: DelegateVersion::V1,
144 }
145 }
146}
147
148impl<I: Clone, A: Clone> Clone for Member<I, A> {
149 fn clone(&self) -> Self {
150 Self {
151 node: self.node.clone(),
152 tags: self.tags.clone(),
153 status: self.status,
154 memberlist_protocol_version: self.memberlist_protocol_version,
155 memberlist_delegate_version: self.memberlist_delegate_version,
156 protocol_version: self.protocol_version,
157 delegate_version: self.delegate_version,
158 }
159 }
160}
161
162impl<I: CheapClone, A: CheapClone> CheapClone for Member<I, A> {
163 fn cheap_clone(&self) -> Self {
164 Self {
165 node: self.node.cheap_clone(),
166 tags: self.tags.cheap_clone(),
167 status: self.status,
168 memberlist_protocol_version: self.memberlist_protocol_version,
169 memberlist_delegate_version: self.memberlist_delegate_version,
170 protocol_version: self.protocol_version,
171 delegate_version: self.delegate_version,
172 }
173 }
174}
175
176#[derive(thiserror::Error)]
178pub enum MemberTransformError<I, A>
179where
180 I: Transformable,
181 A: Transformable,
182{
183 #[error(transparent)]
185 Node(#[from] NodeTransformError<I, A>),
186 #[error(transparent)]
188 Tags(#[from] TagsTransformError),
189 #[error(transparent)]
191 MemberStatus(#[from] UnknownMemberStatus),
192 #[error("encode buffer too small")]
194 BufferTooSmall,
195 #[error("not enough bytes to decode `Member`")]
197 NotEnoughBytes,
198
199 #[error(transparent)]
201 MemberlistProtocolVersion(#[from] UnknownMemberlistProtocolVersion),
202
203 #[error(transparent)]
205 MemberlistDelegateVersion(#[from] UnknownMemberlistDelegateVersion),
206
207 #[error(transparent)]
209 ProtocolVersion(#[from] UnknownProtocolVersion),
210
211 #[error(transparent)]
213 DelegateVersion(#[from] UnknownDelegateVersion),
214}
215
216impl<I, A> core::fmt::Debug for MemberTransformError<I, A>
217where
218 I: Transformable,
219 A: Transformable,
220{
221 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
222 write!(f, "{}", self)
223 }
224}
225
226impl<I, A> Transformable for Member<I, A>
227where
228 I: Transformable,
229 A: Transformable,
230{
231 type Error = MemberTransformError<I, A>;
232
233 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
234 let encoded_len = self.encoded_len();
235 if dst.len() < encoded_len {
236 return Err(Self::Error::BufferTooSmall);
237 }
238
239 let mut offset = 0;
240 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
241 offset += 4;
242
243 offset += self.node.encode(&mut dst[offset..])?;
244 offset += self.tags.encode(&mut dst[offset..])?;
245 dst[offset] = self.status as u8;
246 offset += 1;
247
248 dst[offset] = self.memberlist_protocol_version as u8;
249 offset += 1;
250
251 dst[offset] = self.memberlist_delegate_version as u8;
252 offset += 1;
253
254 dst[offset] = self.protocol_version as u8;
255 offset += 1;
256
257 dst[offset] = self.delegate_version as u8;
258 offset += 1;
259
260 debug_assert_eq!(
261 offset, encoded_len,
262 "expect write {} bytes, but actually write {} bytes",
263 offset, encoded_len
264 );
265
266 Ok(offset)
267 }
268
269 fn encoded_len(&self) -> usize {
270 4 + self.node.encoded_len()
271 + self.tags.encoded_len()
272 + 1 + 1 + 1 + 1 + 1 }
278
279 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
280 where
281 Self: Sized,
282 {
283 let src_len = src.len();
284
285 if src_len < 4 {
286 return Err(Self::Error::NotEnoughBytes);
287 }
288
289 let encoded_len = NetworkEndian::read_u32(&src[0..4]) as usize;
290 if src_len < encoded_len {
291 return Err(Self::Error::NotEnoughBytes);
292 }
293
294 let mut offset = 4;
295 let (node_len, node) = Node::decode(&src[offset..])?;
296 offset += node_len;
297
298 let (tags_len, tags) = Tags::decode(&src[offset..])?;
299 offset += tags_len;
300
301 if src_len < offset + 5 {
302 return Err(Self::Error::NotEnoughBytes);
303 }
304
305 let status = MemberStatus::try_from(src[offset])?;
306 offset += 1;
307
308 let memberlist_protocol_version = MemberlistProtocolVersion::try_from(src[offset])?;
309 offset += 1;
310
311 let memberlist_delegate_version = MemberlistDelegateVersion::try_from(src[offset])?;
312 offset += 1;
313
314 let protocol_version = ProtocolVersion::try_from(src[offset])?;
315 offset += 1;
316
317 let delegate_version = DelegateVersion::try_from(src[offset])?;
318 offset += 1;
319
320 debug_assert_eq!(
321 offset, encoded_len,
322 "expect read {} bytes, but actually read {} bytes",
323 offset, encoded_len
324 );
325
326 Ok((
327 encoded_len,
328 Self {
329 node,
330 tags: Arc::new(tags),
331 status,
332 memberlist_protocol_version,
333 memberlist_delegate_version,
334 protocol_version,
335 delegate_version,
336 },
337 ))
338 }
339}
340
#[cfg(test)]
mod tests {
  use std::net::SocketAddr;

  use rand::{distributions::Alphanumeric, random, thread_rng, Rng};
  use smol_str::SmolStr;

  use super::*;

  impl Member<SmolStr, SocketAddr> {
    /// Builds an alive member with a random alphanumeric id of `size` bytes,
    /// a loopback address with a random port, and random tags.
    fn random(num_tags: usize, size: usize) -> Self {
      let raw_id: Vec<u8> = thread_rng().sample_iter(Alphanumeric).take(size).collect();
      let id: SmolStr = String::from_utf8(raw_id).unwrap().into();
      let port = random::<u16>();
      let node = Node::new(id, SocketAddr::from(([127, 0, 0, 1], port)));
      let tags = Tags::random(num_tags, size);

      // `Member::new` fills every version field with its `V1` variant.
      Self::new(node, tags, MemberStatus::Alive)
    }
  }

  /// Round-trips random members through the slice, sync-reader and
  /// async-reader decoding paths.
  #[test]
  fn member_encode_decode() {
    type TestMember = Member<SmolStr, SocketAddr>;

    futures::executor::block_on(async {
      for i in 0..100 {
        let member = TestMember::random(i % 10, i);
        let mut buf = vec![0; member.encoded_len()];
        let written = member.encode(&mut buf).unwrap();
        assert_eq!(written, member.encoded_len());

        // Decode straight from the byte slice.
        let (read, decoded) = TestMember::decode(&buf).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, member);

        // Decode through a blocking reader.
        let mut reader = std::io::Cursor::new(&buf);
        let (read, decoded) = TestMember::decode_from_reader(&mut reader).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, member);

        // Decode through an async reader.
        let mut async_reader = futures::io::Cursor::new(&buf);
        let (read, decoded) = TestMember::decode_from_async_reader(&mut async_reader)
          .await
          .unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, member);
      }
    });
  }
}