memberlist_core/
error.rs

1use std::{borrow::Cow, sync::Arc};
2
3use memberlist_proto::{MessageType, ProtoEncoderError};
4use nodecraft::Node;
5use smallvec_wrapper::OneOrMore;
6use smol_str::SmolStr;
7
8use crate::{
9  delegate::{Delegate, DelegateError},
10  proto::{DecodeError, EncodeError, ErrorResponse},
11  transport::Transport,
12};
13
14#[cfg(any(
15  feature = "crc32",
16  feature = "xxhash64",
17  feature = "xxhash32",
18  feature = "xxhash3",
19  feature = "murmur3",
20))]
21use crate::proto::ChecksumError;
22
23#[cfg(any(
24  feature = "zstd",
25  feature = "lz4",
26  feature = "snappy",
27  feature = "brotli",
28))]
29use crate::proto::CompressionError;
30
31#[cfg(feature = "encryption")]
32pub use crate::proto::EncryptionError;
33
34pub use crate::transport::TransportError;
35
36/// Error type for the [`Memberlist`](crate::Memberlist)
37#[derive(thiserror::Error)]
38pub enum Error<T: Transport, D: Delegate> {
39  /// Returns when the node is not running.
40  #[error("node is not running, please bootstrap first")]
41  NotRunning,
42  /// Returns when timeout waiting for update broadcast.
43  #[error("timeout waiting for update broadcast")]
44  UpdateTimeout,
45  /// Returns when timeout waiting for leave broadcast.
46  #[error("timeout waiting for leave broadcast")]
47  LeaveTimeout,
48  /// Returns when lost connection with a peer.
49  #[error("no response from node {0}")]
50  Lost(Node<T::Id, T::ResolvedAddress>),
51  /// Delegate error
52  #[error(transparent)]
53  Delegate(#[from] DelegateError<D>),
54  /// Transport error
55  #[error(transparent)]
56  Transport(T::Error),
57  /// Returned when a message is received with an unexpected type.
58  #[error("unexpected message: expected {expected}, got {got}")]
59  UnexpectedMessage {
60    /// The expected message type.
61    expected: MessageType,
62    /// The actual message type.
63    got: MessageType,
64  },
65  /// Returned when receive a unknown message type value
66  #[error("unknown message type value {0}")]
67  UnknownMessageType(u8),
68  /// Returned when the sequence number of [`Ack`](crate::proto::Ack) is not
69  /// match the sequence number of [`Ping`](crate::proto::Ping).
70  #[error("sequence number mismatch: ping({ping}), ack({ack})")]
71  SequenceNumberMismatch {
72    /// The sequence number of [`Ping`](crate::proto::Ping).
73    ping: u32,
74    /// The sequence number of [`Ack`](crate::proto::Ack).
75    ack: u32,
76  },
77  /// Failed to encode message
78  #[error(transparent)]
79  Encode(#[from] EncodeError),
80  /// Failed to decode message
81  #[error(transparent)]
82  Decode(#[from] DecodeError),
83  /// Returned when a remote error is received.
84  #[error("remote error: {0}")]
85  Remote(SmolStr),
86  /// Multiple errors
87  #[error("errors:\n{}", format_multiple_errors(.0))]
88  Multiple(Arc<[Self]>),
89  /// Returned when a custom error is created by users.
90  #[error("{0}")]
91  Other(Cow<'static, str>),
92  /// Encryption error
93  #[error(transparent)]
94  #[cfg(feature = "encryption")]
95  #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
96  Encryption(#[from] EncryptionError),
97  /// Compressor error
98  #[error(transparent)]
99  #[cfg(any(
100    feature = "zstd",
101    feature = "lz4",
102    feature = "snappy",
103    feature = "brotli",
104  ))]
105  #[cfg_attr(
106    docsrs,
107    doc(cfg(any(
108      feature = "zstd",
109      feature = "lz4",
110      feature = "snappy",
111      feature = "brotli",
112    )))
113  )]
114  Compression(#[from] CompressionError),
115  /// Checksum error
116  #[error(transparent)]
117  #[cfg(any(
118    feature = "crc32",
119    feature = "xxhash64",
120    feature = "xxhash32",
121    feature = "xxhash3",
122    feature = "murmur3",
123  ))]
124  #[cfg_attr(
125    docsrs,
126    doc(cfg(any(
127      feature = "crc32",
128      feature = "xxhash64",
129      feature = "xxhash32",
130      feature = "xxhash3",
131      feature = "murmur3",
132    )))
133  )]
134  Checksum(#[from] ChecksumError),
135}
136
137impl<T: Transport, D: Delegate> core::fmt::Debug for Error<T, D> {
138  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
139    write!(f, "{self}")
140  }
141}
142
143impl<T: Transport, D: Delegate> From<ProtoEncoderError> for Error<T, D> {
144  fn from(value: ProtoEncoderError) -> Self {
145    match value {
146      #[cfg(any(
147        feature = "zstd",
148        feature = "lz4",
149        feature = "snappy",
150        feature = "brotli",
151      ))]
152      ProtoEncoderError::Compress(e) => Self::Compression(e),
153      #[cfg(any(
154        feature = "crc32",
155        feature = "xxhash64",
156        feature = "xxhash32",
157        feature = "xxhash3",
158        feature = "murmur3",
159      ))]
160      ProtoEncoderError::Checksum(e) => Self::Checksum(e),
161      #[cfg(feature = "encryption")]
162      ProtoEncoderError::Encrypt(e) => Self::Encryption(e),
163      ProtoEncoderError::Encode(e) => Self::Encode(e),
164    }
165  }
166}
167
168impl<T: Transport, D: Delegate> FromIterator<Error<T, D>> for Option<Error<T, D>> {
169  fn from_iter<I: IntoIterator<Item = Error<T, D>>>(iter: I) -> Self {
170    let errors = iter.into_iter().collect::<OneOrMore<_>>();
171    let num_errs = errors.len();
172    match num_errs {
173      0 => None,
174      _ => Some(match errors.into_either() {
175        either::Either::Left([e]) => e,
176        either::Either::Right(e) => Error::Multiple(e.into_vec().into()),
177      }),
178    }
179  }
180}
181
182impl<T: Transport, D: Delegate> Error<T, D> {
183  /// Returns an iterator over the errors.
184  #[auto_enums::auto_enum(Iterator)]
185  #[inline]
186  pub fn iter(&self) -> impl Iterator<Item = &Self> {
187    match self {
188      Self::Multiple(errors) => errors.iter(),
189      _ => std::iter::once(self),
190    }
191  }
192
193  /// Creates a new error with the given delegate error.
194  #[inline]
195  pub fn delegate(e: DelegateError<D>) -> Self {
196    Self::Delegate(e)
197  }
198
199  /// Creates a [`Error::SequenceNumberMismatch`] error.
200  #[inline]
201  pub fn sequence_number_mismatch(ping: u32, ack: u32) -> Self {
202    Self::SequenceNumberMismatch { ping, ack }
203  }
204
205  /// Creates a [`Error::UnexpectedMessage`] error.
206  #[inline]
207  pub fn unexpected_message(expected: MessageType, got: MessageType) -> Self {
208    Self::UnexpectedMessage { expected, got }
209  }
210
211  /// Creates a new error with the given transport error.
212  #[inline]
213  pub fn transport(e: T::Error) -> Self {
214    Self::Transport(e)
215  }
216
217  /// Creates a new error with the given remote error.
218  #[inline]
219  pub fn remote(e: ErrorResponse) -> Self {
220    Self::Remote(e.into())
221  }
222
223  /// Creates a new error with the given message.
224  #[inline]
225  pub fn custom(e: std::borrow::Cow<'static, str>) -> Self {
226    Self::Other(e)
227  }
228
229  #[inline]
230  pub(crate) async fn try_from_stream<S>(stream: S) -> Result<(), Self>
231  where
232    S: futures::stream::Stream<Item = Self>,
233  {
234    use futures::stream::StreamExt;
235
236    Self::try_from_one_or_more(stream.collect::<OneOrMore<_>>().await)
237  }
238
239  #[inline]
240  pub(crate) fn try_from_one_or_more(errs: OneOrMore<Self>) -> Result<(), Self> {
241    let num = errs.len();
242    match num {
243      0 => Ok(()),
244      _ => Err(match errs.into_either() {
245        either::Either::Left([e]) => e,
246        either::Either::Right(e) => Self::Multiple(e.into_vec().into()),
247      }),
248    }
249  }
250
251  #[inline]
252  pub(crate) fn is_remote_failure(&self) -> bool {
253    match self {
254      Self::Transport(e) => e.is_remote_failure(),
255      Self::Multiple(errors) => errors.iter().any(Self::is_remote_failure),
256      _ => false,
257    }
258  }
259}
260
261fn format_multiple_errors<T: Transport, D: Delegate>(errors: &[Error<T, D>]) -> String {
262  errors
263    .iter()
264    .enumerate()
265    .map(|(i, err)| format!("  {}. {}", i + 1, err))
266    .collect::<Vec<_>>()
267    .join("\n")
268}