1use byteorder::{ByteOrder, NetworkEndian};
2
3use super::{LamportTime, LamportTimeTransformError, Transformable};
4
5#[viewit::viewit(setters(prefix = "with"))]
8#[derive(Debug, Clone, Eq, PartialEq)]
9#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
10pub struct LeaveMessage<I> {
11 #[viewit(
13 getter(const, attrs(doc = "Returns the lamport time for this message")),
14 setter(
15 const,
16 attrs(doc = "Sets the lamport time for this message (Builder pattern)")
17 )
18 )]
19 ltime: LamportTime,
20 #[viewit(
22 getter(const, style = "ref", attrs(doc = "Returns the node")),
23 setter(attrs(doc = "Sets the node (Builder pattern)"))
24 )]
25 id: I,
26
27 #[viewit(
29 getter(const, style = "ref", attrs(doc = "Returns if prune or not")),
30 setter(attrs(doc = "Sets prune or not (Builder pattern)"))
31 )]
32 prune: bool,
33}
34
35#[derive(thiserror::Error)]
37pub enum LeaveMessageTransformError<I: Transformable> {
38 #[error("not enough bytes to decode LeaveMessage")]
40 NotEnoughBytes,
41 #[error("encode buffer too small")]
43 EncodeBufferTooSmall,
44 #[error(transparent)]
46 Id(I::Error),
47 #[error(transparent)]
49 LamportTime(#[from] LamportTimeTransformError),
50}
51
52impl<I: Transformable> core::fmt::Debug for LeaveMessageTransformError<I> {
53 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54 write!(f, "{}", self)
55 }
56}
57
58impl<I> Transformable for LeaveMessage<I>
59where
60 I: Transformable,
61{
62 type Error = LeaveMessageTransformError<I>;
63
64 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
65 let encoded_len = self.encoded_len();
66 if dst.len() < encoded_len {
67 return Err(Self::Error::EncodeBufferTooSmall);
68 }
69
70 let mut offset = 0;
71 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
72 offset += 4;
73 dst[offset] = self.prune as u8;
74 offset += 1;
75 offset += self.ltime.encode(&mut dst[offset..])?;
76 offset += self
77 .id
78 .encode(&mut dst[offset..])
79 .map_err(Self::Error::Id)?;
80
81 debug_assert_eq!(
82 offset, encoded_len,
83 "expect write {} bytes, but actual write {} bytes",
84 encoded_len, offset
85 );
86
87 Ok(offset)
88 }
89
90 fn encoded_len(&self) -> usize {
91 4 + 1 + self.id.encoded_len() + self.ltime.encoded_len()
92 }
93
94 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
95 where
96 Self: Sized,
97 {
98 if src.len() < 5 {
99 return Err(Self::Error::NotEnoughBytes);
100 }
101
102 let len = NetworkEndian::read_u32(&src[0..4]) as usize;
103 if src.len() + 5 < len {
104 return Err(Self::Error::NotEnoughBytes);
105 }
106
107 let mut offset = 4;
108 let prune = src[offset] != 0;
109 offset += 1;
110
111 let (read, ltime) = LamportTime::decode(&src[offset..])?;
112 offset += read;
113
114 let (read, id) = I::decode(&src[offset..]).map_err(Self::Error::Id)?;
115 offset += read;
116
117 debug_assert_eq!(
118 offset, len,
119 "expect read {} bytes, but actual read {} bytes",
120 len, offset
121 );
122
123 Ok((offset, Self { ltime, id, prune }))
124 }
125}
126
127#[cfg(test)]
128mod tests {
129
130 use rand::{distributions::Alphanumeric, thread_rng, Rng};
131 use smol_str::SmolStr;
132
133 use super::*;
134
135 impl LeaveMessage<SmolStr> {
136 fn random(size: usize) -> Self {
137 let id = thread_rng()
138 .sample_iter(Alphanumeric)
139 .take(size)
140 .collect::<Vec<u8>>();
141 let id = String::from_utf8(id).unwrap().into();
142
143 Self {
144 ltime: LamportTime::random(),
145 id,
146 prune: thread_rng().gen(),
147 }
148 }
149 }
150
151 #[test]
152 fn test_leave_message_transform() {
153 futures::executor::block_on(async {
154 for i in 0..100 {
155 let filter = LeaveMessage::random(i);
156 let mut buf = vec![0; filter.encoded_len()];
157 let encoded_len = filter.encode(&mut buf).unwrap();
158 assert_eq!(encoded_len, filter.encoded_len());
159
160 let (decoded_len, decoded) = LeaveMessage::<SmolStr>::decode(&buf).unwrap();
161 assert_eq!(decoded_len, encoded_len);
162 assert_eq!(decoded, filter);
163
164 let (decoded_len, decoded) =
165 LeaveMessage::<SmolStr>::decode_from_reader(&mut std::io::Cursor::new(&buf)).unwrap();
166 assert_eq!(decoded_len, encoded_len);
167 assert_eq!(decoded, filter);
168
169 let (decoded_len, decoded) =
170 LeaveMessage::<SmolStr>::decode_from_async_reader(&mut futures::io::Cursor::new(&buf))
171 .await
172 .unwrap();
173 assert_eq!(decoded_len, encoded_len);
174 assert_eq!(decoded, filter);
175 }
176 });
177 }
178}