memberlist_proto/push_pull/
state.rs

1use nodecraft::{CheapClone, Node};
2
3use crate::{
4  Data, DataRef, DecodeError, DelegateVersion, EncodeError, Meta, ProtocolVersion, State, WireType,
5  merge, skip,
6};
7
8/// Push node state is the state push to the remote server.
9#[viewit::viewit(getters(vis_all = "pub"), setters(vis_all = "pub", prefix = "with"))]
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11#[cfg_attr(any(feature = "arbitrary", test), derive(arbitrary::Arbitrary))]
12#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
13pub struct PushNodeState<I, A> {
14  /// The id of the push node state.
15  #[viewit(
16    getter(
17      const,
18      style = "ref",
19      attrs(doc = "Returns the id of the push node state")
20    ),
21    setter(attrs(doc = "Sets the id of the push node state (Builder pattern)"))
22  )]
23  id: I,
24  /// The address of the push node state.
25  #[viewit(
26    getter(
27      const,
28      rename = "address",
29      style = "ref",
30      attrs(doc = "Returns the address of the push node state")
31    ),
32    setter(
33      rename = "with_address",
34      attrs(doc = "Sets the address of the push node state (Builder pattern)")
35    )
36  )]
37  addr: A,
38  /// Metadata from the delegate for this push node state.
39  #[viewit(
40    getter(
41      const,
42      style = "ref",
43      attrs(doc = "Returns the meta of the push node state")
44    ),
45    setter(attrs(doc = "Sets the meta of the push node state (Builder pattern)"))
46  )]
47  meta: Meta,
48  /// The incarnation of the push node state.
49  #[viewit(
50    getter(const, attrs(doc = "Returns the incarnation of the push node state")),
51    setter(
52      const,
53      attrs(doc = "Sets the incarnation of the push node state (Builder pattern)")
54    )
55  )]
56  incarnation: u32,
57  /// The state of the push node state.
58  #[viewit(
59    getter(const, attrs(doc = "Returns the state of the push node state")),
60    setter(
61      const,
62      attrs(doc = "Sets the state of the push node state (Builder pattern)")
63    )
64  )]
65  state: State,
66  /// The protocol version of the push node state is speaking.
67  #[viewit(
68    getter(
69      const,
70      attrs(doc = "Returns the protocol version of the push node state is speaking")
71    ),
72    setter(
73      const,
74      attrs(
75        doc = "Sets the protocol version of the push node state is speaking (Builder pattern)"
76      )
77    )
78  )]
79  protocol_version: ProtocolVersion,
80  /// The delegate version of the push node state is speaking.
81  #[viewit(
82    getter(
83      const,
84      attrs(doc = "Returns the delegate version of the push node state is speaking")
85    ),
86    setter(
87      const,
88      attrs(
89        doc = "Sets the delegate version of the push node state is speaking (Builder pattern)"
90      )
91    )
92  )]
93  delegate_version: DelegateVersion,
94}
95
96const ID_TAG: u8 = 1;
97const ADDR_TAG: u8 = 2;
98const META_TAG: u8 = 3;
99const META_BYTE: u8 = merge(WireType::LengthDelimited, META_TAG);
100const INCARNATION_TAG: u8 = 4;
101const INCARNATION_BYTE: u8 = merge(WireType::Varint, INCARNATION_TAG);
102const STATE_TAG: u8 = 5;
103const STATE_BYTE: u8 = merge(WireType::Byte, STATE_TAG);
104const PROTOCOL_VERSION_TAG: u8 = 6;
105const PROTOCOL_VERSION_BYTE: u8 = merge(WireType::Byte, PROTOCOL_VERSION_TAG);
106const DELEGATE_VERSION_TAG: u8 = 7;
107const DELEGATE_VERSION_BYTE: u8 = merge(WireType::Byte, DELEGATE_VERSION_TAG);
108
109impl<I, A> PushNodeState<I, A>
110where
111  I: Data,
112  A: Data,
113{
114  const fn id_byte() -> u8 {
115    merge(I::WIRE_TYPE, ID_TAG)
116  }
117
118  const fn addr_byte() -> u8 {
119    merge(A::WIRE_TYPE, ADDR_TAG)
120  }
121}
122
123/// Push node state is the state push to the remote server.
124#[viewit::viewit(getters(vis_all = "pub"), setters(skip))]
125#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
126pub struct PushNodeStateRef<'a, I, A> {
127  /// The id of the push node state.
128  #[viewit(getter(
129    const,
130    style = "ref",
131    attrs(doc = "Returns the id of the push node state")
132  ))]
133  id: I,
134  /// The address of the push node state.
135  #[viewit(getter(
136    const,
137    rename = "address",
138    style = "ref",
139    attrs(doc = "Returns the address of the push node state")
140  ))]
141  addr: A,
142  /// Metadata from the delegate for this push node state.
143  #[viewit(getter(
144    const,
145    style = "ref",
146    attrs(doc = "Returns the meta of the push node state")
147  ))]
148  meta: &'a [u8],
149  /// The incarnation of the push node state.
150  #[viewit(getter(const, attrs(doc = "Returns the incarnation of the push node state")))]
151  incarnation: u32,
152  /// The state of the push node state.
153  #[viewit(getter(const, attrs(doc = "Returns the state of the push node state")))]
154  state: State,
155  /// The protocol version of the push node state is speaking.
156  #[viewit(getter(
157    const,
158    attrs(doc = "Returns the protocol version of the push node state is speaking")
159  ))]
160  protocol_version: ProtocolVersion,
161  /// The delegate version of the push node state is speaking.
162  #[viewit(getter(
163    const,
164    attrs(doc = "Returns the delegate version of the push node state is speaking")
165  ))]
166  delegate_version: DelegateVersion,
167}
168
169impl<'a, I, A> DataRef<'a, PushNodeState<I, A>> for PushNodeStateRef<'a, I::Ref<'a>, A::Ref<'a>>
170where
171  I: Data,
172  A: Data,
173{
174  fn decode(src: &'a [u8]) -> Result<(usize, Self), crate::DecodeError>
175  where
176    Self: Sized,
177  {
178    let mut offset = 0;
179    let mut id = None;
180    let mut addr = None;
181    let mut meta = None;
182    let mut incarnation = None;
183    let mut state = None;
184    let mut protocol_version = None;
185    let mut delegate_version = None;
186
187    while offset < src.len() {
188      match src[offset] {
189        b if b == PushNodeState::<I, A>::id_byte() => {
190          if id.is_some() {
191            return Err(DecodeError::duplicate_field("PushNodeState", "id", ID_TAG));
192          }
193          offset += 1;
194
195          let (readed, value) = I::Ref::decode_length_delimited(&src[offset..])?;
196          offset += readed;
197          id = Some(value);
198        }
199        b if b == PushNodeState::<I, A>::addr_byte() => {
200          if addr.is_some() {
201            return Err(DecodeError::duplicate_field(
202              "PushNodeState",
203              "addr",
204              ADDR_TAG,
205            ));
206          }
207          offset += 1;
208
209          let (readed, value) = A::Ref::decode_length_delimited(&src[offset..])?;
210          offset += readed;
211          addr = Some(value);
212        }
213        META_BYTE => {
214          if meta.is_some() {
215            return Err(DecodeError::duplicate_field(
216              "PushNodeState",
217              "meta",
218              META_TAG,
219            ));
220          }
221          offset += 1;
222
223          let (readed, value) = <&[u8] as DataRef<Meta>>::decode_length_delimited(&src[offset..])?;
224          offset += readed;
225          meta = Some(value);
226        }
227        INCARNATION_BYTE => {
228          if incarnation.is_some() {
229            return Err(DecodeError::duplicate_field(
230              "PushNodeState",
231              "incarnation",
232              INCARNATION_TAG,
233            ));
234          }
235          offset += 1;
236
237          let (readed, value) = <u32 as DataRef<u32>>::decode(&src[offset..])?;
238          offset += readed;
239          incarnation = Some(value);
240        }
241        STATE_BYTE => {
242          if state.is_some() {
243            return Err(DecodeError::duplicate_field(
244              "PushNodeState",
245              "state",
246              STATE_TAG,
247            ));
248          }
249          offset += 1;
250
251          if offset >= src.len() {
252            return Err(DecodeError::buffer_underflow());
253          }
254          let value = State::from(src[offset]);
255          offset += 1;
256          state = Some(value);
257        }
258        PROTOCOL_VERSION_BYTE => {
259          if protocol_version.is_some() {
260            return Err(DecodeError::duplicate_field(
261              "PushNodeState",
262              "protocol_version",
263              PROTOCOL_VERSION_TAG,
264            ));
265          }
266          offset += 1;
267
268          if offset >= src.len() {
269            return Err(DecodeError::buffer_underflow());
270          }
271          let value = ProtocolVersion::from(src[offset]);
272          offset += 1;
273          protocol_version = Some(value);
274        }
275        DELEGATE_VERSION_BYTE => {
276          if delegate_version.is_some() {
277            return Err(DecodeError::duplicate_field(
278              "PushNodeState",
279              "delegate_version",
280              DELEGATE_VERSION_TAG,
281            ));
282          }
283          offset += 1;
284
285          if offset >= src.len() {
286            return Err(DecodeError::buffer_underflow());
287          }
288          let value = DelegateVersion::from(src[offset]);
289          offset += 1;
290          delegate_version = Some(value);
291        }
292        _ => offset += skip("PushNodeState", &src[offset..])?,
293      }
294    }
295
296    Ok((
297      offset,
298      Self {
299        id: id.ok_or_else(|| DecodeError::missing_field("PushNodeState", "id"))?,
300        addr: addr.ok_or_else(|| DecodeError::missing_field("PushNodeState", "missing addr"))?,
301        meta: meta.unwrap_or_default(),
302        incarnation: incarnation
303          .ok_or_else(|| DecodeError::missing_field("PushNodeState", "incarnation"))?,
304        state: state.ok_or_else(|| DecodeError::missing_field("PushNodeState", "state"))?,
305        protocol_version: protocol_version.unwrap_or_default(),
306        delegate_version: delegate_version.unwrap_or_default(),
307      },
308    ))
309  }
310}
311
312impl<I, A> Data for PushNodeState<I, A>
313where
314  I: Data,
315  A: Data,
316{
317  type Ref<'a> = PushNodeStateRef<'a, I::Ref<'a>, A::Ref<'a>>;
318
319  fn from_ref(val: Self::Ref<'_>) -> Result<Self, DecodeError> {
320    I::from_ref(val.id)
321      .and_then(|id| A::from_ref(val.addr).map(|addr| (id, addr)))
322      .and_then(|(id, addr)| Meta::from_ref(val.meta).map(|meta| (meta, id, addr)))
323      .map(|(meta, id, addr)| Self {
324        id,
325        addr,
326        meta,
327        incarnation: val.incarnation,
328        state: val.state,
329        protocol_version: val.protocol_version,
330        delegate_version: val.delegate_version,
331      })
332  }
333
334  fn encoded_len(&self) -> usize {
335    let mut len = 1 + self.id.encoded_len_with_length_delimited();
336    len += 1 + self.addr.encoded_len_with_length_delimited();
337    let meta_len = self.meta.len();
338    if meta_len != 0 {
339      len += 1 + self.meta.encoded_len_with_length_delimited();
340    }
341    len += 1 + self.incarnation.encoded_len();
342    len += 1 + 1; // state
343    len += 1 + 1; // protocol version
344    len += 1 + 1; // delegate version
345    len
346  }
347
348  fn encode(&self, buf: &mut [u8]) -> Result<usize, EncodeError> {
349    macro_rules! bail {
350      ($this:ident($offset:expr, $len:ident)) => {
351        if $offset >= $len {
352          return Err(EncodeError::insufficient_buffer(self.encoded_len(), $len));
353        }
354      };
355    }
356
357    let mut offset = 0;
358    let len = buf.len();
359    bail!(self(0, len));
360    buf[offset] = Self::id_byte();
361    offset += 1;
362    offset += self
363      .id
364      .encode_length_delimited(&mut buf[offset..])
365      .map_err(|e| e.update(self.encoded_len(), len))?;
366
367    bail!(self(offset, len));
368    buf[offset] = Self::addr_byte();
369    offset += 1;
370    offset += self
371      .addr
372      .encode_length_delimited(&mut buf[offset..])
373      .map_err(|e| e.update(self.encoded_len(), len))?;
374
375    let meta_len = self.meta.len();
376    if meta_len != 0 {
377      bail!(self(offset, len));
378      buf[offset] = META_BYTE;
379      offset += 1;
380      offset += self
381        .meta
382        .encode_length_delimited(&mut buf[offset..])
383        .map_err(|e| e.update(self.encoded_len(), len))?;
384    }
385
386    bail!(self(offset, len));
387    buf[offset] = INCARNATION_BYTE;
388    offset += 1;
389    offset += self
390      .incarnation
391      .encode(&mut buf[offset..])
392      .map_err(|e| e.update(self.encoded_len(), len))?;
393
394    bail!(self(offset, len));
395    buf[offset] = STATE_BYTE;
396    offset += 1;
397
398    bail!(self(offset, len));
399    buf[offset] = self.state.into();
400    offset += 1;
401
402    bail!(self(offset, len));
403    buf[offset] = PROTOCOL_VERSION_BYTE;
404    offset += 1;
405
406    bail!(self(offset, len));
407    buf[offset] = self.protocol_version.into();
408    offset += 1;
409
410    bail!(self(offset, len));
411    buf[offset] = DELEGATE_VERSION_BYTE;
412    offset += 1;
413
414    bail!(self(offset, len));
415    buf[offset] = self.delegate_version.into();
416    offset += 1;
417
418    #[cfg(debug_assertions)]
419    crate::debug_assert_write_eq::<Self>(offset, self.encoded_len());
420    Ok(offset)
421  }
422}
423
424impl<I, A> PushNodeState<I, A> {
425  /// Construct a new push node state with the given id, address and state.
426  #[inline]
427  pub const fn new(incarnation: u32, id: I, addr: A, state: State) -> Self {
428    Self {
429      id,
430      addr,
431      meta: Meta::empty(),
432      incarnation,
433      state,
434      protocol_version: ProtocolVersion::V1,
435      delegate_version: DelegateVersion::V1,
436    }
437  }
438
439  /// Sets the id of the push node state
440  #[inline]
441  pub fn set_id(&mut self, id: I) -> &mut Self {
442    self.id = id;
443    self
444  }
445
446  /// Sets the address of the push node state
447  #[inline]
448  pub fn set_address(&mut self, addr: A) -> &mut Self {
449    self.addr = addr;
450    self
451  }
452
453  /// Sets the meta of the push node state
454  #[inline]
455  pub fn set_meta(&mut self, meta: Meta) -> &mut Self {
456    self.meta = meta;
457    self
458  }
459
460  /// Sets the incarnation of the push node state
461  #[inline]
462  pub fn set_incarnation(&mut self, incarnation: u32) -> &mut Self {
463    self.incarnation = incarnation;
464    self
465  }
466
467  /// Sets the state of the push node state
468  #[inline]
469  pub fn set_state(&mut self, state: State) -> &mut Self {
470    self.state = state;
471    self
472  }
473
474  /// Sets the protocol version of the push node state
475  #[inline]
476  pub fn set_protocol_version(&mut self, protocol_version: ProtocolVersion) -> &mut Self {
477    self.protocol_version = protocol_version;
478    self
479  }
480
481  /// Sets the delegate version of the push node state
482  #[inline]
483  pub fn set_delegate_version(&mut self, delegate_version: DelegateVersion) -> &mut Self {
484    self.delegate_version = delegate_version;
485    self
486  }
487}
488
489impl<I: CheapClone, A: CheapClone> CheapClone for PushNodeState<I, A> {
490  fn cheap_clone(&self) -> Self {
491    Self {
492      id: self.id.cheap_clone(),
493      addr: self.addr.cheap_clone(),
494      meta: self.meta.clone(),
495      incarnation: self.incarnation,
496      state: self.state,
497      protocol_version: self.protocol_version,
498      delegate_version: self.delegate_version,
499    }
500  }
501}
502
503impl<I: CheapClone, A: CheapClone> PushNodeState<I, A> {
504  /// Returns a [`Node`] with the same id and address as this [`PushNodeState`].
505  pub fn node(&self) -> Node<I, A> {
506    Node::new(self.id.cheap_clone(), self.addr.cheap_clone())
507  }
508}