serf_core/
error.rs

1use std::{borrow::Cow, collections::HashMap};
2
3use memberlist_core::{
4  delegate::DelegateError as MemberlistDelegateError,
5  transport::{AddressResolver, MaybeResolvedAddress, Node, Transport},
6  types::{SmallVec, TinyVec},
7};
8use smol_str::SmolStr;
9
10use crate::{
11  delegate::{Delegate, MergeDelegate, TransformDelegate},
12  serf::{SerfDelegate, SerfState},
13  types::Member,
14};
15
16pub use crate::snapshot::SnapshotError;
17
18/// Error trait for [`Delegate`]
19#[derive(thiserror::Error)]
20pub enum SerfDelegateError<D: Delegate> {
21  /// Serf error
22  #[error(transparent)]
23  Serf(#[from] SerfError),
24  /// [`TransformDelegate`] error
25  #[error(transparent)]
26  TransformDelegate(<D as TransformDelegate>::Error),
27  /// [`MergeDelegate`] error
28  #[error(transparent)]
29  MergeDelegate(<D as MergeDelegate>::Error),
30}
31
32impl<D: Delegate> core::fmt::Debug for SerfDelegateError<D> {
33  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34    match self {
35      Self::TransformDelegate(err) => write!(f, "{err:?}"),
36      Self::MergeDelegate(err) => write!(f, "{err:?}"),
37      Self::Serf(err) => write!(f, "{err:?}"),
38    }
39  }
40}
41
42impl<D: Delegate> SerfDelegateError<D> {
43  /// Create a delegate error from an alive delegate error.
44  #[inline]
45  pub const fn transform(err: <D as TransformDelegate>::Error) -> Self {
46    Self::TransformDelegate(err)
47  }
48
49  /// Create a delegate error from a merge delegate error.
50  #[inline]
51  pub const fn merge(err: <D as MergeDelegate>::Error) -> Self {
52    Self::MergeDelegate(err)
53  }
54
55  /// Create a delegate error from a serf error.
56  #[inline]
57  pub const fn serf(err: crate::error::SerfError) -> Self {
58    Self::Serf(err)
59  }
60}
61
62impl<T, D> From<MemberlistDelegateError<SerfDelegate<T, D>>> for SerfDelegateError<D>
63where
64  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
65  T: Transport,
66{
67  fn from(value: MemberlistDelegateError<SerfDelegate<T, D>>) -> Self {
68    match value {
69      MemberlistDelegateError::AliveDelegate(e) => e,
70      MemberlistDelegateError::MergeDelegate(e) => e,
71    }
72  }
73}
74
75/// Error type for the serf crate.
76#[derive(thiserror::Error)]
77pub enum Error<T, D>
78where
79  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
80  T: Transport,
81{
82  /// Returned when the underlyhing memberlist error
83  #[error(transparent)]
84  Memberlist(#[from] MemberlistError<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>),
85  /// Returned when the serf error
86  #[error(transparent)]
87  Serf(#[from] SerfError),
88  /// Returned when the transport error
89  #[error(transparent)]
90  Transport(T::Error),
91  /// Returned when the delegate error
92  #[error(transparent)]
93  Delegate(#[from] SerfDelegateError<D>),
94  /// Returned when the relay error
95  #[error(transparent)]
96  Relay(#[from] RelayError<T, D>),
97}
98
99impl<T, D> From<memberlist_core::error::Error<T, SerfDelegate<T, D>>> for Error<T, D>
100where
101  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
102  T: Transport,
103{
104  fn from(value: memberlist_core::error::Error<T, SerfDelegate<T, D>>) -> Self {
105    match value {
106      memberlist_core::error::Error::NotRunning => Self::Memberlist(MemberlistError::NotRunning),
107      memberlist_core::error::Error::UpdateTimeout => {
108        Self::Memberlist(MemberlistError::UpdateTimeout)
109      }
110      memberlist_core::error::Error::LeaveTimeout => {
111        Self::Memberlist(MemberlistError::LeaveTimeout)
112      }
113      memberlist_core::error::Error::Lost(n) => Self::Memberlist(MemberlistError::Lost(n)),
114      memberlist_core::error::Error::Delegate(e) => match e.into() {
115        SerfDelegateError::Serf(e) => Self::Serf(e),
116        e => Self::Delegate(e),
117      },
118      memberlist_core::error::Error::Transport(e) => Self::Transport(e),
119      memberlist_core::error::Error::UnexpectedMessage { expected, got } => {
120        Self::Memberlist(MemberlistError::UnexpectedMessage { expected, got })
121      }
122      memberlist_core::error::Error::SequenceNumberMismatch { ping, ack } => {
123        Self::Memberlist(MemberlistError::SequenceNumberMismatch { ping, ack })
124      }
125      memberlist_core::error::Error::Remote(e) => Self::Memberlist(MemberlistError::Remote(e)),
126      memberlist_core::error::Error::Other(e) => Self::Memberlist(MemberlistError::Other(e)),
127    }
128  }
129}
130
131impl<T, D> core::fmt::Debug for Error<T, D>
132where
133  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
134  T: Transport,
135{
136  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137    match self {
138      Self::Memberlist(e) => write!(f, "{e:?}"),
139      Self::Serf(e) => write!(f, "{e:?}"),
140      Self::Transport(e) => write!(f, "{e:?}"),
141      Self::Delegate(e) => write!(f, "{e:?}"),
142      Self::Relay(e) => write!(f, "{e:?}"),
143    }
144  }
145}
146
147impl<T, D> From<SnapshotError> for Error<T, D>
148where
149  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
150  T: Transport,
151{
152  fn from(value: SnapshotError) -> Self {
153    Self::Serf(SerfError::Snapshot(value))
154  }
155}
156
157impl<T, D> Error<T, D>
158where
159  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
160  T: Transport,
161{
162  /// Create error from a transform error
163  #[inline]
164  pub fn transform_delegate(err: <D as TransformDelegate>::Error) -> Self {
165    Self::Delegate(SerfDelegateError::TransformDelegate(err))
166  }
167
168  /// Create a merge delegate error
169  #[inline]
170  pub const fn merge_delegate(err: <D as MergeDelegate>::Error) -> Self {
171    Self::Delegate(SerfDelegateError::MergeDelegate(err))
172  }
173
174  /// Create a query response too large error
175  #[inline]
176  pub const fn query_response_too_large(limit: usize, got: usize) -> Self {
177    Self::Serf(SerfError::QueryResponseTooLarge { limit, got })
178  }
179
180  /// Create a query timeout error
181  #[inline]
182  pub const fn query_timeout() -> Self {
183    Self::Serf(SerfError::QueryTimeout)
184  }
185
186  /// Create a query already response error
187  #[inline]
188  pub const fn query_already_responsed() -> Self {
189    Self::Serf(SerfError::QueryAlreadyResponsed)
190  }
191
192  /// Create a query response delivery failed error
193  #[inline]
194  pub const fn query_response_delivery_failed() -> Self {
195    Self::Serf(SerfError::QueryResponseDeliveryFailed)
196  }
197
198  /// Create a relayed response too large error
199  #[inline]
200  pub const fn relayed_response_too_large(size: usize) -> Self {
201    Self::Serf(SerfError::RelayedResponseTooLarge(size))
202  }
203
204  /// Create a relay error
205  #[inline]
206  pub const fn relay(err: RelayError<T, D>) -> Self {
207    Self::Relay(err)
208  }
209
210  /// Create a fail truncate response error
211  #[inline]
212  pub const fn fail_truncate_response() -> Self {
213    Self::Serf(SerfError::FailTruncateResponse)
214  }
215
216  /// Create a tags too large error
217  #[inline]
218  pub const fn tags_too_large(size: usize) -> Self {
219    Self::Serf(SerfError::TagsTooLarge(size))
220  }
221
222  /// Create a query too large error
223  #[inline]
224  pub const fn query_too_large(size: usize) -> Self {
225    Self::Serf(SerfError::QueryTooLarge(size))
226  }
227
228  /// Create a user event limit too large error
229  #[inline]
230  pub const fn user_event_limit_too_large(size: usize) -> Self {
231    Self::Serf(SerfError::UserEventLimitTooLarge(size))
232  }
233
234  /// Create a user event limit too large error
235  #[inline]
236  pub const fn user_event_too_large(size: usize) -> Self {
237    Self::Serf(SerfError::UserEventTooLarge(size))
238  }
239
240  /// Create a raw user event too large error
241  #[inline]
242  pub const fn raw_user_event_too_large(size: usize) -> Self {
243    Self::Serf(SerfError::RawUserEventTooLarge(size))
244  }
245
246  /// Create a broadcast channel closed error
247  #[inline]
248  pub const fn broadcast_channel_closed() -> Self {
249    Self::Serf(SerfError::BroadcastChannelClosed)
250  }
251
252  /// Create a removal broadcast timeout error
253  #[inline]
254  pub const fn removal_broadcast_timeout() -> Self {
255    Self::Serf(SerfError::RemovalBroadcastTimeout)
256  }
257
258  /// Create a snapshot error
259  #[inline]
260  pub const fn snapshot(err: SnapshotError) -> Self {
261    Self::Serf(SerfError::Snapshot(err))
262  }
263
264  /// Create a memberlist error
265  #[inline]
266  pub const fn memberlist(
267    err: MemberlistError<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
268  ) -> Self {
269    Self::Memberlist(err)
270  }
271
272  /// Create a bad leave status error
273  #[inline]
274  pub const fn bad_leave_status(status: SerfState) -> Self {
275    Self::Serf(SerfError::BadLeaveStatus(status))
276  }
277
278  /// Create a bad join status error
279  #[inline]
280  pub const fn bad_join_status(status: SerfState) -> Self {
281    Self::Serf(SerfError::BadJoinStatus(status))
282  }
283
284  /// Create a coordinates disabled error
285  #[inline]
286  pub const fn coordinates_disabled() -> Self {
287    Self::Serf(SerfError::CoordinatesDisabled)
288  }
289}
290
291/// [`Serf`](crate::Serf) error.
292#[derive(Debug, thiserror::Error)]
293pub enum SerfError {
294  /// Returned when the user event exceeds the configured limit.
295  #[error("serf: user event exceeds configured limit of {0} bytes before encoding")]
296  UserEventLimitTooLarge(usize),
297  /// Returned when the user event exceeds the sane limit.
298  #[error("serf: user event exceeds sane limit of {0} bytes before encoding")]
299  UserEventTooLarge(usize),
300  /// Returned when the join status is bad.
301  #[error("serf: join called on {0} statues")]
302  BadJoinStatus(SerfState),
303  /// Returned when the leave status is bad.
304  #[error("serf: leave called on {0} statues")]
305  BadLeaveStatus(SerfState),
306  /// Returned when the encoded user event exceeds the sane limit after encoding.
307  #[error("serf: user event exceeds sane limit of {0} bytes after encoding")]
308  RawUserEventTooLarge(usize),
309  /// Returned when the query size exceeds the configured limit.
310  #[error("serf: query exceeds limit of {0} bytes")]
311  QueryTooLarge(usize),
312  /// Returned when the query is timeout.
313  #[error("serf: query response is past the deadline")]
314  QueryTimeout,
315  /// Returned when the query response is too large.
316  #[error("serf: query response ({got} bytes) exceeds limit of {limit} bytes")]
317  QueryResponseTooLarge {
318    /// The query response size limit.
319    limit: usize,
320    /// The query response size.
321    got: usize,
322  },
323  /// Returned when the query has already been responded.
324  #[error("serf: query response already sent")]
325  QueryAlreadyResponsed,
326  /// Returned when failed to truncate response so that it fits into message.
327  #[error("serf: failed to truncate response so that it fits into message")]
328  FailTruncateResponse,
329  /// Returned when the tags too large.
330  #[error("serf: encoded length of tags exceeds limit of {0} bytes")]
331  TagsTooLarge(usize),
332  /// Returned when the relayed response is too large.
333  #[error("serf: relayed response exceeds limit of {0} bytes")]
334  RelayedResponseTooLarge(usize),
335  /// Returned when failed to deliver query response, dropping.
336  #[error("serf: failed to deliver query response, dropping")]
337  QueryResponseDeliveryFailed,
338  /// Returned when the coordinates are disabled.
339  #[error("serf: coordinates are disabled")]
340  CoordinatesDisabled,
341  /// Returned when snapshot error.
342  #[error("serf: {0}")]
343  Snapshot(#[from] SnapshotError),
344  /// Returned when timed out broadcasting node removal.
345  #[error("serf: timed out broadcasting node removal")]
346  RemovalBroadcastTimeout,
347  /// Returned when the timed out broadcasting channel closed.
348  #[error("serf: timed out broadcasting channel closed")]
349  BroadcastChannelClosed,
350}
351
352/// Error type for [`Memberlist`](memberlist_core::Memberlist).
353#[derive(Debug, thiserror::Error)]
354pub enum MemberlistError<I, A> {
355  /// Returns when the node is not running.
356  #[error("memberlist: node is not running, please bootstrap first")]
357  NotRunning,
358  /// Returns when timeout waiting for update broadcast.
359  #[error("memberlist: timeout waiting for update broadcast")]
360  UpdateTimeout,
361  /// Returns when timeout waiting for leave broadcast.
362  #[error("memberlist: timeout waiting for leave broadcast")]
363  LeaveTimeout,
364  /// Returns when lost connection with a peer.
365  #[error("memberlist: no response from node {0}")]
366  Lost(Node<I, A>),
367  /// Returned when a message is received with an unexpected type.
368  #[error("memberlist: unexpected message: expected {expected}, got {got}")]
369  UnexpectedMessage {
370    /// The expected message type.
371    expected: &'static str,
372    /// The actual message type.
373    got: &'static str,
374  },
375  /// Returned when the sequence number of [`Ack`](crate::types::Ack) is not
376  /// match the sequence number of [`Ping`](crate::types::Ping).
377  #[error("memberlist: sequence number mismatch: ping({ping}), ack({ack})")]
378  SequenceNumberMismatch {
379    /// The sequence number of [`Ping`](crate::types::Ping).
380    ping: u32,
381    /// The sequence number of [`Ack`](crate::types::Ack).
382    ack: u32,
383  },
384  /// Returned when a remote error is received.
385  #[error("memberlist: remote error: {0}")]
386  Remote(SmolStr),
387  /// Returned when a custom error is created by users.
388  #[error("memberlist: {0}")]
389  Other(Cow<'static, str>),
390}
391
392/// Relay error from remote nodes.
393pub struct RelayError<T, D>(
394  #[allow(clippy::type_complexity)]
395  TinyVec<(
396    Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
397    memberlist_core::error::Error<T, SerfDelegate<T, D>>,
398  )>,
399)
400where
401  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
402  T: Transport;
403
404impl<T, D>
405  From<
406    TinyVec<(
407      Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
408      memberlist_core::error::Error<T, SerfDelegate<T, D>>,
409    )>,
410  > for RelayError<T, D>
411where
412  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
413  T: Transport,
414{
415  fn from(
416    value: TinyVec<(
417      Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
418      memberlist_core::error::Error<T, SerfDelegate<T, D>>,
419    )>,
420  ) -> Self {
421    Self(value)
422  }
423}
424
425impl<T, D> core::fmt::Display for RelayError<T, D>
426where
427  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
428  T: Transport,
429{
430  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
431    writeln!(f, "relay errors:")?;
432
433    for (member, err) in self.0.iter() {
434      writeln!(
435        f,
436        "\tfailed to send relay response to {}: {}",
437        member.node().id(),
438        err
439      )?;
440    }
441    Ok(())
442  }
443}
444
445impl<T, D> core::fmt::Debug for RelayError<T, D>
446where
447  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
448  T: Transport,
449{
450  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
451    core::fmt::Display::fmt(self, f)
452  }
453}
454
455impl<T, D> std::error::Error for RelayError<T, D>
456where
457  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
458  T: Transport,
459{
460}
461
462/// `JoinError` is returned when join is partially/totally failed.
463pub struct JoinError<T, D>
464where
465  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
466  T: Transport,
467{
468  pub(crate) joined: SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
469  pub(crate) errors: HashMap<Node<T::Id, MaybeResolvedAddress<T>>, Error<T, D>>,
470  pub(crate) broadcast_error: Option<Error<T, D>>,
471}
472
473impl<T, D> JoinError<T, D>
474where
475  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
476  T: Transport,
477{
478  /// Returns the broadcast error that occurred during the join.
479  #[inline]
480  pub const fn broadcast_error(&self) -> Option<&Error<T, D>> {
481    self.broadcast_error.as_ref()
482  }
483
484  /// Returns the errors that occurred during the join.
485  #[inline]
486  pub const fn errors(&self) -> &HashMap<Node<T::Id, MaybeResolvedAddress<T>>, Error<T, D>> {
487    &self.errors
488  }
489
490  /// Returns the nodes have successfully joined.
491  #[inline]
492  pub const fn joined(
493    &self,
494  ) -> &SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
495    &self.joined
496  }
497
498  /// Returns how many nodes have successfully joined.
499  #[inline]
500  pub fn num_joined(&self) -> usize {
501    self.joined.len()
502  }
503}
504
505impl<T, D> core::fmt::Debug for JoinError<T, D>
506where
507  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
508  T: Transport,
509{
510  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
511    write!(f, "{}", self)
512  }
513}
514
515impl<T, D> core::fmt::Display for JoinError<T, D>
516where
517  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
518  T: Transport,
519{
520  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
521    if !self.joined.is_empty() {
522      writeln!(f, "Successes: {:?}", self.joined)?;
523    }
524
525    if !self.errors.is_empty() {
526      writeln!(f, "Failures:")?;
527      for (address, err) in self.errors.iter() {
528        writeln!(f, "\t{}: {}", address, err)?;
529      }
530    }
531
532    if let Some(err) = &self.broadcast_error {
533      writeln!(f, "Broadcast Error: {err}")?;
534    }
535    Ok(())
536  }
537}
538
539impl<T, D> std::error::Error for JoinError<T, D>
540where
541  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
542  T: Transport,
543{
544}