1use std::{borrow::Cow, collections::HashMap};
2
3use memberlist_core::{
4 delegate::DelegateError as MemberlistDelegateError,
5 transport::{AddressResolver, MaybeResolvedAddress, Node, Transport},
6 types::{SmallVec, TinyVec},
7};
8use smol_str::SmolStr;
9
10use crate::{
11 delegate::{Delegate, MergeDelegate, TransformDelegate},
12 serf::{SerfDelegate, SerfState},
13 types::Member,
14};
15
16pub use crate::snapshot::SnapshotError;
17
18#[derive(thiserror::Error)]
20pub enum SerfDelegateError<D: Delegate> {
21 #[error(transparent)]
23 Serf(#[from] SerfError),
24 #[error(transparent)]
26 TransformDelegate(<D as TransformDelegate>::Error),
27 #[error(transparent)]
29 MergeDelegate(<D as MergeDelegate>::Error),
30}
31
32impl<D: Delegate> core::fmt::Debug for SerfDelegateError<D> {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 Self::TransformDelegate(err) => write!(f, "{err:?}"),
36 Self::MergeDelegate(err) => write!(f, "{err:?}"),
37 Self::Serf(err) => write!(f, "{err:?}"),
38 }
39 }
40}
41
42impl<D: Delegate> SerfDelegateError<D> {
43 #[inline]
45 pub const fn transform(err: <D as TransformDelegate>::Error) -> Self {
46 Self::TransformDelegate(err)
47 }
48
49 #[inline]
51 pub const fn merge(err: <D as MergeDelegate>::Error) -> Self {
52 Self::MergeDelegate(err)
53 }
54
55 #[inline]
57 pub const fn serf(err: crate::error::SerfError) -> Self {
58 Self::Serf(err)
59 }
60}
61
62impl<T, D> From<MemberlistDelegateError<SerfDelegate<T, D>>> for SerfDelegateError<D>
63where
64 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
65 T: Transport,
66{
67 fn from(value: MemberlistDelegateError<SerfDelegate<T, D>>) -> Self {
68 match value {
69 MemberlistDelegateError::AliveDelegate(e) => e,
70 MemberlistDelegateError::MergeDelegate(e) => e,
71 }
72 }
73}
74
75#[derive(thiserror::Error)]
77pub enum Error<T, D>
78where
79 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
80 T: Transport,
81{
82 #[error(transparent)]
84 Memberlist(#[from] MemberlistError<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>),
85 #[error(transparent)]
87 Serf(#[from] SerfError),
88 #[error(transparent)]
90 Transport(T::Error),
91 #[error(transparent)]
93 Delegate(#[from] SerfDelegateError<D>),
94 #[error(transparent)]
96 Relay(#[from] RelayError<T, D>),
97}
98
99impl<T, D> From<memberlist_core::error::Error<T, SerfDelegate<T, D>>> for Error<T, D>
100where
101 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
102 T: Transport,
103{
104 fn from(value: memberlist_core::error::Error<T, SerfDelegate<T, D>>) -> Self {
105 match value {
106 memberlist_core::error::Error::NotRunning => Self::Memberlist(MemberlistError::NotRunning),
107 memberlist_core::error::Error::UpdateTimeout => {
108 Self::Memberlist(MemberlistError::UpdateTimeout)
109 }
110 memberlist_core::error::Error::LeaveTimeout => {
111 Self::Memberlist(MemberlistError::LeaveTimeout)
112 }
113 memberlist_core::error::Error::Lost(n) => Self::Memberlist(MemberlistError::Lost(n)),
114 memberlist_core::error::Error::Delegate(e) => match e.into() {
115 SerfDelegateError::Serf(e) => Self::Serf(e),
116 e => Self::Delegate(e),
117 },
118 memberlist_core::error::Error::Transport(e) => Self::Transport(e),
119 memberlist_core::error::Error::UnexpectedMessage { expected, got } => {
120 Self::Memberlist(MemberlistError::UnexpectedMessage { expected, got })
121 }
122 memberlist_core::error::Error::SequenceNumberMismatch { ping, ack } => {
123 Self::Memberlist(MemberlistError::SequenceNumberMismatch { ping, ack })
124 }
125 memberlist_core::error::Error::Remote(e) => Self::Memberlist(MemberlistError::Remote(e)),
126 memberlist_core::error::Error::Other(e) => Self::Memberlist(MemberlistError::Other(e)),
127 }
128 }
129}
130
131impl<T, D> core::fmt::Debug for Error<T, D>
132where
133 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
134 T: Transport,
135{
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 match self {
138 Self::Memberlist(e) => write!(f, "{e:?}"),
139 Self::Serf(e) => write!(f, "{e:?}"),
140 Self::Transport(e) => write!(f, "{e:?}"),
141 Self::Delegate(e) => write!(f, "{e:?}"),
142 Self::Relay(e) => write!(f, "{e:?}"),
143 }
144 }
145}
146
147impl<T, D> From<SnapshotError> for Error<T, D>
148where
149 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
150 T: Transport,
151{
152 fn from(value: SnapshotError) -> Self {
153 Self::Serf(SerfError::Snapshot(value))
154 }
155}
156
157impl<T, D> Error<T, D>
158where
159 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
160 T: Transport,
161{
162 #[inline]
164 pub fn transform_delegate(err: <D as TransformDelegate>::Error) -> Self {
165 Self::Delegate(SerfDelegateError::TransformDelegate(err))
166 }
167
168 #[inline]
170 pub const fn merge_delegate(err: <D as MergeDelegate>::Error) -> Self {
171 Self::Delegate(SerfDelegateError::MergeDelegate(err))
172 }
173
174 #[inline]
176 pub const fn query_response_too_large(limit: usize, got: usize) -> Self {
177 Self::Serf(SerfError::QueryResponseTooLarge { limit, got })
178 }
179
180 #[inline]
182 pub const fn query_timeout() -> Self {
183 Self::Serf(SerfError::QueryTimeout)
184 }
185
186 #[inline]
188 pub const fn query_already_responsed() -> Self {
189 Self::Serf(SerfError::QueryAlreadyResponsed)
190 }
191
192 #[inline]
194 pub const fn query_response_delivery_failed() -> Self {
195 Self::Serf(SerfError::QueryResponseDeliveryFailed)
196 }
197
198 #[inline]
200 pub const fn relayed_response_too_large(size: usize) -> Self {
201 Self::Serf(SerfError::RelayedResponseTooLarge(size))
202 }
203
204 #[inline]
206 pub const fn relay(err: RelayError<T, D>) -> Self {
207 Self::Relay(err)
208 }
209
210 #[inline]
212 pub const fn fail_truncate_response() -> Self {
213 Self::Serf(SerfError::FailTruncateResponse)
214 }
215
216 #[inline]
218 pub const fn tags_too_large(size: usize) -> Self {
219 Self::Serf(SerfError::TagsTooLarge(size))
220 }
221
222 #[inline]
224 pub const fn query_too_large(size: usize) -> Self {
225 Self::Serf(SerfError::QueryTooLarge(size))
226 }
227
228 #[inline]
230 pub const fn user_event_limit_too_large(size: usize) -> Self {
231 Self::Serf(SerfError::UserEventLimitTooLarge(size))
232 }
233
234 #[inline]
236 pub const fn user_event_too_large(size: usize) -> Self {
237 Self::Serf(SerfError::UserEventTooLarge(size))
238 }
239
240 #[inline]
242 pub const fn raw_user_event_too_large(size: usize) -> Self {
243 Self::Serf(SerfError::RawUserEventTooLarge(size))
244 }
245
246 #[inline]
248 pub const fn broadcast_channel_closed() -> Self {
249 Self::Serf(SerfError::BroadcastChannelClosed)
250 }
251
252 #[inline]
254 pub const fn removal_broadcast_timeout() -> Self {
255 Self::Serf(SerfError::RemovalBroadcastTimeout)
256 }
257
258 #[inline]
260 pub const fn snapshot(err: SnapshotError) -> Self {
261 Self::Serf(SerfError::Snapshot(err))
262 }
263
264 #[inline]
266 pub const fn memberlist(
267 err: MemberlistError<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
268 ) -> Self {
269 Self::Memberlist(err)
270 }
271
272 #[inline]
274 pub const fn bad_leave_status(status: SerfState) -> Self {
275 Self::Serf(SerfError::BadLeaveStatus(status))
276 }
277
278 #[inline]
280 pub const fn bad_join_status(status: SerfState) -> Self {
281 Self::Serf(SerfError::BadJoinStatus(status))
282 }
283
284 #[inline]
286 pub const fn coordinates_disabled() -> Self {
287 Self::Serf(SerfError::CoordinatesDisabled)
288 }
289}
290
291#[derive(Debug, thiserror::Error)]
293pub enum SerfError {
294 #[error("serf: user event exceeds configured limit of {0} bytes before encoding")]
296 UserEventLimitTooLarge(usize),
297 #[error("serf: user event exceeds sane limit of {0} bytes before encoding")]
299 UserEventTooLarge(usize),
300 #[error("serf: join called on {0} statues")]
302 BadJoinStatus(SerfState),
303 #[error("serf: leave called on {0} statues")]
305 BadLeaveStatus(SerfState),
306 #[error("serf: user event exceeds sane limit of {0} bytes after encoding")]
308 RawUserEventTooLarge(usize),
309 #[error("serf: query exceeds limit of {0} bytes")]
311 QueryTooLarge(usize),
312 #[error("serf: query response is past the deadline")]
314 QueryTimeout,
315 #[error("serf: query response ({got} bytes) exceeds limit of {limit} bytes")]
317 QueryResponseTooLarge {
318 limit: usize,
320 got: usize,
322 },
323 #[error("serf: query response already sent")]
325 QueryAlreadyResponsed,
326 #[error("serf: failed to truncate response so that it fits into message")]
328 FailTruncateResponse,
329 #[error("serf: encoded length of tags exceeds limit of {0} bytes")]
331 TagsTooLarge(usize),
332 #[error("serf: relayed response exceeds limit of {0} bytes")]
334 RelayedResponseTooLarge(usize),
335 #[error("serf: failed to deliver query response, dropping")]
337 QueryResponseDeliveryFailed,
338 #[error("serf: coordinates are disabled")]
340 CoordinatesDisabled,
341 #[error("serf: {0}")]
343 Snapshot(#[from] SnapshotError),
344 #[error("serf: timed out broadcasting node removal")]
346 RemovalBroadcastTimeout,
347 #[error("serf: timed out broadcasting channel closed")]
349 BroadcastChannelClosed,
350}
351
352#[derive(Debug, thiserror::Error)]
354pub enum MemberlistError<I, A> {
355 #[error("memberlist: node is not running, please bootstrap first")]
357 NotRunning,
358 #[error("memberlist: timeout waiting for update broadcast")]
360 UpdateTimeout,
361 #[error("memberlist: timeout waiting for leave broadcast")]
363 LeaveTimeout,
364 #[error("memberlist: no response from node {0}")]
366 Lost(Node<I, A>),
367 #[error("memberlist: unexpected message: expected {expected}, got {got}")]
369 UnexpectedMessage {
370 expected: &'static str,
372 got: &'static str,
374 },
375 #[error("memberlist: sequence number mismatch: ping({ping}), ack({ack})")]
378 SequenceNumberMismatch {
379 ping: u32,
381 ack: u32,
383 },
384 #[error("memberlist: remote error: {0}")]
386 Remote(SmolStr),
387 #[error("memberlist: {0}")]
389 Other(Cow<'static, str>),
390}
391
392pub struct RelayError<T, D>(
394 #[allow(clippy::type_complexity)]
395 TinyVec<(
396 Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
397 memberlist_core::error::Error<T, SerfDelegate<T, D>>,
398 )>,
399)
400where
401 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
402 T: Transport;
403
404impl<T, D>
405 From<
406 TinyVec<(
407 Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
408 memberlist_core::error::Error<T, SerfDelegate<T, D>>,
409 )>,
410 > for RelayError<T, D>
411where
412 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
413 T: Transport,
414{
415 fn from(
416 value: TinyVec<(
417 Member<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
418 memberlist_core::error::Error<T, SerfDelegate<T, D>>,
419 )>,
420 ) -> Self {
421 Self(value)
422 }
423}
424
425impl<T, D> core::fmt::Display for RelayError<T, D>
426where
427 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
428 T: Transport,
429{
430 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
431 writeln!(f, "relay errors:")?;
432
433 for (member, err) in self.0.iter() {
434 writeln!(
435 f,
436 "\tfailed to send relay response to {}: {}",
437 member.node().id(),
438 err
439 )?;
440 }
441 Ok(())
442 }
443}
444
445impl<T, D> core::fmt::Debug for RelayError<T, D>
446where
447 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
448 T: Transport,
449{
450 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
451 core::fmt::Display::fmt(self, f)
452 }
453}
454
455impl<T, D> std::error::Error for RelayError<T, D>
456where
457 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
458 T: Transport,
459{
460}
461
462pub struct JoinError<T, D>
464where
465 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
466 T: Transport,
467{
468 pub(crate) joined: SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
469 pub(crate) errors: HashMap<Node<T::Id, MaybeResolvedAddress<T>>, Error<T, D>>,
470 pub(crate) broadcast_error: Option<Error<T, D>>,
471}
472
473impl<T, D> JoinError<T, D>
474where
475 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
476 T: Transport,
477{
478 #[inline]
480 pub const fn broadcast_error(&self) -> Option<&Error<T, D>> {
481 self.broadcast_error.as_ref()
482 }
483
484 #[inline]
486 pub const fn errors(&self) -> &HashMap<Node<T::Id, MaybeResolvedAddress<T>>, Error<T, D>> {
487 &self.errors
488 }
489
490 #[inline]
492 pub const fn joined(
493 &self,
494 ) -> &SmallVec<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
495 &self.joined
496 }
497
498 #[inline]
500 pub fn num_joined(&self) -> usize {
501 self.joined.len()
502 }
503}
504
505impl<T, D> core::fmt::Debug for JoinError<T, D>
506where
507 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
508 T: Transport,
509{
510 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
511 write!(f, "{}", self)
512 }
513}
514
515impl<T, D> core::fmt::Display for JoinError<T, D>
516where
517 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
518 T: Transport,
519{
520 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
521 if !self.joined.is_empty() {
522 writeln!(f, "Successes: {:?}", self.joined)?;
523 }
524
525 if !self.errors.is_empty() {
526 writeln!(f, "Failures:")?;
527 for (address, err) in self.errors.iter() {
528 writeln!(f, "\t{}: {}", address, err)?;
529 }
530 }
531
532 if let Some(err) = &self.broadcast_error {
533 writeln!(f, "Broadcast Error: {err}")?;
534 }
535 Ok(())
536 }
537}
538
539impl<T, D> std::error::Error for JoinError<T, D>
540where
541 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
542 T: Transport,
543{
544}