serf_types/
member.rs

1use std::sync::Arc;
2
3use byteorder::{ByteOrder, NetworkEndian};
4use memberlist_types::CheapClone;
5
6use super::{
7  DelegateVersion, MemberlistDelegateVersion, MemberlistProtocolVersion, Node, NodeTransformError,
8  ProtocolVersion, Tags, TagsTransformError, Transformable, UnknownDelegateVersion,
9  UnknownMemberlistDelegateVersion, UnknownMemberlistProtocolVersion, UnknownProtocolVersion,
10};
11
/// The member status.
///
/// The enum is `#[repr(u8)]` and every variant carries an explicit
/// discriminant; the `TryFrom<u8>` implementation maps those same byte values
/// back to variants.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, bytemuck::NoUninit)]
#[repr(u8)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum MemberStatus {
  /// None status (discriminant `0`)
  None = 0,
  /// Alive status (discriminant `1`)
  Alive = 1,
  /// Leaving status (discriminant `2`)
  Leaving = 2,
  /// Left status (discriminant `3`)
  Left = 3,
  /// Failed status (discriminant `4`)
  Failed = 4,
}
28
29impl core::fmt::Display for MemberStatus {
30  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31    write!(f, "{}", self.as_str())
32  }
33}
34
35impl TryFrom<u8> for MemberStatus {
36  type Error = UnknownMemberStatus;
37
38  fn try_from(value: u8) -> Result<Self, Self::Error> {
39    match value {
40      0 => Ok(Self::None),
41      1 => Ok(Self::Alive),
42      2 => Ok(Self::Leaving),
43      3 => Ok(Self::Left),
44      4 => Ok(Self::Failed),
45      _ => Err(UnknownMemberStatus(value)),
46    }
47  }
48}
49
50impl MemberStatus {
51  /// Get the string representation of the member status
52  #[inline]
53  pub const fn as_str(&self) -> &'static str {
54    match self {
55      Self::None => "none",
56      Self::Alive => "alive",
57      Self::Leaving => "leaving",
58      Self::Left => "left",
59      Self::Failed => "failed",
60    }
61  }
62}
63
/// Unknown member status
///
/// Produced by `MemberStatus::try_from` when the byte does not correspond to
/// any variant. The wrapped `u8` is the rejected byte and is included in the
/// error message.
#[derive(Debug, Copy, Clone, thiserror::Error)]
#[error("Unknown member status: {0}")]
pub struct UnknownMemberStatus(u8);
68
/// A single member of the Serf cluster.
///
/// `I` and `A` are the type parameters of the wrapped [`Node`]. The tags are
/// held behind an [`Arc`], so cloning a member shares the same tag allocation
/// instead of copying it.
///
/// The `viewit` attributes below configure the generated accessors: setters
/// use the `with` prefix, and the `node`, `tags` and `status` getters are
/// declared with `style = "ref"`.
#[viewit::viewit(setters(prefix = "with"))]
#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Member<I, A> {
  /// The node
  #[viewit(
    getter(const, style = "ref", attrs(doc = "Returns the node")),
    setter(attrs(doc = "Sets the node (Builder pattern)"))
  )]
  node: Node<I, A>,
  /// The tags (reference-counted so they can be shared between clones)
  #[viewit(
    getter(const, style = "ref", attrs(doc = "Returns the tags")),
    setter(attrs(doc = "Sets the tags (Builder pattern)"))
  )]
  tags: Arc<Tags>,
  /// The status
  #[viewit(
    getter(const, style = "ref", attrs(doc = "Returns the status")),
    setter(attrs(doc = "Sets the status (Builder pattern)"))
  )]
  status: MemberStatus,
  /// The memberlist protocol version
  #[viewit(
    getter(const, attrs(doc = "Returns the memberlist protocol version")),
    setter(
      const,
      attrs(doc = "Sets the memberlist protocol version (Builder pattern)")
    )
  )]
  memberlist_protocol_version: MemberlistProtocolVersion,
  /// The memberlist delegate version
  #[viewit(
    getter(const, attrs(doc = "Returns the memberlist delegate version")),
    setter(
      const,
      attrs(doc = "Sets the memberlist delegate version (Builder pattern)")
    )
  )]
  memberlist_delegate_version: MemberlistDelegateVersion,

  /// The serf protocol version
  #[viewit(
    getter(const, attrs(doc = "Returns the serf protocol version")),
    setter(
      const,
      attrs(doc = "Sets the serf protocol version (Builder pattern)")
    )
  )]
  protocol_version: ProtocolVersion,
  /// The serf delegate version
  #[viewit(
    getter(const, attrs(doc = "Returns the serf delegate version")),
    setter(
      const,
      attrs(doc = "Sets the serf delegate version (Builder pattern)")
    )
  )]
  delegate_version: DelegateVersion,
}
130
131impl<I, A> Member<I, A> {
132  /// Create a new member with the given node, tags, and status.
133  /// Other fields are set to their default values.
134  #[inline]
135  pub fn new(node: Node<I, A>, tags: Tags, status: MemberStatus) -> Self {
136    Self {
137      node,
138      tags: Arc::new(tags),
139      status,
140      memberlist_protocol_version: MemberlistProtocolVersion::V1,
141      memberlist_delegate_version: MemberlistDelegateVersion::V1,
142      protocol_version: ProtocolVersion::V1,
143      delegate_version: DelegateVersion::V1,
144    }
145  }
146}
147
148impl<I: Clone, A: Clone> Clone for Member<I, A> {
149  fn clone(&self) -> Self {
150    Self {
151      node: self.node.clone(),
152      tags: self.tags.clone(),
153      status: self.status,
154      memberlist_protocol_version: self.memberlist_protocol_version,
155      memberlist_delegate_version: self.memberlist_delegate_version,
156      protocol_version: self.protocol_version,
157      delegate_version: self.delegate_version,
158    }
159  }
160}
161
162impl<I: CheapClone, A: CheapClone> CheapClone for Member<I, A> {
163  fn cheap_clone(&self) -> Self {
164    Self {
165      node: self.node.cheap_clone(),
166      tags: self.tags.cheap_clone(),
167      status: self.status,
168      memberlist_protocol_version: self.memberlist_protocol_version,
169      memberlist_delegate_version: self.memberlist_delegate_version,
170      protocol_version: self.protocol_version,
171      delegate_version: self.delegate_version,
172    }
173  }
174}
175
/// Error transforming the [`Member`]
///
/// Most variants transparently wrap the error of the field that failed to
/// encode or decode; `BufferTooSmall` and `NotEnoughBytes` are raised by the
/// `Member` codec itself. `Debug` is not derived: it is implemented manually
/// in this file by forwarding to `Display`.
#[derive(thiserror::Error)]
pub enum MemberTransformError<I, A>
where
  I: Transformable,
  A: Transformable,
{
  /// Error transforming the `node` field
  #[error(transparent)]
  Node(#[from] NodeTransformError<I, A>),
  /// Error transforming the `tags` field
  #[error(transparent)]
  Tags(#[from] TagsTransformError),
  /// Error transforming the `status` field
  #[error(transparent)]
  MemberStatus(#[from] UnknownMemberStatus),
  /// Encode buffer too small
  #[error("encode buffer too small")]
  BufferTooSmall,
  /// Not enough bytes to decode
  #[error("not enough bytes to decode `Member`")]
  NotEnoughBytes,

  /// Error transforming the `memberlist_protocol_version` field
  #[error(transparent)]
  MemberlistProtocolVersion(#[from] UnknownMemberlistProtocolVersion),

  /// Error transforming the `memberlist_delegate_version` field
  #[error(transparent)]
  MemberlistDelegateVersion(#[from] UnknownMemberlistDelegateVersion),

  /// Error transforming the `protocol_version` field
  #[error(transparent)]
  ProtocolVersion(#[from] UnknownProtocolVersion),

  /// Error transforming the `delegate_version` field
  #[error(transparent)]
  DelegateVersion(#[from] UnknownDelegateVersion),
}
215
216impl<I, A> core::fmt::Debug for MemberTransformError<I, A>
217where
218  I: Transformable,
219  A: Transformable,
220{
221  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
222    write!(f, "{}", self)
223  }
224}
225
226impl<I, A> Transformable for Member<I, A>
227where
228  I: Transformable,
229  A: Transformable,
230{
231  type Error = MemberTransformError<I, A>;
232
233  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
234    let encoded_len = self.encoded_len();
235    if dst.len() < encoded_len {
236      return Err(Self::Error::BufferTooSmall);
237    }
238
239    let mut offset = 0;
240    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
241    offset += 4;
242
243    offset += self.node.encode(&mut dst[offset..])?;
244    offset += self.tags.encode(&mut dst[offset..])?;
245    dst[offset] = self.status as u8;
246    offset += 1;
247
248    dst[offset] = self.memberlist_protocol_version as u8;
249    offset += 1;
250
251    dst[offset] = self.memberlist_delegate_version as u8;
252    offset += 1;
253
254    dst[offset] = self.protocol_version as u8;
255    offset += 1;
256
257    dst[offset] = self.delegate_version as u8;
258    offset += 1;
259
260    debug_assert_eq!(
261      offset, encoded_len,
262      "expect write {} bytes, but actually write {} bytes",
263      offset, encoded_len
264    );
265
266    Ok(offset)
267  }
268
269  fn encoded_len(&self) -> usize {
270    4 + self.node.encoded_len()
271      + self.tags.encoded_len()
272      + 1 // status
273      + 1 // memberlist_protocol_version
274      + 1 // memberlist_delegate_version
275      + 1 // protocol_version
276      + 1 // delegate_version
277  }
278
279  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
280  where
281    Self: Sized,
282  {
283    let src_len = src.len();
284
285    if src_len < 4 {
286      return Err(Self::Error::NotEnoughBytes);
287    }
288
289    let encoded_len = NetworkEndian::read_u32(&src[0..4]) as usize;
290    if src_len < encoded_len {
291      return Err(Self::Error::NotEnoughBytes);
292    }
293
294    let mut offset = 4;
295    let (node_len, node) = Node::decode(&src[offset..])?;
296    offset += node_len;
297
298    let (tags_len, tags) = Tags::decode(&src[offset..])?;
299    offset += tags_len;
300
301    if src_len < offset + 5 {
302      return Err(Self::Error::NotEnoughBytes);
303    }
304
305    let status = MemberStatus::try_from(src[offset])?;
306    offset += 1;
307
308    let memberlist_protocol_version = MemberlistProtocolVersion::try_from(src[offset])?;
309    offset += 1;
310
311    let memberlist_delegate_version = MemberlistDelegateVersion::try_from(src[offset])?;
312    offset += 1;
313
314    let protocol_version = ProtocolVersion::try_from(src[offset])?;
315    offset += 1;
316
317    let delegate_version = DelegateVersion::try_from(src[offset])?;
318    offset += 1;
319
320    debug_assert_eq!(
321      offset, encoded_len,
322      "expect read {} bytes, but actually read {} bytes",
323      offset, encoded_len
324    );
325
326    Ok((
327      encoded_len,
328      Self {
329        node,
330        tags: Arc::new(tags),
331        status,
332        memberlist_protocol_version,
333        memberlist_delegate_version,
334        protocol_version,
335        delegate_version,
336      },
337    ))
338  }
339}
340
#[cfg(test)]
mod tests {
  use std::net::SocketAddr;

  use rand::{distributions::Alphanumeric, random, thread_rng, Rng};
  use smol_str::SmolStr;

  use super::*;

  impl Member<SmolStr, SocketAddr> {
    /// Builds an `Alive` member with a random alphanumeric id of `size`
    /// bytes, a loopback address with a random port, and random tags
    /// produced by `Tags::random(num_tags, size)`.
    fn random(num_tags: usize, size: usize) -> Self {
      let id_bytes: Vec<u8> = thread_rng().sample_iter(Alphanumeric).take(size).collect();
      let id: SmolStr = String::from_utf8(id_bytes).unwrap().into();
      let port = random::<u16>();
      let addr = SocketAddr::from(([127, 0, 0, 1], port));
      let node = Node::new(id, addr);
      let tags = Tags::random(num_tags, size);

      // `Member::new` wraps the tags in an `Arc` and sets every version to `V1`.
      Self::new(node, tags, MemberStatus::Alive)
    }
  }

  /// Round-trips random members through `encode` and all three decode paths
  /// (slice, sync reader, async reader).
  #[test]
  fn member_encode_decode() {
    futures::executor::block_on(async {
      for i in 0..100 {
        let member = Member::random(i % 10, i);
        let expected_len = member.encoded_len();

        let mut buf = vec![0; expected_len];
        let encoded_len = member.encode(&mut buf).unwrap();
        assert_eq!(encoded_len, member.encoded_len());

        // Decode straight from the byte slice.
        let (read, from_slice) = Member::<SmolStr, SocketAddr>::decode(&buf).unwrap();
        assert_eq!(read, encoded_len);
        assert_eq!(from_slice, member);

        // Decode through a blocking reader.
        let mut sync_reader = std::io::Cursor::new(&buf);
        let (read, from_reader) =
          Member::<SmolStr, SocketAddr>::decode_from_reader(&mut sync_reader).unwrap();
        assert_eq!(read, encoded_len);
        assert_eq!(from_reader, member);

        // Decode through an async reader.
        let mut async_reader = futures::io::Cursor::new(&buf);
        let (read, from_async_reader) =
          Member::<SmolStr, SocketAddr>::decode_from_async_reader(&mut async_reader)
            .await
            .unwrap();
        assert_eq!(read, encoded_len);
        assert_eq!(from_async_reader, member);
      }
    });
  }
}
402}