1use std::{borrow::Cow, sync::Arc};
2
3use memberlist_proto::{MessageType, ProtoEncoderError};
4use nodecraft::Node;
5use smallvec_wrapper::OneOrMore;
6use smol_str::SmolStr;
7
8use crate::{
9 delegate::{Delegate, DelegateError},
10 proto::{DecodeError, EncodeError, ErrorResponse},
11 transport::Transport,
12};
13
14#[cfg(any(
15 feature = "crc32",
16 feature = "xxhash64",
17 feature = "xxhash32",
18 feature = "xxhash3",
19 feature = "murmur3",
20))]
21use crate::proto::ChecksumError;
22
23#[cfg(any(
24 feature = "zstd",
25 feature = "lz4",
26 feature = "snappy",
27 feature = "brotli",
28))]
29use crate::proto::CompressionError;
30
31#[cfg(feature = "encryption")]
32pub use crate::proto::EncryptionError;
33
34pub use crate::transport::TransportError;
35
36#[derive(thiserror::Error)]
38pub enum Error<T: Transport, D: Delegate> {
39 #[error("node is not running, please bootstrap first")]
41 NotRunning,
42 #[error("timeout waiting for update broadcast")]
44 UpdateTimeout,
45 #[error("timeout waiting for leave broadcast")]
47 LeaveTimeout,
48 #[error("no response from node {0}")]
50 Lost(Node<T::Id, T::ResolvedAddress>),
51 #[error(transparent)]
53 Delegate(#[from] DelegateError<D>),
54 #[error(transparent)]
56 Transport(T::Error),
57 #[error("unexpected message: expected {expected}, got {got}")]
59 UnexpectedMessage {
60 expected: MessageType,
62 got: MessageType,
64 },
65 #[error("unknown message type value {0}")]
67 UnknownMessageType(u8),
68 #[error("sequence number mismatch: ping({ping}), ack({ack})")]
71 SequenceNumberMismatch {
72 ping: u32,
74 ack: u32,
76 },
77 #[error(transparent)]
79 Encode(#[from] EncodeError),
80 #[error(transparent)]
82 Decode(#[from] DecodeError),
83 #[error("remote error: {0}")]
85 Remote(SmolStr),
86 #[error("errors:\n{}", format_multiple_errors(.0))]
88 Multiple(Arc<[Self]>),
89 #[error("{0}")]
91 Other(Cow<'static, str>),
92 #[error(transparent)]
94 #[cfg(feature = "encryption")]
95 #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
96 Encryption(#[from] EncryptionError),
97 #[error(transparent)]
99 #[cfg(any(
100 feature = "zstd",
101 feature = "lz4",
102 feature = "snappy",
103 feature = "brotli",
104 ))]
105 #[cfg_attr(
106 docsrs,
107 doc(cfg(any(
108 feature = "zstd",
109 feature = "lz4",
110 feature = "snappy",
111 feature = "brotli",
112 )))
113 )]
114 Compression(#[from] CompressionError),
115 #[error(transparent)]
117 #[cfg(any(
118 feature = "crc32",
119 feature = "xxhash64",
120 feature = "xxhash32",
121 feature = "xxhash3",
122 feature = "murmur3",
123 ))]
124 #[cfg_attr(
125 docsrs,
126 doc(cfg(any(
127 feature = "crc32",
128 feature = "xxhash64",
129 feature = "xxhash32",
130 feature = "xxhash3",
131 feature = "murmur3",
132 )))
133 )]
134 Checksum(#[from] ChecksumError),
135}
136
137impl<T: Transport, D: Delegate> core::fmt::Debug for Error<T, D> {
138 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
139 write!(f, "{self}")
140 }
141}
142
143impl<T: Transport, D: Delegate> From<ProtoEncoderError> for Error<T, D> {
144 fn from(value: ProtoEncoderError) -> Self {
145 match value {
146 #[cfg(any(
147 feature = "zstd",
148 feature = "lz4",
149 feature = "snappy",
150 feature = "brotli",
151 ))]
152 ProtoEncoderError::Compress(e) => Self::Compression(e),
153 #[cfg(any(
154 feature = "crc32",
155 feature = "xxhash64",
156 feature = "xxhash32",
157 feature = "xxhash3",
158 feature = "murmur3",
159 ))]
160 ProtoEncoderError::Checksum(e) => Self::Checksum(e),
161 #[cfg(feature = "encryption")]
162 ProtoEncoderError::Encrypt(e) => Self::Encryption(e),
163 ProtoEncoderError::Encode(e) => Self::Encode(e),
164 }
165 }
166}
167
168impl<T: Transport, D: Delegate> FromIterator<Error<T, D>> for Option<Error<T, D>> {
169 fn from_iter<I: IntoIterator<Item = Error<T, D>>>(iter: I) -> Self {
170 let errors = iter.into_iter().collect::<OneOrMore<_>>();
171 let num_errs = errors.len();
172 match num_errs {
173 0 => None,
174 _ => Some(match errors.into_either() {
175 either::Either::Left([e]) => e,
176 either::Either::Right(e) => Error::Multiple(e.into_vec().into()),
177 }),
178 }
179 }
180}
181
182impl<T: Transport, D: Delegate> Error<T, D> {
183 #[auto_enums::auto_enum(Iterator)]
185 #[inline]
186 pub fn iter(&self) -> impl Iterator<Item = &Self> {
187 match self {
188 Self::Multiple(errors) => errors.iter(),
189 _ => std::iter::once(self),
190 }
191 }
192
193 #[inline]
195 pub fn delegate(e: DelegateError<D>) -> Self {
196 Self::Delegate(e)
197 }
198
199 #[inline]
201 pub fn sequence_number_mismatch(ping: u32, ack: u32) -> Self {
202 Self::SequenceNumberMismatch { ping, ack }
203 }
204
205 #[inline]
207 pub fn unexpected_message(expected: MessageType, got: MessageType) -> Self {
208 Self::UnexpectedMessage { expected, got }
209 }
210
211 #[inline]
213 pub fn transport(e: T::Error) -> Self {
214 Self::Transport(e)
215 }
216
217 #[inline]
219 pub fn remote(e: ErrorResponse) -> Self {
220 Self::Remote(e.into())
221 }
222
223 #[inline]
225 pub fn custom(e: std::borrow::Cow<'static, str>) -> Self {
226 Self::Other(e)
227 }
228
229 #[inline]
230 pub(crate) async fn try_from_stream<S>(stream: S) -> Result<(), Self>
231 where
232 S: futures::stream::Stream<Item = Self>,
233 {
234 use futures::stream::StreamExt;
235
236 Self::try_from_one_or_more(stream.collect::<OneOrMore<_>>().await)
237 }
238
239 #[inline]
240 pub(crate) fn try_from_one_or_more(errs: OneOrMore<Self>) -> Result<(), Self> {
241 let num = errs.len();
242 match num {
243 0 => Ok(()),
244 _ => Err(match errs.into_either() {
245 either::Either::Left([e]) => e,
246 either::Either::Right(e) => Self::Multiple(e.into_vec().into()),
247 }),
248 }
249 }
250
251 #[inline]
252 pub(crate) fn is_remote_failure(&self) -> bool {
253 match self {
254 Self::Transport(e) => e.is_remote_failure(),
255 Self::Multiple(errors) => errors.iter().any(Self::is_remote_failure),
256 _ => false,
257 }
258 }
259}
260
261fn format_multiple_errors<T: Transport, D: Delegate>(errors: &[Error<T, D>]) -> String {
262 errors
263 .iter()
264 .enumerate()
265 .map(|(i, err)| format!(" {}. {}", i + 1, err))
266 .collect::<Vec<_>>()
267 .join("\n")
268}