memberlist_proto/
alive.rs

1use super::{
2  Data, DataRef, DecodeError, DelegateVersion, EncodeError, Meta, ProtocolVersion, WireType, merge,
3  skip,
4};
5
6use nodecraft::{CheapClone, Node};
7
8/// Alive message
9#[viewit::viewit(getters(vis_all = "pub"), setters(vis_all = "pub", prefix = "with"))]
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11#[cfg_attr(any(feature = "arbitrary", test), derive(arbitrary::Arbitrary))]
12pub struct Alive<I, A> {
13  /// The incarnation of the alive message
14  #[viewit(
15    getter(const, attrs(doc = "Returns the incarnation of the alive message")),
16    setter(
17      const,
18      attrs(doc = "Sets the incarnation of the alive message (Builder pattern)")
19    )
20  )]
21  incarnation: u32,
22  /// The meta of the alive message
23  #[viewit(
24    getter(
25      const,
26      style = "ref",
27      attrs(doc = "Returns the meta of the alive message")
28    ),
29    setter(attrs(doc = "Sets the meta of the alive message (Builder pattern)"))
30  )]
31  meta: Meta,
32  /// The node of the alive message
33  #[viewit(
34    getter(
35      const,
36      style = "ref",
37      attrs(doc = "Returns the node of the alive message")
38    ),
39    setter(attrs(doc = "Sets the node of the alive message (Builder pattern)"))
40  )]
41  node: Node<I, A>,
42  /// The protocol version of the alive message is speaking
43  #[viewit(
44    getter(
45      const,
46      attrs(doc = "Returns the protocol version of the alive message is speaking")
47    ),
48    setter(
49      const,
50      attrs(doc = "Sets the protocol version of the alive message is speaking (Builder pattern)")
51    )
52  )]
53  protocol_version: ProtocolVersion,
54  /// The delegate version of the alive message is speaking
55  #[viewit(
56    getter(
57      const,
58      attrs(doc = "Returns the delegate version of the alive message is speaking")
59    ),
60    setter(
61      const,
62      attrs(doc = "Sets the delegate version of the alive message is speaking (Builder pattern)")
63    )
64  )]
65  delegate_version: DelegateVersion,
66}
67
68const INCARNATION_TAG: u8 = 1;
69const META_TAG: u8 = 2;
70const NODE_TAG: u8 = 3;
71const PROTOCOL_VERSION_TAG: u8 = 4;
72const DELEGATE_VERSION_TAG: u8 = 5;
73
74const INCARNATION_BYTE: u8 = merge(WireType::Varint, INCARNATION_TAG);
75const META_BYTE: u8 = merge(WireType::LengthDelimited, META_TAG);
76const PROTOCOL_VERSION_BYTE: u8 = merge(WireType::Byte, PROTOCOL_VERSION_TAG);
77const DELEGATE_VERSION_BYTE: u8 = merge(WireType::Byte, DELEGATE_VERSION_TAG);
78
79impl<I, A> Alive<I, A> {
80  #[inline]
81  const fn node_byte() -> u8
82  where
83    I: super::Data,
84    A: super::Data,
85  {
86    merge(WireType::LengthDelimited, NODE_TAG)
87  }
88}
89
90impl<I, A> Data for Alive<I, A>
91where
92  I: Data,
93  A: Data,
94{
95  type Ref<'a> = AliveRef<'a, I::Ref<'a>, A::Ref<'a>>;
96
97  fn from_ref(val: Self::Ref<'_>) -> Result<Self, DecodeError>
98  where
99    Self: Sized,
100  {
101    Meta::from_ref(val.meta)
102      .and_then(|meta| Node::<I, A>::from_ref(val.node).map(|node| (meta, node)))
103      .map(|(meta, node)| Self {
104        incarnation: val.incarnation,
105        meta,
106        node,
107        protocol_version: val.protocol_version,
108        delegate_version: val.delegate_version,
109      })
110  }
111
112  fn encoded_len(&self) -> usize {
113    let mut len = 1 + self.incarnation.encoded_len();
114    let meta_len = self.meta.len();
115    if meta_len != 0 {
116      len += 1 + self.meta.encoded_len_with_length_delimited();
117    }
118    len += 1 + self.node.encoded_len_with_length_delimited();
119    len += 1 + 1;
120    len += 1 + 1;
121    len
122  }
123
124  fn encode(&self, buf: &mut [u8]) -> Result<usize, EncodeError> {
125    macro_rules! bail {
126      ($this:ident($offset:expr, $len:ident)) => {
127        if $offset >= $len {
128          return Err(EncodeError::insufficient_buffer($this.encoded_len(), $len).into());
129        }
130      };
131    }
132
133    let len = buf.len();
134    let mut offset = 0;
135
136    bail!(self(offset, len));
137    buf[offset] = INCARNATION_BYTE;
138    offset += 1;
139    offset += self.incarnation.encode(&mut buf[offset..])?;
140
141    let meta_len = self.meta.len();
142    if meta_len != 0 {
143      bail!(self(offset, len));
144      buf[offset] = META_BYTE;
145      offset += 1;
146      offset += self
147        .meta
148        .encode_length_delimited(&mut buf[offset..])
149        .map_err(|e| e.update(self.encoded_len(), len))?;
150    }
151
152    bail!(self(offset, len));
153    buf[offset] = Self::node_byte();
154    offset += 1;
155    offset += self
156      .node
157      .encode_length_delimited(&mut buf[offset..])
158      .map_err(|e| e.update(self.encoded_len(), len))?;
159
160    bail!(self(offset, len));
161    buf[offset] = PROTOCOL_VERSION_BYTE;
162    offset += 1;
163
164    bail!(self(offset, len));
165    buf[offset] = self.protocol_version.into();
166    offset += 1;
167
168    bail!(self(offset, len));
169    buf[offset] = DELEGATE_VERSION_BYTE;
170    offset += 1;
171
172    bail!(self(offset, len));
173    buf[offset] = self.delegate_version.into();
174    offset += 1;
175
176    #[cfg(debug_assertions)]
177    super::debug_assert_write_eq::<Self>(offset, self.encoded_len());
178
179    Ok(offset)
180  }
181}
182
183impl<I, A> Alive<I, A> {
184  /// Construct a new alive message with the given incarnation, meta, node, protocol version and delegate version.
185  #[inline]
186  pub const fn new(incarnation: u32, node: Node<I, A>) -> Self {
187    Self {
188      incarnation,
189      meta: Meta::empty(),
190      node,
191      protocol_version: ProtocolVersion::V1,
192      delegate_version: DelegateVersion::V1,
193    }
194  }
195
196  /// Sets the incarnation of the alive message.
197  #[inline]
198  pub fn set_incarnation(&mut self, incarnation: u32) -> &mut Self {
199    self.incarnation = incarnation;
200    self
201  }
202
203  /// Sets the meta of the alive message.
204  #[inline]
205  pub fn set_meta(&mut self, meta: Meta) -> &mut Self {
206    self.meta = meta;
207    self
208  }
209
210  /// Sets the node of the alive message.
211  #[inline]
212  pub fn set_node(&mut self, node: Node<I, A>) -> &mut Self {
213    self.node = node;
214    self
215  }
216
217  /// Set the protocol version of the alive message is speaking.
218  #[inline]
219  pub fn set_protocol_version(&mut self, protocol_version: ProtocolVersion) -> &mut Self {
220    self.protocol_version = protocol_version;
221    self
222  }
223
224  /// Set the delegate version of the alive message is speaking.
225  #[inline]
226  pub fn set_delegate_version(&mut self, delegate_version: DelegateVersion) -> &mut Self {
227    self.delegate_version = delegate_version;
228    self
229  }
230}
231
232impl<I: CheapClone, A: CheapClone> CheapClone for Alive<I, A> {
233  fn cheap_clone(&self) -> Self {
234    Self {
235      incarnation: self.incarnation,
236      meta: self.meta.clone(),
237      node: self.node.cheap_clone(),
238      protocol_version: self.protocol_version,
239      delegate_version: self.delegate_version,
240    }
241  }
242}
243
244/// The reference type of [`Alive`].
245#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
246pub struct AliveRef<'a, I, A> {
247  incarnation: u32,
248  meta: &'a [u8],
249  node: Node<I, A>,
250  protocol_version: ProtocolVersion,
251  delegate_version: DelegateVersion,
252}
253
254impl<'a, I, A> AliveRef<'a, I, A> {
255  /// Returns the incarnation of the alive message.
256  #[inline]
257  pub const fn incarnation(&self) -> u32 {
258    self.incarnation
259  }
260
261  /// Returns the meta of the alive message.
262  #[inline]
263  pub const fn meta(&self) -> &'a [u8] {
264    self.meta
265  }
266
267  /// Returns the node of the alive message.
268  #[inline]
269  pub const fn node(&self) -> &Node<I, A> {
270    &self.node
271  }
272
273  /// Returns the protocol version of the alive message is speaking.
274  #[inline]
275  pub const fn protocol_version(&self) -> ProtocolVersion {
276    self.protocol_version
277  }
278
279  /// Returns the delegate version of the alive message is speaking.
280  #[inline]
281  pub const fn delegate_version(&self) -> DelegateVersion {
282    self.delegate_version
283  }
284}
285
286impl<'a, I, A> DataRef<'a, Alive<I, A>> for AliveRef<'a, I::Ref<'a>, A::Ref<'a>>
287where
288  I: Data,
289  A: Data,
290{
291  fn decode(src: &'a [u8]) -> Result<(usize, Self), DecodeError>
292  where
293    Self: Sized,
294  {
295    let mut offset = 0;
296    let mut incarnation = None;
297    let mut meta = None;
298    let mut node = None;
299    let mut protocol_version = None;
300    let mut delegate_version = None;
301
302    while offset < src.len() {
303      match src[offset] {
304        INCARNATION_BYTE => {
305          if incarnation.is_some() {
306            return Err(DecodeError::duplicate_field(
307              "Alive",
308              "incarnation",
309              INCARNATION_TAG,
310            ));
311          }
312          offset += 1;
313          let (bytes_read, value) = <u32 as DataRef<u32>>::decode(&src[offset..])?;
314          offset += bytes_read;
315          incarnation = Some(value);
316        }
317        META_BYTE => {
318          if meta.is_some() {
319            return Err(DecodeError::duplicate_field("Alive", "meta", META_TAG));
320          }
321          offset += 1;
322
323          let (readed, data) = <&[u8] as DataRef<Meta>>::decode_length_delimited(&src[offset..])?;
324          offset += readed;
325          meta = Some(data);
326        }
327        DELEGATE_VERSION_BYTE => {
328          if delegate_version.is_some() {
329            return Err(DecodeError::duplicate_field(
330              "Alive",
331              "delegate_version",
332              DELEGATE_VERSION_TAG,
333            ));
334          }
335          offset += 1;
336
337          if offset >= src.len() {
338            return Err(DecodeError::buffer_underflow());
339          }
340          delegate_version = Some(src[offset].into());
341          offset += 1;
342        }
343        PROTOCOL_VERSION_BYTE => {
344          if protocol_version.is_some() {
345            return Err(DecodeError::duplicate_field(
346              "Alive",
347              "protocol_version",
348              PROTOCOL_VERSION_TAG,
349            ));
350          }
351          offset += 1;
352
353          if offset >= src.len() {
354            return Err(DecodeError::buffer_underflow());
355          }
356          protocol_version = Some(src[offset].into());
357          offset += 1;
358        }
359        b if b == Alive::<I, A>::node_byte() => {
360          if node.is_some() {
361            return Err(DecodeError::duplicate_field("Alive", "node", NODE_TAG));
362          }
363          offset += 1;
364
365          let (readed, data) =
366            <Node<I::Ref<'_>, A::Ref<'_>> as DataRef<Node<I, A>>>::decode_length_delimited(
367              &src[offset..],
368            )?;
369
370          offset += readed;
371          node = Some(data);
372        }
373        _ => offset += skip("Alive", &src[offset..])?,
374      }
375    }
376
377    Ok((
378      offset,
379      Self {
380        incarnation: incarnation
381          .ok_or_else(|| DecodeError::missing_field("Alive", "incarnation"))?,
382        meta: meta.unwrap_or_default(),
383        node: node.ok_or_else(|| DecodeError::missing_field("Alive", "node"))?,
384        protocol_version: protocol_version.unwrap_or_default(),
385        delegate_version: delegate_version.unwrap_or_default(),
386      },
387    ))
388  }
389}
390
#[cfg(test)]
mod tests {
  use std::net::SocketAddr;

  use arbitrary::{Arbitrary, Unstructured};

  use super::*;

  /// Every in-place setter must be observable through the matching getter.
  #[test]
  fn test_access() {
    // Random bytes feed `arbitrary` to build the starting message.
    let mut entropy = vec![0u8; 1024];
    rand::fill(&mut entropy[..]);
    let mut unstructured = Unstructured::new(&entropy);

    let mut alive = Alive::<String, SocketAddr>::arbitrary(&mut unstructured).unwrap();

    alive.set_incarnation(1);
    assert_eq!(alive.incarnation(), 1);

    alive.set_meta(Meta::empty());
    assert_eq!(alive.meta(), &Meta::empty());

    let replacement = Node::new("a".into(), "127.0.0.1:8081".parse().unwrap());
    alive.set_node(replacement);
    assert_eq!(alive.node().id(), "a");

    alive.set_protocol_version(ProtocolVersion::V1);
    assert_eq!(alive.protocol_version(), ProtocolVersion::V1);

    alive.set_delegate_version(DelegateVersion::V1);
    assert_eq!(alive.delegate_version(), DelegateVersion::V1);
  }
}