use std::time::Duration;
use thiserror::Error;
/// Top-level error type returned by this crate (see the [`Result`] alias).
///
/// Each category-specific error converts into this type through `#[from]`, so `?`
/// propagates it directly. Use [`Error::is_recoverable`] to decide whether a retry is
/// worthwhile, and [`Error::is_fatal_handshake`] to detect handshake failures.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// Connection-level or gRPC status failure; see [`TransportError`].
    #[error("transport error: {0}")]
    Transport(#[from] TransportError),
    /// Wire-protocol failure (encode/decode, handshake, unexpected message); see
    /// [`ProtocolError`].
    #[error("protocol error: {0}")]
    Protocol(#[from] ProtocolError),
    /// Environment error; its own `is_recoverable` flag decides retryability.
    #[error("environment error: {0}")]
    Environment(#[from] EnvError),
    /// Model error; its own `is_recoverable` flag decides retryability.
    #[error("model error: {0}")]
    Model(#[from] ModelError),
    /// An operation timed out; the payload is the duration shown in the message.
    /// Always considered recoverable.
    #[error("timeout after {0:?}")]
    Timeout(Duration),
    /// The operation was cancelled; the payload is a human-readable reason
    /// (for example the message of a gRPC `Cancelled` status).
    #[error("cancelled: {0}")]
    Cancelled(String),
    /// Client-state error such as "not connected" or "handshake not completed";
    /// see [`ClientError`].
    #[error("client error: {0}")]
    Client(#[from] ClientError),
}
impl Error {
    /// Returns `true` if retrying the failed operation may succeed.
    ///
    /// Classified as recoverable:
    /// - [`Error::Timeout`];
    /// - [`Error::Environment`] and [`Error::Model`] whose `is_recoverable` flag is set;
    /// - [`TransportError::Unavailable`] (which [`status_to_grpc_error`] produces for
    ///   gRPC `Unavailable`, `ResourceExhausted` and `Aborted`);
    /// - [`TransportError::Status`] carrying `tonic::Code::DeadlineExceeded`.
    ///
    /// Everything else is treated as permanent. That includes I/O errors, closed
    /// connections, bind/connect/address failures, oversized messages, other gRPC
    /// statuses, protocol errors, cancellation and client errors.
    pub fn is_recoverable(&self) -> bool {
        // Every variant is listed explicitly, with no `_` arm. Adding a variant to `Error`
        // or `TransportError` then fails to compile until its retry policy is decided
        // here. `#[non_exhaustive]` does not require a wildcard inside the defining crate.
        match self {
            Self::Timeout(_) => true,
            Self::Environment(error) => error.is_recoverable,
            Self::Model(error) => error.is_recoverable,
            Self::Transport(transport) => match transport {
                TransportError::Unavailable(_) => true,
                // A server-side deadline is transient; every other preserved status
                // code (e.g. Unimplemented, InvalidArgument) is permanent.
                TransportError::Status { code, .. } => {
                    matches!(code, tonic::Code::DeadlineExceeded)
                }
                TransportError::Io(_)
                | TransportError::ConnectionClosed
                | TransportError::BindFailed(_)
                | TransportError::ConnectFailed(_)
                | TransportError::InvalidAddress(_)
                | TransportError::MessageTooLarge { .. } => false,
            },
            Self::Protocol(_) | Self::Cancelled(_) | Self::Client(_) => false,
        }
    }

    /// Returns `true` only for [`ProtocolError::HandshakeFailed`] wrapped in
    /// [`Error::Protocol`]. Every other error returns `false`, including
    /// transport-level connect failures.
    pub fn is_fatal_handshake(&self) -> bool {
        matches!(self, Self::Protocol(ProtocolError::HandshakeFailed(_)))
    }
}
pub fn status_to_grpc_error(status: tonic::Status) -> Error {
use tonic::Code;
let code = status.code();
let message = status.message().to_string();
match code {
Code::Unavailable | Code::ResourceExhausted | Code::Aborted => {
Error::Transport(TransportError::Unavailable(message))
}
Code::Cancelled => Error::Cancelled(message),
_ => Error::Transport(TransportError::Status { code, message }),
}
}
/// Failures at the connection / gRPC transport layer, wrapped by [`Error::Transport`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TransportError {
    /// Underlying I/O failure; converted automatically from `std::io::Error`.
    /// Not considered recoverable by [`Error::is_recoverable`].
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// The connection was closed. Not considered recoverable.
    #[error("connection closed")]
    ConnectionClosed,
    /// Binding to an address failed; the payload describes why.
    #[error("failed to bind: {0}")]
    BindFailed(String),
    /// Establishing a connection failed; the payload describes why.
    #[error("failed to connect: {0}")]
    ConnectFailed(String),
    /// An address was rejected as invalid; the payload describes it.
    #[error("invalid address: {0}")]
    InvalidAddress(String),
    /// A message exceeded the allowed maximum (`size > max`).
    /// NOTE(review): units are presumably bytes; confirm at the construction sites.
    #[error("message too large: {size} > {max}")]
    MessageTooLarge { size: usize, max: usize },
    /// The server reported itself unavailable. [`status_to_grpc_error`] produces this
    /// for gRPC `Unavailable`, `ResourceExhausted` and `Aborted`.
    /// [`Error::is_recoverable`] treats it as retryable.
    #[error("server unavailable: {0}")]
    Unavailable(String),
    /// A gRPC status with its original code and message. [`status_to_grpc_error`]
    /// uses this for codes other than `Unavailable`, `ResourceExhausted`, `Aborted`
    /// and `Cancelled`. Only `DeadlineExceeded` is considered recoverable.
    #[error("grpc status {code:?}: {message}")]
    Status {
        /// Status code as reported by tonic.
        code: tonic::Code,
        /// Status message as reported by tonic.
        message: String,
    },
}
/// Wire-protocol failures, wrapped by [`Error::Protocol`]. None of these are considered
/// recoverable by [`Error::is_recoverable`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ProtocolError {
    /// Serializing an outgoing message failed; the payload describes why.
    #[error("encode error: {0}")]
    EncodeError(String),
    /// Deserializing an incoming message failed; the payload describes why.
    #[error("decode error: {0}")]
    DecodeError(String),
    /// The handshake was rejected or could not complete. This is the only variant
    /// [`Error::is_fatal_handshake`] reports as fatal.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),
    /// A message of a different kind arrived than the one expected at this point.
    #[error("unexpected message: expected {expected}, got {actual}")]
    UnexpectedMessage { expected: String, actual: String },
}
/// Environment error, carried by [`Error::Environment`].
///
/// `Display` is implemented manually as `[<code>] <message>`; `debug_info` is not part
/// of the rendered text. Prefer [`EnvError::new`], which derives `is_recoverable` from
/// `code`.
#[derive(Debug, Error)]
pub struct EnvError {
    /// Category of the failure.
    pub code: EnvErrorCode,
    /// Human-readable description, included in the `Display` output.
    pub message: String,
    /// Whether retrying may succeed; read by [`Error::is_recoverable`]. The field is
    /// public, so a struct literal can set it independently of `code`.
    pub is_recoverable: bool,
    /// Optional extra diagnostic detail; not included in the `Display` output.
    pub debug_info: Option<String>,
}
impl std::fmt::Display for EnvError {
    /// Renders `[<code>] <message>`, using the code's `Debug` name; `debug_info` and
    /// `is_recoverable` are deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        write!(f, "[{code:?}] {message}")
    }
}
/// Category of an [`EnvError`].
///
/// Mirrors `rlmesh_proto::env::v1::EnvErrorCode` variant for variant; `From`
/// conversions exist in both directions. [`EnvError::new`] treats `Timeout`,
/// `InvalidAction`, `NotReady` and `Busy` as recoverable and every other code as
/// permanent. `Cancelled` here is an environment-reported code, distinct from
/// [`Error::Cancelled`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EnvErrorCode {
    Unspecified,
    Timeout,
    InvalidAction,
    NotReady,
    Busy,
    Internal,
    Crashed,
    Cancelled,
    Closed,
}
impl EnvError {
    /// Builds an environment error with no `debug_info`.
    ///
    /// `is_recoverable` is derived from `code`: `Timeout`, `InvalidAction`, `NotReady`
    /// and `Busy` are recoverable; every other code is not.
    pub fn new(code: EnvErrorCode, message: impl Into<String>) -> Self {
        Self {
            is_recoverable: matches!(
                code,
                EnvErrorCode::Timeout
                    | EnvErrorCode::InvalidAction
                    | EnvErrorCode::NotReady
                    | EnvErrorCode::Busy
            ),
            debug_info: None,
            message: message.into(),
            code,
        }
    }
}
impl From<rlmesh_proto::env::v1::EnvErrorCode> for EnvErrorCode {
    /// Maps the protobuf environment error code onto the crate's [`EnvErrorCode`],
    /// variant for variant.
    fn from(wire: rlmesh_proto::env::v1::EnvErrorCode) -> Self {
        use rlmesh_proto::env::v1::EnvErrorCode as Wire;
        match wire {
            Wire::Unspecified => Self::Unspecified,
            Wire::Timeout => Self::Timeout,
            Wire::InvalidAction => Self::InvalidAction,
            Wire::NotReady => Self::NotReady,
            Wire::Busy => Self::Busy,
            Wire::Internal => Self::Internal,
            Wire::Crashed => Self::Crashed,
            Wire::Cancelled => Self::Cancelled,
            Wire::Closed => Self::Closed,
        }
    }
}
impl From<EnvErrorCode> for rlmesh_proto::env::v1::EnvErrorCode {
    /// Maps the crate's [`EnvErrorCode`] back onto the protobuf code, variant for
    /// variant. This is the inverse of the protobuf-to-crate conversion.
    fn from(local: EnvErrorCode) -> Self {
        match local {
            EnvErrorCode::Unspecified => Self::Unspecified,
            EnvErrorCode::Timeout => Self::Timeout,
            EnvErrorCode::InvalidAction => Self::InvalidAction,
            EnvErrorCode::NotReady => Self::NotReady,
            EnvErrorCode::Busy => Self::Busy,
            EnvErrorCode::Internal => Self::Internal,
            EnvErrorCode::Crashed => Self::Crashed,
            EnvErrorCode::Cancelled => Self::Cancelled,
            EnvErrorCode::Closed => Self::Closed,
        }
    }
}
/// Model error, carried by [`Error::Model`].
///
/// `Display` is implemented manually as `[<code>] <message>`; `debug_info` is not part
/// of the rendered text. Unlike [`EnvError`], this file provides no constructor that
/// derives `is_recoverable` from `code`, so callers set the flag explicitly.
#[derive(Debug, Error)]
pub struct ModelError {
    /// Category of the failure.
    pub code: ModelErrorCode,
    /// Human-readable description, included in the `Display` output.
    pub message: String,
    /// Whether retrying may succeed; read by [`Error::is_recoverable`].
    pub is_recoverable: bool,
    /// Optional extra diagnostic detail; not included in the `Display` output.
    pub debug_info: Option<String>,
}
impl std::fmt::Display for ModelError {
    /// Renders `[<code>] <message>`, using the code's `Debug` name; `debug_info` and
    /// `is_recoverable` are deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        write!(f, "[{code:?}] {message}")
    }
}
/// Category of a [`ModelError`].
///
/// Mirrors `rlmesh_proto::model::v1::ModelErrorCode` variant for variant. Only the
/// protobuf-to-crate `From` conversion exists in this file.
///
/// NOTE(review): unlike `EnvErrorCode`, this enum is not `#[non_exhaustive]`. Adding
/// the attribute later would break downstream exhaustive matches; decide before
/// publishing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelErrorCode {
    Unspecified,
    InvalidRequest,
    NotConfigured,
    Busy,
    Internal,
    Cancelled,
    Closed,
}
impl From<rlmesh_proto::model::v1::ModelErrorCode> for ModelErrorCode {
    /// Maps the protobuf model error code onto the crate's [`ModelErrorCode`],
    /// variant for variant.
    fn from(wire: rlmesh_proto::model::v1::ModelErrorCode) -> Self {
        use rlmesh_proto::model::v1::ModelErrorCode as Wire;
        match wire {
            Wire::Unspecified => Self::Unspecified,
            Wire::InvalidRequest => Self::InvalidRequest,
            Wire::NotConfigured => Self::NotConfigured,
            Wire::Busy => Self::Busy,
            Wire::Internal => Self::Internal,
            Wire::Cancelled => Self::Cancelled,
            Wire::Closed => Self::Closed,
        }
    }
}
/// Client-state errors, wrapped by [`Error::Client`]. Neither variant is considered
/// recoverable by [`Error::is_recoverable`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ClientError {
    /// An operation needed a connection, but none is established.
    #[error("not connected")]
    NotConnected,
    /// An operation needed a completed handshake, but it has not completed.
    #[error("handshake not completed")]
    NotHandshaked,
}
/// Convenience alias for `std::result::Result` with this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
// Tests for `status_to_grpc_error` and the `Error` classification helpers
// (`is_recoverable`, `is_fatal_handshake`).
#[cfg(test)]
mod status_mapping_tests {
    use super::*;
    use tonic::{Code, Status};

    // A retryable gRPC code must become `TransportError::Unavailable`, keep the status
    // message, and be reported as recoverable.
    #[test]
    fn unavailable_status_maps_to_recoverable_transport_error() {
        let error = status_to_grpc_error(Status::new(Code::Unavailable, "try later"));
        assert!(error.is_recoverable(), "Unavailable must be retryable");
        match error {
            Error::Transport(TransportError::Unavailable(message)) => {
                assert_eq!(message, "try later");
            }
            other => panic!("expected Unavailable transport error, got {other:?}"),
        }
    }

    // A code with no dedicated mapping must keep its original `Code` and message in
    // `TransportError::Status`, and must not be treated as retryable.
    #[test]
    fn unimplemented_status_preserves_code_and_is_not_recoverable() {
        let error = status_to_grpc_error(Status::new(Code::Unimplemented, "no such method"));
        assert!(!error.is_recoverable(), "Unimplemented must be permanent");
        match error {
            Error::Transport(TransportError::Status { code, message }) => {
                assert_eq!(code, Code::Unimplemented);
                assert_eq!(message, "no such method");
            }
            other => panic!("expected structured Status error, got {other:?}"),
        }
        // The rendered text must not look like a `ConnectFailed` error
        // ("failed to connect: ..."). Presumably this guards against an earlier mapping
        // that folded unmapped statuses into that variant.
        let error = status_to_grpc_error(Status::new(Code::Unimplemented, "x"));
        assert!(!error.to_string().contains("failed to connect"));
    }

    // DeadlineExceeded stays a structured `Status` (not `Error::Timeout`), is still
    // retryable, and its rendered text carries the server's message.
    #[test]
    fn deadline_exceeded_status_keeps_code_and_recoverability() {
        let error = status_to_grpc_error(Status::new(Code::DeadlineExceeded, "slow"));
        assert!(matches!(
            error,
            Error::Transport(TransportError::Status {
                code: Code::DeadlineExceeded,
                ..
            })
        ));
        assert!(error.is_recoverable());
        // "0ns" is how `Error::Timeout(Duration::ZERO)` would render. This guards
        // against mapping the status to a timeout that carries no real duration.
        assert!(!error.to_string().contains("0ns"));
        assert!(error.to_string().contains("slow"));
    }

    // Only `ProtocolError::HandshakeFailed` counts as a fatal handshake; a transport
    // connect failure does not.
    #[test]
    fn handshake_failure_is_fatal_but_connect_failure_is_not() {
        let fatal = Error::Protocol(ProtocolError::HandshakeFailed("pin mismatch".into()));
        assert!(fatal.is_fatal_handshake());
        let transient = Error::Transport(TransportError::ConnectFailed("binding".into()));
        assert!(!transient.is_fatal_handshake());
    }
}