memberlist_proto/
proto.rs1pub use decoder::*;
2pub use encoder::*;
3pub use error::*;
4pub use message::*;
5
6mod decoder;
7mod encoder;
8mod error;
9
10mod message;
11
12use core::future::Future;
13
14const PAYLOAD_LEN_SIZE: usize = 4;
15const MAX_MESSAGES_PER_BATCH: usize = 255;
16const BATCH_OVERHEAD: usize = 1 + 1 + PAYLOAD_LEN_SIZE;
20const LABEL_OVERHEAD: usize = 1 + 1;
23
24const MAX_PLAIN_MESSAGE_HEADER_SIZE: usize = 1 + 5;
27
28#[cfg(feature = "encryption")]
29const ENCRYPTED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE; #[cfg(any(
32 feature = "snappy",
33 feature = "zstd",
34 feature = "lz4",
35 feature = "brotli"
36))]
37const COMPRESSED_MESSAGE_HEADER_SIZE: usize = 1 + 2 + PAYLOAD_LEN_SIZE + PAYLOAD_LEN_SIZE;
42
43#[cfg(any(
44 feature = "crc32",
45 feature = "xxhash32",
46 feature = "xxhash64",
47 feature = "xxhash3",
48 feature = "murmur3",
49))]
50const CHECKSUMED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE; const COMPOOUND_MESSAGE_TAG: u8 = 1;
53const PING_MESSAGE_TAG: u8 = 2;
54const INDIRECT_PING_MESSAGE_TAG: u8 = 3;
55const ACK_MESSAGE_TAG: u8 = 4;
56const SUSPECT_MESSAGE_TAG: u8 = 5;
57const ALIVE_MESSAGE_TAG: u8 = 6;
58const DEAD_MESSAGE_TAG: u8 = 7;
59const PUSH_PULL_MESSAGE_TAG: u8 = 8;
60const USER_DATA_MESSAGE_TAG: u8 = 9;
61const NACK_MESSAGE_TAG: u8 = 10;
62const ERROR_RESPONSE_MESSAGE_TAG: u8 = 11;
63const LABELED_MESSAGE_TAG: u8 = 12;
64const CHECKSUMED_MESSAGE_TAG: u8 = 13;
65const COMPRESSED_MESSAGE_TAG: u8 = 14;
66const ENCRYPTED_MESSAGE_TAG: u8 = 15;
67
68#[inline]
69const fn is_plain_message_tag(tag: u8) -> bool {
70 matches!(
71 tag,
72 PING_MESSAGE_TAG
73 | INDIRECT_PING_MESSAGE_TAG
74 | ACK_MESSAGE_TAG
75 | SUSPECT_MESSAGE_TAG
76 | ALIVE_MESSAGE_TAG
77 | DEAD_MESSAGE_TAG
78 | PUSH_PULL_MESSAGE_TAG
79 | USER_DATA_MESSAGE_TAG
80 | NACK_MESSAGE_TAG
81 | ERROR_RESPONSE_MESSAGE_TAG
82 )
83}
84
85pub trait ProtoReader: Send + Sync {
87 fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
91
92 fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
94
95 fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
99
100 fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
102}
103
104impl<T: ProtoReader> ProtoReader for &mut T {
105 fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
106 (**self).read(buf)
107 }
108
109 fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
110 (**self).read_exact(buf)
111 }
112
113 fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
114 (**self).peek(buf)
115 }
116
117 fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
118 (**self).peek_exact(buf)
119 }
120}
121
122const _: () = {
123 use futures_util::io::AsyncRead;
124 use peekable::future::AsyncPeekable;
125
126 impl<T: AsyncRead + Send + Sync + Unpin> ProtoReader for AsyncPeekable<T> {
127 async fn peek(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
128 AsyncPeekable::peek(self, buf).await
129 }
130
131 async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
132 futures_util::io::AsyncReadExt::read_exact(self, buf).await
133 }
134
135 async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
136 futures_util::io::AsyncReadExt::read(self, buf).await
137 }
138
139 async fn peek_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
140 AsyncPeekable::peek_exact(self, buf).await
141 }
142 }
143};
144
145pub trait ProtoWriter: Send + Sync {
147 fn close(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
149
150 fn write_all(&mut self, payload: &[u8]) -> impl Future<Output = std::io::Result<()>> + Send;
152
153 fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
155}
156
157impl<W> ProtoWriter for W
158where
159 W: futures_util::AsyncWrite + Send + Sync + Unpin,
160{
161 async fn close(&mut self) -> std::io::Result<()> {
162 futures_util::io::AsyncWriteExt::close(self).await
163 }
164
165 async fn write_all(&mut self, payload: &[u8]) -> std::io::Result<()> {
166 futures_util::io::AsyncWriteExt::write_all(self, payload).await
167 }
168
169 async fn flush(&mut self) -> std::io::Result<()> {
170 futures_util::io::AsyncWriteExt::flush(self).await
171 }
172}
173
174#[cfg(feature = "encryption")]
175struct AeadBuffer<'a> {
176 buf: &'a mut [u8],
177 len: usize,
178}
179
180#[cfg(feature = "encryption")]
181const _: () = {
182 impl<'a> AeadBuffer<'a> {
183 #[inline]
184 const fn new(buf: &'a mut [u8], len: usize) -> Self {
185 Self { buf, len }
186 }
187 }
188
189 impl AsRef<[u8]> for AeadBuffer<'_> {
190 fn as_ref(&self) -> &[u8] {
191 &self.buf[..self.len]
192 }
193 }
194
195 impl AsMut<[u8]> for AeadBuffer<'_> {
196 fn as_mut(&mut self) -> &mut [u8] {
197 &mut self.buf[..self.len]
198 }
199 }
200
201 impl aead::Buffer for AeadBuffer<'_> {
202 fn extend_from_slice(&mut self, other: &[u8]) -> aead::Result<()> {
203 if self.len >= self.buf.len() {
204 return Err(aead::Error);
205 }
206
207 self.buf[self.len..self.len + other.len()].copy_from_slice(other);
208 self.len += other.len();
209 Ok(())
210 }
211
212 fn truncate(&mut self, len: usize) {
213 if len >= self.len {
214 return;
215 }
216 self.buf[len..self.len].fill(0);
217 self.len = len;
218 }
219 }
220};