memberlist_proto/
proto.rs

1pub use decoder::*;
2pub use encoder::*;
3pub use error::*;
4pub use message::*;
5
6mod decoder;
7mod encoder;
8mod error;
9
10mod message;
11
12use core::future::Future;
13
/// Width, in bytes, of an encoded length field.
const PAYLOAD_LEN_SIZE: usize = 4;

/// Largest number of messages in one batch.
///
/// Equal to `u8::MAX` (255), the largest value that fits in the 1-byte
/// message counter accounted for in `BATCH_OVERHEAD`.
const MAX_MESSAGES_PER_BATCH: usize = u8::MAX as usize;

/// Fixed overhead of a batch, in bytes.
const BATCH_OVERHEAD: usize = 1 // batch message tag
  + 1 // number of messages
  + PAYLOAD_LEN_SIZE; // total length of the messages in bytes

/// Fixed overhead of a label, in bytes.
const LABEL_OVERHEAD: usize = 1 // label message tag
  + 1; // label length

/// Upper bound on the header of a plain message, in bytes.
const MAX_PLAIN_MESSAGE_HEADER_SIZE: usize = 1 // message tag
  + 5; // max size of a varint-encoded u32

/// Header size of an encrypted message, in bytes:
/// - 1 byte for the encryption tag
/// - 1 byte for the algo
/// - 4 bytes for the length
#[cfg(feature = "encryption")]
const ENCRYPTED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE;

/// Header size of a compressed message, in bytes:
/// - 1 byte for the compression tag
/// - 2 bytes for the algo
/// - 4 bytes for the uncompressed data length
/// - 4 bytes for the compressed data length
#[cfg(any(
  feature = "snappy",
  feature = "zstd",
  feature = "lz4",
  feature = "brotli"
))]
const COMPRESSED_MESSAGE_HEADER_SIZE: usize = 1 + 2 + PAYLOAD_LEN_SIZE + PAYLOAD_LEN_SIZE;

/// Header size of a checksummed message, in bytes:
/// - 1 byte for the checksum tag
/// - 1 byte for the algo
/// - 4 bytes for the checksum
///
/// NOTE(review): the last term is spelled `PAYLOAD_LEN_SIZE`; confirm whether
/// it is meant as a 4-byte checksum or a 4-byte length.
#[cfg(any(
  feature = "crc32",
  feature = "xxhash32",
  feature = "xxhash64",
  feature = "xxhash3",
  feature = "murmur3",
))]
const CHECKSUMED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE;
51
// One-byte tags, one per message kind. The numeric values are consecutive
// from 1 to 15.
//
// Tags 2 through 11 (ping .. error response) are exactly the values that
// `is_plain_message_tag` accepts; tag 1 and tags 12 through 15 are not.
//
// NOTE(review): `COMPOOUND` looks like a misspelling of "COMPOUND". It is left
// as-is here because code outside this view may refer to the constant by name.
const COMPOOUND_MESSAGE_TAG: u8 = 1;
const PING_MESSAGE_TAG: u8 = 2;
const INDIRECT_PING_MESSAGE_TAG: u8 = 3;
const ACK_MESSAGE_TAG: u8 = 4;
const SUSPECT_MESSAGE_TAG: u8 = 5;
const ALIVE_MESSAGE_TAG: u8 = 6;
const DEAD_MESSAGE_TAG: u8 = 7;
const PUSH_PULL_MESSAGE_TAG: u8 = 8;
const USER_DATA_MESSAGE_TAG: u8 = 9;
const NACK_MESSAGE_TAG: u8 = 10;
const ERROR_RESPONSE_MESSAGE_TAG: u8 = 11;
// Tags below are outside the plain set checked by `is_plain_message_tag`.
const LABELED_MESSAGE_TAG: u8 = 12;
const CHECKSUMED_MESSAGE_TAG: u8 = 13;
const COMPRESSED_MESSAGE_TAG: u8 = 14;
const ENCRYPTED_MESSAGE_TAG: u8 = 15;
67
68#[inline]
69const fn is_plain_message_tag(tag: u8) -> bool {
70  matches!(
71    tag,
72    PING_MESSAGE_TAG
73      | INDIRECT_PING_MESSAGE_TAG
74      | ACK_MESSAGE_TAG
75      | SUSPECT_MESSAGE_TAG
76      | ALIVE_MESSAGE_TAG
77      | DEAD_MESSAGE_TAG
78      | PUSH_PULL_MESSAGE_TAG
79      | USER_DATA_MESSAGE_TAG
80      | NACK_MESSAGE_TAG
81      | ERROR_RESPONSE_MESSAGE_TAG
82  )
83}
84
/// Asynchronous reader abstraction used by the memberlist proto.
///
/// Implementors must be `Send + Sync`, and every method returns a `Send`
/// future. Alongside the two read operations there are two matching peek
/// operations with the same signatures.
pub trait ProtoReader: Send + Sync {
  /// Reads payload bytes from the reader into `buf`.
  ///
  /// The future resolves to the number of bytes read.
  fn read(
    &mut self,
    buf: &mut [u8],
  ) -> impl Future<Output = std::io::Result<usize>> + Send;

  /// Peeks payload bytes from the reader into `buf`.
  ///
  /// The future resolves to the number of bytes peeked.
  fn peek(
    &mut self,
    buf: &mut [u8],
  ) -> impl Future<Output = std::io::Result<usize>> + Send;

  /// Reads exactly enough payload bytes to fill `buf`.
  fn read_exact(
    &mut self,
    buf: &mut [u8],
  ) -> impl Future<Output = std::io::Result<()>> + Send;

  /// Peeks exactly enough payload bytes to fill `buf`.
  fn peek_exact(
    &mut self,
    buf: &mut [u8],
  ) -> impl Future<Output = std::io::Result<()>> + Send;
}
103
104impl<T: ProtoReader> ProtoReader for &mut T {
105  fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
106    (**self).read(buf)
107  }
108
109  fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
110    (**self).read_exact(buf)
111  }
112
113  fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
114    (**self).peek(buf)
115  }
116
117  fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
118    (**self).peek_exact(buf)
119  }
120}
121
122const _: () = {
123  use futures_util::io::AsyncRead;
124  use peekable::future::AsyncPeekable;
125
126  impl<T: AsyncRead + Send + Sync + Unpin> ProtoReader for AsyncPeekable<T> {
127    async fn peek(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
128      AsyncPeekable::peek(self, buf).await
129    }
130
131    async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
132      futures_util::io::AsyncReadExt::read_exact(self, buf).await
133    }
134
135    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
136      futures_util::io::AsyncReadExt::read(self, buf).await
137    }
138
139    async fn peek_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
140      AsyncPeekable::peek_exact(self, buf).await
141    }
142  }
143};
144
/// Asynchronous writer abstraction used by the memberlist proto.
///
/// Implementors must be `Send + Sync`, and every method returns a `Send`
/// future resolving to `std::io::Result<()>`.
pub trait ProtoWriter: Send + Sync {
  /// Writes the whole `payload` to the writer.
  fn write_all(
    &mut self,
    payload: &[u8],
  ) -> impl Future<Output = std::io::Result<()>> + Send;

  /// Flushes the writer.
  fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;

  /// Closes the writer.
  fn close(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
}
156
157impl<W> ProtoWriter for W
158where
159  W: futures_util::AsyncWrite + Send + Sync + Unpin,
160{
161  async fn close(&mut self) -> std::io::Result<()> {
162    futures_util::io::AsyncWriteExt::close(self).await
163  }
164
165  async fn write_all(&mut self, payload: &[u8]) -> std::io::Result<()> {
166    futures_util::io::AsyncWriteExt::write_all(self, payload).await
167  }
168
169  async fn flush(&mut self) -> std::io::Result<()> {
170    futures_util::io::AsyncWriteExt::flush(self).await
171  }
172}
173
/// Fixed-capacity byte buffer over a borrowed slice, usable as an
/// `aead::Buffer`.
///
/// `buf` is the backing storage and `len` is the number of leading bytes that
/// are considered filled; `as_ref` / `as_mut` expose only `buf[..len]`. The
/// buffer never grows beyond `buf.len()`.
#[cfg(feature = "encryption")]
struct AeadBuffer<'a> {
  buf: &'a mut [u8],
  len: usize,
}

#[cfg(feature = "encryption")]
const _: () = {
  impl<'a> AeadBuffer<'a> {
    /// Wraps `buf`, treating its first `len` bytes as already filled.
    ///
    /// `len` is not validated here. If it exceeds `buf.len()`, `as_ref` and
    /// `as_mut` panic when slicing, and `extend_from_slice` returns an error.
    #[inline]
    const fn new(buf: &'a mut [u8], len: usize) -> Self {
      Self { buf, len }
    }
  }

  impl AsRef<[u8]> for AeadBuffer<'_> {
    /// Returns the filled prefix `buf[..len]`.
    fn as_ref(&self) -> &[u8] {
      &self.buf[..self.len]
    }
  }

  impl AsMut<[u8]> for AeadBuffer<'_> {
    /// Returns the filled prefix `buf[..len]` mutably.
    fn as_mut(&mut self) -> &mut [u8] {
      &mut self.buf[..self.len]
    }
  }

  impl aead::Buffer for AeadBuffer<'_> {
    /// Appends `other` after the filled prefix.
    ///
    /// # Errors
    ///
    /// Returns `aead::Error`, leaving the buffer untouched, when `other` does
    /// not fit in the remaining space of the backing slice.
    fn extend_from_slice(&mut self, other: &[u8]) -> aead::Result<()> {
      // Check the *end* of the write, not just the current length: a partially
      // filled buffer can still be too small for `other`, and slicing past the
      // backing storage would panic instead of reporting an error.
      let end = self.len.checked_add(other.len()).ok_or(aead::Error)?;
      let dst = self.buf.get_mut(self.len..end).ok_or(aead::Error)?;

      dst.copy_from_slice(other);
      self.len = end;
      Ok(())
    }

    /// Shortens the filled prefix to `len` bytes and zeroes the bytes that
    /// were cut off. Does nothing when `len` is not smaller than the current
    /// length.
    fn truncate(&mut self, len: usize) {
      if len >= self.len {
        return;
      }
      // Wipe the discarded tail so stale bytes do not linger in the backing
      // storage.
      self.buf[len..self.len].fill(0);
      self.len = len;
    }
  }
};
218    }
219  }
220};