use std::sync::Arc;
use memberlist_core::{
delegate::DelegateError as MemberlistDelegateError, proto::TinyVec, transport::Transport,
};
use crate::{
delegate::{Delegate, MergeDelegate},
serf::{SerfDelegate, SerfState},
types::Member,
};
pub use crate::snapshot::SnapshotError;
#[derive(thiserror::Error)]
pub enum SerfDelegateError<D: Delegate> {
#[error(transparent)]
Serf(#[from] SerfError),
#[error(transparent)]
Merge(<D as MergeDelegate>::Error),
}
impl<D: Delegate> core::fmt::Debug for SerfDelegateError<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Merge(err) => write!(f, "{err:?}"),
Self::Serf(err) => write!(f, "{err:?}"),
}
}
}
impl<D: Delegate> SerfDelegateError<D> {
#[inline]
pub const fn merge(err: <D as MergeDelegate>::Error) -> Self {
Self::Merge(err)
}
#[inline]
pub const fn serf(err: crate::error::SerfError) -> Self {
Self::Serf(err)
}
}
impl<T, D> From<MemberlistDelegateError<SerfDelegate<T, D>>> for SerfDelegateError<D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn from(value: MemberlistDelegateError<SerfDelegate<T, D>>) -> Self {
match value {
MemberlistDelegateError::AliveDelegate(e) => e,
MemberlistDelegateError::MergeDelegate(e) => e,
}
}
}
#[derive(thiserror::Error)]
pub enum Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
#[error(transparent)]
Memberlist(#[from] memberlist_core::error::Error<T, SerfDelegate<T, D>>),
#[error(transparent)]
Serf(#[from] SerfError),
#[error(transparent)]
Relay(#[from] RelayError<T, D>),
#[error("errors:\n{}", format_multiple_errors(.0))]
Multiple(Arc<[Self]>),
}
impl<T, D> core::fmt::Debug for Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Memberlist(e) => write!(f, "{e:?}"),
Self::Serf(e) => write!(f, "{e:?}"),
Self::Relay(e) => write!(f, "{e:?}"),
Self::Multiple(e) => write!(f, "{e:?}"),
}
}
}
impl<T, D> From<SnapshotError> for Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn from(value: SnapshotError) -> Self {
Self::Serf(SerfError::Snapshot(value))
}
}
impl<T, D> From<memberlist_core::proto::EncodeError> for Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn from(e: memberlist_core::proto::EncodeError) -> Self {
Self::Serf(e.into())
}
}
impl<T, D> From<memberlist_core::proto::DecodeError> for Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn from(e: memberlist_core::proto::DecodeError) -> Self {
Self::Serf(e.into())
}
}
impl<T, D> Error<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
#[inline]
pub const fn query_response_too_large(limit: usize, got: usize) -> Self {
Self::Serf(SerfError::QueryResponseTooLarge { limit, got })
}
#[inline]
pub const fn query_timeout() -> Self {
Self::Serf(SerfError::QueryTimeout)
}
#[inline]
pub const fn query_already_responsed() -> Self {
Self::Serf(SerfError::QueryAlreadyResponsed)
}
#[inline]
pub const fn query_response_delivery_failed() -> Self {
Self::Serf(SerfError::QueryResponseDeliveryFailed)
}
#[inline]
pub const fn relayed_response_too_large(size: usize) -> Self {
Self::Serf(SerfError::RelayedResponseTooLarge(size))
}
#[inline]
pub const fn relay(err: RelayError<T, D>) -> Self {
Self::Relay(err)
}
#[inline]
pub const fn fail_truncate_response() -> Self {
Self::Serf(SerfError::FailTruncateResponse)
}
#[inline]
pub const fn tags_too_large(size: usize) -> Self {
Self::Serf(SerfError::TagsTooLarge(size))
}
#[inline]
pub const fn query_too_large(size: usize) -> Self {
Self::Serf(SerfError::QueryTooLarge(size))
}
#[inline]
pub const fn user_event_limit_too_large(size: usize) -> Self {
Self::Serf(SerfError::UserEventLimitTooLarge(size))
}
#[inline]
pub const fn user_event_too_large(size: usize) -> Self {
Self::Serf(SerfError::UserEventTooLarge(size))
}
#[inline]
pub const fn raw_user_event_too_large(size: usize) -> Self {
Self::Serf(SerfError::RawUserEventTooLarge(size))
}
#[inline]
pub const fn broadcast_channel_closed() -> Self {
Self::Serf(SerfError::BroadcastChannelClosed)
}
#[inline]
pub const fn removal_broadcast_timeout() -> Self {
Self::Serf(SerfError::RemovalBroadcastTimeout)
}
#[inline]
pub const fn snapshot(err: SnapshotError) -> Self {
Self::Serf(SerfError::Snapshot(err))
}
#[inline]
pub const fn bad_leave_status(status: SerfState) -> Self {
Self::Serf(SerfError::BadLeaveStatus(status))
}
#[inline]
pub const fn bad_join_status(status: SerfState) -> Self {
Self::Serf(SerfError::BadJoinStatus(status))
}
#[inline]
pub const fn coordinates_disabled() -> Self {
Self::Serf(SerfError::CoordinatesDisabled)
}
}
#[derive(Debug, thiserror::Error)]
pub enum SerfError {
#[error("user event exceeds configured limit of {0} bytes before encoding")]
UserEventLimitTooLarge(usize),
#[error("user event exceeds sane limit of {0} bytes before encoding")]
UserEventTooLarge(usize),
#[error("join called on {0} statues")]
BadJoinStatus(SerfState),
#[error("leave called on {0} statues")]
BadLeaveStatus(SerfState),
#[error("user event exceeds sane limit of {0} bytes after encoding")]
RawUserEventTooLarge(usize),
#[error("query exceeds limit of {0} bytes")]
QueryTooLarge(usize),
#[error("query response is past the deadline")]
QueryTimeout,
#[error("query response ({got} bytes) exceeds limit of {limit} bytes")]
QueryResponseTooLarge {
limit: usize,
got: usize,
},
#[error("query response already sent")]
QueryAlreadyResponsed,
#[error("failed to truncate response so that it fits into message")]
FailTruncateResponse,
#[error("encoded length of tags exceeds limit of {0} bytes")]
TagsTooLarge(usize),
#[error("relayed response exceeds limit of {0} bytes")]
RelayedResponseTooLarge(usize),
#[error("failed to deliver query response, dropping")]
QueryResponseDeliveryFailed,
#[error("coordinates are disabled")]
CoordinatesDisabled,
#[error(transparent)]
Snapshot(#[from] SnapshotError),
#[error(transparent)]
Decode(#[from] memberlist_core::proto::DecodeError),
#[error(transparent)]
Encode(#[from] memberlist_core::proto::EncodeError),
#[error("timed out broadcasting node removal")]
RemovalBroadcastTimeout,
#[error("timed out broadcasting channel closed")]
BroadcastChannelClosed,
}
pub struct RelayError<T, D>(
#[allow(clippy::type_complexity)]
TinyVec<(
Member<T::Id, T::ResolvedAddress>,
memberlist_core::error::Error<T, SerfDelegate<T, D>>,
)>,
)
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport;
impl<T, D>
From<
TinyVec<(
Member<T::Id, T::ResolvedAddress>,
memberlist_core::error::Error<T, SerfDelegate<T, D>>,
)>,
> for RelayError<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn from(
value: TinyVec<(
Member<T::Id, T::ResolvedAddress>,
memberlist_core::error::Error<T, SerfDelegate<T, D>>,
)>,
) -> Self {
Self(value)
}
}
impl<T, D> core::fmt::Display for RelayError<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
writeln!(f, "relay errors:")?;
for (member, err) in self.0.iter() {
writeln!(
f,
"\tfailed to send relay response to {}: {}",
member.node().id(),
err
)?;
}
Ok(())
}
}
impl<T, D> core::fmt::Debug for RelayError<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
core::fmt::Display::fmt(self, f)
}
}
impl<T, D> std::error::Error for RelayError<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
}
fn format_multiple_errors<T, D>(errors: &[Error<T, D>]) -> String
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
errors
.iter()
.enumerate()
.map(|(i, err)| format!(" {}. {}", i + 1, err))
.collect::<Vec<_>>()
.join("\n")
}