1use super::{
2 Data, DataRef, DecodeError, DelegateVersion, EncodeError, Meta, ProtocolVersion, WireType, merge,
3 skip,
4};
5
6use nodecraft::{CheapClone, Node};
7
8#[viewit::viewit(getters(vis_all = "pub"), setters(vis_all = "pub", prefix = "with"))]
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11#[cfg_attr(any(feature = "arbitrary", test), derive(arbitrary::Arbitrary))]
12pub struct Alive<I, A> {
13 #[viewit(
15 getter(const, attrs(doc = "Returns the incarnation of the alive message")),
16 setter(
17 const,
18 attrs(doc = "Sets the incarnation of the alive message (Builder pattern)")
19 )
20 )]
21 incarnation: u32,
22 #[viewit(
24 getter(
25 const,
26 style = "ref",
27 attrs(doc = "Returns the meta of the alive message")
28 ),
29 setter(attrs(doc = "Sets the meta of the alive message (Builder pattern)"))
30 )]
31 meta: Meta,
32 #[viewit(
34 getter(
35 const,
36 style = "ref",
37 attrs(doc = "Returns the node of the alive message")
38 ),
39 setter(attrs(doc = "Sets the node of the alive message (Builder pattern)"))
40 )]
41 node: Node<I, A>,
42 #[viewit(
44 getter(
45 const,
46 attrs(doc = "Returns the protocol version of the alive message is speaking")
47 ),
48 setter(
49 const,
50 attrs(doc = "Sets the protocol version of the alive message is speaking (Builder pattern)")
51 )
52 )]
53 protocol_version: ProtocolVersion,
54 #[viewit(
56 getter(
57 const,
58 attrs(doc = "Returns the delegate version of the alive message is speaking")
59 ),
60 setter(
61 const,
62 attrs(doc = "Sets the delegate version of the alive message is speaking (Builder pattern)")
63 )
64 )]
65 delegate_version: DelegateVersion,
66}
67
68const INCARNATION_TAG: u8 = 1;
69const META_TAG: u8 = 2;
70const NODE_TAG: u8 = 3;
71const PROTOCOL_VERSION_TAG: u8 = 4;
72const DELEGATE_VERSION_TAG: u8 = 5;
73
74const INCARNATION_BYTE: u8 = merge(WireType::Varint, INCARNATION_TAG);
75const META_BYTE: u8 = merge(WireType::LengthDelimited, META_TAG);
76const PROTOCOL_VERSION_BYTE: u8 = merge(WireType::Byte, PROTOCOL_VERSION_TAG);
77const DELEGATE_VERSION_BYTE: u8 = merge(WireType::Byte, DELEGATE_VERSION_TAG);
78
79impl<I, A> Alive<I, A> {
80 #[inline]
81 const fn node_byte() -> u8
82 where
83 I: super::Data,
84 A: super::Data,
85 {
86 merge(WireType::LengthDelimited, NODE_TAG)
87 }
88}
89
90impl<I, A> Data for Alive<I, A>
91where
92 I: Data,
93 A: Data,
94{
95 type Ref<'a> = AliveRef<'a, I::Ref<'a>, A::Ref<'a>>;
96
97 fn from_ref(val: Self::Ref<'_>) -> Result<Self, DecodeError>
98 where
99 Self: Sized,
100 {
101 Meta::from_ref(val.meta)
102 .and_then(|meta| Node::<I, A>::from_ref(val.node).map(|node| (meta, node)))
103 .map(|(meta, node)| Self {
104 incarnation: val.incarnation,
105 meta,
106 node,
107 protocol_version: val.protocol_version,
108 delegate_version: val.delegate_version,
109 })
110 }
111
112 fn encoded_len(&self) -> usize {
113 let mut len = 1 + self.incarnation.encoded_len();
114 let meta_len = self.meta.len();
115 if meta_len != 0 {
116 len += 1 + self.meta.encoded_len_with_length_delimited();
117 }
118 len += 1 + self.node.encoded_len_with_length_delimited();
119 len += 1 + 1;
120 len += 1 + 1;
121 len
122 }
123
124 fn encode(&self, buf: &mut [u8]) -> Result<usize, EncodeError> {
125 macro_rules! bail {
126 ($this:ident($offset:expr, $len:ident)) => {
127 if $offset >= $len {
128 return Err(EncodeError::insufficient_buffer($this.encoded_len(), $len).into());
129 }
130 };
131 }
132
133 let len = buf.len();
134 let mut offset = 0;
135
136 bail!(self(offset, len));
137 buf[offset] = INCARNATION_BYTE;
138 offset += 1;
139 offset += self.incarnation.encode(&mut buf[offset..])?;
140
141 let meta_len = self.meta.len();
142 if meta_len != 0 {
143 bail!(self(offset, len));
144 buf[offset] = META_BYTE;
145 offset += 1;
146 offset += self
147 .meta
148 .encode_length_delimited(&mut buf[offset..])
149 .map_err(|e| e.update(self.encoded_len(), len))?;
150 }
151
152 bail!(self(offset, len));
153 buf[offset] = Self::node_byte();
154 offset += 1;
155 offset += self
156 .node
157 .encode_length_delimited(&mut buf[offset..])
158 .map_err(|e| e.update(self.encoded_len(), len))?;
159
160 bail!(self(offset, len));
161 buf[offset] = PROTOCOL_VERSION_BYTE;
162 offset += 1;
163
164 bail!(self(offset, len));
165 buf[offset] = self.protocol_version.into();
166 offset += 1;
167
168 bail!(self(offset, len));
169 buf[offset] = DELEGATE_VERSION_BYTE;
170 offset += 1;
171
172 bail!(self(offset, len));
173 buf[offset] = self.delegate_version.into();
174 offset += 1;
175
176 #[cfg(debug_assertions)]
177 super::debug_assert_write_eq::<Self>(offset, self.encoded_len());
178
179 Ok(offset)
180 }
181}
182
183impl<I, A> Alive<I, A> {
184 #[inline]
186 pub const fn new(incarnation: u32, node: Node<I, A>) -> Self {
187 Self {
188 incarnation,
189 meta: Meta::empty(),
190 node,
191 protocol_version: ProtocolVersion::V1,
192 delegate_version: DelegateVersion::V1,
193 }
194 }
195
196 #[inline]
198 pub fn set_incarnation(&mut self, incarnation: u32) -> &mut Self {
199 self.incarnation = incarnation;
200 self
201 }
202
203 #[inline]
205 pub fn set_meta(&mut self, meta: Meta) -> &mut Self {
206 self.meta = meta;
207 self
208 }
209
210 #[inline]
212 pub fn set_node(&mut self, node: Node<I, A>) -> &mut Self {
213 self.node = node;
214 self
215 }
216
217 #[inline]
219 pub fn set_protocol_version(&mut self, protocol_version: ProtocolVersion) -> &mut Self {
220 self.protocol_version = protocol_version;
221 self
222 }
223
224 #[inline]
226 pub fn set_delegate_version(&mut self, delegate_version: DelegateVersion) -> &mut Self {
227 self.delegate_version = delegate_version;
228 self
229 }
230}
231
232impl<I: CheapClone, A: CheapClone> CheapClone for Alive<I, A> {
233 fn cheap_clone(&self) -> Self {
234 Self {
235 incarnation: self.incarnation,
236 meta: self.meta.clone(),
237 node: self.node.cheap_clone(),
238 protocol_version: self.protocol_version,
239 delegate_version: self.delegate_version,
240 }
241 }
242}
243
244#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
246pub struct AliveRef<'a, I, A> {
247 incarnation: u32,
248 meta: &'a [u8],
249 node: Node<I, A>,
250 protocol_version: ProtocolVersion,
251 delegate_version: DelegateVersion,
252}
253
254impl<'a, I, A> AliveRef<'a, I, A> {
255 #[inline]
257 pub const fn incarnation(&self) -> u32 {
258 self.incarnation
259 }
260
261 #[inline]
263 pub const fn meta(&self) -> &'a [u8] {
264 self.meta
265 }
266
267 #[inline]
269 pub const fn node(&self) -> &Node<I, A> {
270 &self.node
271 }
272
273 #[inline]
275 pub const fn protocol_version(&self) -> ProtocolVersion {
276 self.protocol_version
277 }
278
279 #[inline]
281 pub const fn delegate_version(&self) -> DelegateVersion {
282 self.delegate_version
283 }
284}
285
286impl<'a, I, A> DataRef<'a, Alive<I, A>> for AliveRef<'a, I::Ref<'a>, A::Ref<'a>>
287where
288 I: Data,
289 A: Data,
290{
291 fn decode(src: &'a [u8]) -> Result<(usize, Self), DecodeError>
292 where
293 Self: Sized,
294 {
295 let mut offset = 0;
296 let mut incarnation = None;
297 let mut meta = None;
298 let mut node = None;
299 let mut protocol_version = None;
300 let mut delegate_version = None;
301
302 while offset < src.len() {
303 match src[offset] {
304 INCARNATION_BYTE => {
305 if incarnation.is_some() {
306 return Err(DecodeError::duplicate_field(
307 "Alive",
308 "incarnation",
309 INCARNATION_TAG,
310 ));
311 }
312 offset += 1;
313 let (bytes_read, value) = <u32 as DataRef<u32>>::decode(&src[offset..])?;
314 offset += bytes_read;
315 incarnation = Some(value);
316 }
317 META_BYTE => {
318 if meta.is_some() {
319 return Err(DecodeError::duplicate_field("Alive", "meta", META_TAG));
320 }
321 offset += 1;
322
323 let (readed, data) = <&[u8] as DataRef<Meta>>::decode_length_delimited(&src[offset..])?;
324 offset += readed;
325 meta = Some(data);
326 }
327 DELEGATE_VERSION_BYTE => {
328 if delegate_version.is_some() {
329 return Err(DecodeError::duplicate_field(
330 "Alive",
331 "delegate_version",
332 DELEGATE_VERSION_TAG,
333 ));
334 }
335 offset += 1;
336
337 if offset >= src.len() {
338 return Err(DecodeError::buffer_underflow());
339 }
340 delegate_version = Some(src[offset].into());
341 offset += 1;
342 }
343 PROTOCOL_VERSION_BYTE => {
344 if protocol_version.is_some() {
345 return Err(DecodeError::duplicate_field(
346 "Alive",
347 "protocol_version",
348 PROTOCOL_VERSION_TAG,
349 ));
350 }
351 offset += 1;
352
353 if offset >= src.len() {
354 return Err(DecodeError::buffer_underflow());
355 }
356 protocol_version = Some(src[offset].into());
357 offset += 1;
358 }
359 b if b == Alive::<I, A>::node_byte() => {
360 if node.is_some() {
361 return Err(DecodeError::duplicate_field("Alive", "node", NODE_TAG));
362 }
363 offset += 1;
364
365 let (readed, data) =
366 <Node<I::Ref<'_>, A::Ref<'_>> as DataRef<Node<I, A>>>::decode_length_delimited(
367 &src[offset..],
368 )?;
369
370 offset += readed;
371 node = Some(data);
372 }
373 _ => offset += skip("Alive", &src[offset..])?,
374 }
375 }
376
377 Ok((
378 offset,
379 Self {
380 incarnation: incarnation
381 .ok_or_else(|| DecodeError::missing_field("Alive", "incarnation"))?,
382 meta: meta.unwrap_or_default(),
383 node: node.ok_or_else(|| DecodeError::missing_field("Alive", "node"))?,
384 protocol_version: protocol_version.unwrap_or_default(),
385 delegate_version: delegate_version.unwrap_or_default(),
386 },
387 ))
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use std::net::SocketAddr;
394
395 use arbitrary::{Arbitrary, Unstructured};
396
397 use super::*;
398
399 #[test]
400 fn test_access() {
401 let mut data = vec![0; 1024];
402 rand::fill(&mut data[..]);
403 let mut data = Unstructured::new(&data);
404
405 let mut alive = Alive::<String, SocketAddr>::arbitrary(&mut data).unwrap();
406 alive.set_incarnation(1);
407 assert_eq!(alive.incarnation(), 1);
408 alive.set_meta(Meta::empty());
409 assert_eq!(alive.meta(), &Meta::empty());
410 alive.set_node(Node::new("a".into(), "127.0.0.1:8081".parse().unwrap()));
411 assert_eq!(alive.node().id(), "a");
412 alive.set_protocol_version(ProtocolVersion::V1);
413 assert_eq!(alive.protocol_version(), ProtocolVersion::V1);
414 alive.set_delegate_version(DelegateVersion::V1);
415 assert_eq!(alive.delegate_version(), DelegateVersion::V1);
416 }
417}