use std::io;
use prost::DecodeError;
use thiserror::Error;
use super::{RpcStatus, handshake::RpcHandshakeError, server::RpcServerError};
use crate::{
PeerConnectionError,
connectivity::ConnectivityError,
peer_manager::PeerManagerError,
proto::rpc as rpc_proto,
traits::OrOptional,
};
/// Error type for RPC operations defined in this module.
///
/// Variants annotated with `#[from]` get a `From` conversion generated by `thiserror`, so the wrapped error
/// types can be converted with `?`. The `#[error(...)]` attribute on each variant is its `Display` message.
#[derive(Debug, Error)]
pub enum RpcError {
/// A message could not be decoded; wraps the `prost` decode error.
#[error("Failed to decode message: {0}")]
DecodeError(#[from] DecodeError),
/// An I/O error; wraps `std::io::Error`.
#[error("IO Error: {0}")]
Io(#[from] io::Error),
/// The client connection is closed.
#[error("The client connection is closed")]
ClientClosed,
/// The request failed with the wrapped `RpcStatus`.
#[error("Request failed: {0}")]
RequestFailed(#[from] RpcStatus),
/// The remote peer closed the RPC connection unexpectedly.
#[error("Remote peer unexpectedly closed the RPC connection")]
ServerClosedRequest,
/// The request was cancelled.
#[error("Request cancelled")]
RequestCancelled,
/// A response carried a request ID (`actual`) other than the one that was expected (`expected`).
#[error("Response did not match the request ID (expected {expected} actual {actual})")]
ResponseIdDidNotMatchRequest { expected: u16, actual: u16 },
/// An internal client error, described by the contained message.
#[error("Client internal error: {0}")]
ClientInternalError(String),
/// The RPC handshake failed; wraps `RpcHandshakeError`.
#[error("Handshake error: {0}")]
HandshakeError(#[from] RpcHandshakeError),
/// Wraps an `RpcServerError`.
#[error("Server error: {0}")]
ServerError(#[from] RpcServerError),
/// Wraps a `PeerConnectionError`.
#[error("Peer connection error: {0}")]
PeerConnectionError(#[from] PeerConnectionError),
/// Wraps a `PeerManagerError`.
#[error("Peer manager error: {0}")]
PeerManagerError(#[from] PeerManagerError),
/// Wraps a `ConnectivityError`.
#[error("Connectivity error: {0}")]
ConnectivityError(#[from] ConnectivityError),
/// The peer's reply timed out.
#[error("Reply from peer timed out")]
ReplyTimeout,
/// A ping response was received but was invalid.
#[error("Received an invalid ping response")]
InvalidPingResponse,
/// An ACK response arrived when none was expected.
#[error("Unexpected ACK response. This is likely because of a previous ACK timeout")]
UnexpectedAckResponse,
/// The remote peer tried to send more than `expected` payload chunks.
#[error("Remote peer attempted to send more than {expected} payload chunks")]
RemotePeerExceededMaxChunkCount { expected: usize },
/// The request body was `got` in size, which exceeds the allowed maximum `expected`.
#[error("Request body was too large. Expected <= {expected} but got {got}")]
MaxRequestSizeExceeded { got: usize, expected: usize },
/// Any other error. `transparent` forwards `Display` and `source` to the wrapped `anyhow::Error`.
#[error(transparent)]
UnknownError(#[from] anyhow::Error),
}
impl RpcError {
    /// Builds a [`RpcError::ClientInternalError`] from the string rendering of `err`.
    ///
    /// `T` is allowed to be unsized, so a string slice can be passed directly
    /// (`RpcError::client_internal_error("message")`) in addition to a reference to any sized `ToString`
    /// value such as `&String` or a reference to another error.
    pub fn client_internal_error<T: ToString + ?Sized>(err: &T) -> Self {
        RpcError::ClientInternalError(err.to_string())
    }

    /// Returns `true` for the error variants that this type attributes to the server side of the RPC
    /// session, and `false` for every other variant.
    ///
    /// The match deliberately has no wildcard arm: every variant (and every handshake sub-variant) is
    /// listed, so adding a new variant fails to compile until it has been classified here.
    pub fn is_caused_by_server(&self) -> bool {
        match self {
            // Attributed to the server: timeouts, undecodable or unexpected replies, rejected or
            // aborted handshakes.
            RpcError::ReplyTimeout |
            RpcError::DecodeError(_) |
            RpcError::RemotePeerExceededMaxChunkCount { .. } |
            RpcError::HandshakeError(RpcHandshakeError::DecodeError(_)) |
            RpcError::HandshakeError(RpcHandshakeError::ServerClosedRequest) |
            RpcError::HandshakeError(RpcHandshakeError::Rejected(_)) |
            RpcError::HandshakeError(RpcHandshakeError::TimedOut) |
            RpcError::ServerClosedRequest |
            RpcError::UnexpectedAckResponse |
            RpcError::ResponseIdDidNotMatchRequest { .. } => true,

            // Everything else is not attributed to the server.
            RpcError::RequestFailed(_) |
            RpcError::Io(_) |
            RpcError::ClientClosed |
            RpcError::RequestCancelled |
            RpcError::ClientInternalError(_) |
            RpcError::ServerError(_) |
            RpcError::PeerConnectionError(_) |
            RpcError::PeerManagerError(_) |
            RpcError::ConnectivityError(_) |
            RpcError::InvalidPingResponse |
            RpcError::MaxRequestSizeExceeded { .. } |
            RpcError::HandshakeError(RpcHandshakeError::Io(_)) |
            RpcError::HandshakeError(RpcHandshakeError::ClientNoSupportedVersion) |
            RpcError::HandshakeError(RpcHandshakeError::ClientClosed) |
            RpcError::UnknownError(_) => false,
        }
    }
}
/// Reason given when an RPC handshake is rejected.
///
/// Converts to and from the protobuf `rpc_session_reply::HandshakeRejectReason` enum. The `&'static str`
/// carried by some variants is extra detail that is included in the `Display` output.
#[derive(Debug, Error, Clone, Copy)]
pub enum HandshakeRejectReason {
/// The protocol version is not supported.
#[error("protocol version not supported")]
UnsupportedVersion,
/// No more RPC server sessions are available; the payload gives further detail.
#[error("no more RPC server sessions available: {0}")]
NoServerSessionsAvailable(&'static str),
/// No more RPC client sessions are available; the payload gives further detail.
#[error("no more RPC client sessions available: {0}")]
NoClientSessionsAvailable(&'static str),
/// The protocol is not supported.
#[error("protocol not supported")]
ProtocolNotSupported,
/// An unknown protocol error; the payload gives further detail.
#[error("unknown protocol error: {0}")]
Unknown(&'static str),
}
impl HandshakeRejectReason {
    /// Returns the integer value of the protobuf reject reason that corresponds to `self`.
    pub fn as_i32(&self) -> i32 {
        let wire_reason: rpc_proto::rpc_session_reply::HandshakeRejectReason = (*self).into();
        wire_reason as i32
    }

    /// Converts a raw integer into a reject reason.
    ///
    /// Returns `None` when `v` is not a value of the protobuf `HandshakeRejectReason` enum.
    pub fn from_i32(v: i32) -> Option<Self> {
        rpc_proto::rpc_session_reply::HandshakeRejectReason::try_from(v)
            .ok()
            .map(Self::from)
    }
}
/// Maps the protobuf reject reason onto the local [`HandshakeRejectReason`].
///
/// The protobuf enum carries no message, so variants with a string payload are given a fixed description.
impl From<rpc_proto::rpc_session_reply::HandshakeRejectReason> for HandshakeRejectReason {
    fn from(reason: rpc_proto::rpc_session_reply::HandshakeRejectReason) -> Self {
        use rpc_proto::rpc_session_reply::HandshakeRejectReason as ProtoReason;

        // Patterns are path-qualified so that a renamed or removed proto variant is a compile error
        // instead of silently turning into a catch-all binding.
        match reason {
            ProtoReason::UnsupportedVersion => Self::UnsupportedVersion,
            ProtoReason::NoServerSessionsAvailable => Self::NoServerSessionsAvailable("session limit reached"),
            ProtoReason::NoClientSessionsAvailable => Self::NoClientSessionsAvailable("session limit reached"),
            ProtoReason::ProtocolNotSupported => Self::ProtocolNotSupported,
            ProtoReason::Unknown => Self::Unknown("reject reason is not known"),
        }
    }
}
/// Maps the local [`HandshakeRejectReason`] onto its protobuf counterpart.
///
/// Any string payload on the local variant is dropped, as the protobuf enum has no field to hold it.
impl From<HandshakeRejectReason> for rpc_proto::rpc_session_reply::HandshakeRejectReason {
    fn from(reason: HandshakeRejectReason) -> Self {
        // `Self` is the protobuf enum here; the left-hand side is the local enum.
        match reason {
            HandshakeRejectReason::UnsupportedVersion => Self::UnsupportedVersion,
            HandshakeRejectReason::NoServerSessionsAvailable(_) => Self::NoServerSessionsAvailable,
            HandshakeRejectReason::NoClientSessionsAvailable(_) => Self::NoClientSessionsAvailable,
            HandshakeRejectReason::ProtocolNotSupported => Self::ProtocolNotSupported,
            HandshakeRejectReason::Unknown(_) => Self::Unknown,
        }
    }
}
/// Lets an RPC result treat a "not found" status as an absent value rather than an error.
impl<T> OrOptional<T> for Result<T, RpcError> {
    type Error = RpcError;

    /// Converts `Ok(v)` into `Ok(Some(v))` and a `RequestFailed` error whose status reports
    /// `is_not_found()` into `Ok(None)`. Every other error is returned unchanged.
    fn or_optional(self) -> Result<Option<T>, Self::Error> {
        match self {
            Ok(value) => Ok(Some(value)),
            Err(RpcError::RequestFailed(ref status)) if status.is_not_found() => Ok(None),
            Err(other) => Err(other),
        }
    }
}