serf_types/
leave.rs

1use byteorder::{ByteOrder, NetworkEndian};
2
3use super::{LamportTime, LamportTimeTransformError, Transformable};
4
5/// The message broadcasted to signal the intentional to
6/// leave.
7#[viewit::viewit(setters(prefix = "with"))]
8#[derive(Debug, Clone, Eq, PartialEq)]
9#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
10pub struct LeaveMessage<I> {
11  /// The lamport time
12  #[viewit(
13    getter(const, attrs(doc = "Returns the lamport time for this message")),
14    setter(
15      const,
16      attrs(doc = "Sets the lamport time for this message (Builder pattern)")
17    )
18  )]
19  ltime: LamportTime,
20  /// The id of the node
21  #[viewit(
22    getter(const, style = "ref", attrs(doc = "Returns the node")),
23    setter(attrs(doc = "Sets the node (Builder pattern)"))
24  )]
25  id: I,
26
27  /// If prune or not
28  #[viewit(
29    getter(const, style = "ref", attrs(doc = "Returns if prune or not")),
30    setter(attrs(doc = "Sets prune or not (Builder pattern)"))
31  )]
32  prune: bool,
33}
34
35/// Error that can occur when transforming a [`LeaveMessage`].
36#[derive(thiserror::Error)]
37pub enum LeaveMessageTransformError<I: Transformable> {
38  /// Not enough bytes to decode LeaveMessage
39  #[error("not enough bytes to decode LeaveMessage")]
40  NotEnoughBytes,
41  /// Encode buffer too small
42  #[error("encode buffer too small")]
43  EncodeBufferTooSmall,
44  /// Error transforming Node
45  #[error(transparent)]
46  Id(I::Error),
47  /// Error transforming LamportTime
48  #[error(transparent)]
49  LamportTime(#[from] LamportTimeTransformError),
50}
51
52impl<I: Transformable> core::fmt::Debug for LeaveMessageTransformError<I> {
53  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54    write!(f, "{}", self)
55  }
56}
57
58impl<I> Transformable for LeaveMessage<I>
59where
60  I: Transformable,
61{
62  type Error = LeaveMessageTransformError<I>;
63
64  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
65    let encoded_len = self.encoded_len();
66    if dst.len() < encoded_len {
67      return Err(Self::Error::EncodeBufferTooSmall);
68    }
69
70    let mut offset = 0;
71    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
72    offset += 4;
73    dst[offset] = self.prune as u8;
74    offset += 1;
75    offset += self.ltime.encode(&mut dst[offset..])?;
76    offset += self
77      .id
78      .encode(&mut dst[offset..])
79      .map_err(Self::Error::Id)?;
80
81    debug_assert_eq!(
82      offset, encoded_len,
83      "expect write {} bytes, but actual write {} bytes",
84      encoded_len, offset
85    );
86
87    Ok(offset)
88  }
89
90  fn encoded_len(&self) -> usize {
91    4 + 1 + self.id.encoded_len() + self.ltime.encoded_len()
92  }
93
94  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
95  where
96    Self: Sized,
97  {
98    if src.len() < 5 {
99      return Err(Self::Error::NotEnoughBytes);
100    }
101
102    let len = NetworkEndian::read_u32(&src[0..4]) as usize;
103    if src.len() + 5 < len {
104      return Err(Self::Error::NotEnoughBytes);
105    }
106
107    let mut offset = 4;
108    let prune = src[offset] != 0;
109    offset += 1;
110
111    let (read, ltime) = LamportTime::decode(&src[offset..])?;
112    offset += read;
113
114    let (read, id) = I::decode(&src[offset..]).map_err(Self::Error::Id)?;
115    offset += read;
116
117    debug_assert_eq!(
118      offset, len,
119      "expect read {} bytes, but actual read {} bytes",
120      len, offset
121    );
122
123    Ok((offset, Self { ltime, id, prune }))
124  }
125}
126
127#[cfg(test)]
128mod tests {
129
130  use rand::{distributions::Alphanumeric, thread_rng, Rng};
131  use smol_str::SmolStr;
132
133  use super::*;
134
135  impl LeaveMessage<SmolStr> {
136    fn random(size: usize) -> Self {
137      let id = thread_rng()
138        .sample_iter(Alphanumeric)
139        .take(size)
140        .collect::<Vec<u8>>();
141      let id = String::from_utf8(id).unwrap().into();
142
143      Self {
144        ltime: LamportTime::random(),
145        id,
146        prune: thread_rng().gen(),
147      }
148    }
149  }
150
151  #[test]
152  fn test_leave_message_transform() {
153    futures::executor::block_on(async {
154      for i in 0..100 {
155        let filter = LeaveMessage::random(i);
156        let mut buf = vec![0; filter.encoded_len()];
157        let encoded_len = filter.encode(&mut buf).unwrap();
158        assert_eq!(encoded_len, filter.encoded_len());
159
160        let (decoded_len, decoded) = LeaveMessage::<SmolStr>::decode(&buf).unwrap();
161        assert_eq!(decoded_len, encoded_len);
162        assert_eq!(decoded, filter);
163
164        let (decoded_len, decoded) =
165          LeaveMessage::<SmolStr>::decode_from_reader(&mut std::io::Cursor::new(&buf)).unwrap();
166        assert_eq!(decoded_len, encoded_len);
167        assert_eq!(decoded, filter);
168
169        let (decoded_len, decoded) =
170          LeaveMessage::<SmolStr>::decode_from_async_reader(&mut futures::io::Cursor::new(&buf))
171            .await
172            .unwrap();
173        assert_eq!(decoded_len, encoded_len);
174        assert_eq!(decoded, filter);
175      }
176    });
177  }
178}