pub use crate::rpc::RpcError;
pub use crate::rpc::{ApiError, FlussError};
use arrow_schema::ArrowError;
use snafu::Snafu;
use std::{io, result};
use strum::ParseError;
pub type Result<T> = result::Result<T, Error>;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(
whatever,
display("Fluss hitting unexpected error {}: {:?}", message, source)
)]
UnexpectedError {
message: String,
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync + 'static>, Some)))]
source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
},
#[snafu(
visibility(pub(crate)),
display("Fluss hitting unexpected io error {}: {:?}", message, source)
)]
IoUnexpectedError { message: String, source: io::Error },
#[snafu(
visibility(pub(crate)),
display(
"Fluss hitting remote storage unexpected error {}: {:?}",
message,
source
)
)]
RemoteStorageUnexpectedError {
message: String,
source: opendal::Error,
},
#[snafu(
visibility(pub(crate)),
display("Fluss hitting json serde error {}.", message)
)]
JsonSerdeError { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
)]
RpcError { message: String, source: RpcError },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting row convert error {}.", message)
)]
RowConvertError { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting Arrow error {}: {:?}.", message, source)
)]
ArrowError { message: String, source: ArrowError },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting illegal argument error {}.", message)
)]
IllegalArgument { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting IO not supported error {}.", message)
)]
IoUnsupported { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting wakeup error {}.", message)
)]
WakeupError { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting unsupported operation error {}.", message)
)]
UnsupportedOperation { message: String },
#[snafu(visibility(pub(crate)), display("Fluss writer closed: {}.", message))]
WriterClosed { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss buffer exhausted: {}.", message)
)]
BufferExhausted { message: String },
#[snafu(visibility(pub(crate)), display("Fluss API Error: {}.", api_error))]
FlussAPIError { api_error: ApiError },
}
impl Error {
pub fn table_not_exist(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::TableNotExist.code(),
message: message.into(),
},
}
}
pub fn invalid_table(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::InvalidTableException.code(),
message: message.into(),
},
}
}
pub fn partition_not_exist(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::PartitionNotExists.code(),
message: message.into(),
},
}
}
pub fn invalid_partition(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::PartitionSpecInvalidException.code(),
message: message.into(),
},
}
}
pub fn leader_not_available(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::LeaderNotAvailableException.code(),
message: message.into(),
},
}
}
pub fn api_error(&self) -> Option<FlussError> {
if let Error::FlussAPIError { api_error } = self {
Some(FlussError::for_code(api_error.code))
} else {
None
}
}
pub fn is_retriable(&self) -> bool {
match self {
Error::RpcError { .. } => true,
Error::FlussAPIError { api_error } => api_error.is_retriable(),
_ => false,
}
}
}
impl From<ArrowError> for Error {
fn from(value: ArrowError) -> Self {
Error::ArrowError {
message: format!("{value}"),
source: value,
}
}
}
impl From<RpcError> for Error {
fn from(value: RpcError) -> Self {
Error::RpcError {
message: format!("{value}"),
source: value,
}
}
}
impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Error::IoUnexpectedError {
message: format!("{value}"),
source: value,
}
}
}
impl From<opendal::Error> for Error {
fn from(value: opendal::Error) -> Self {
Error::RemoteStorageUnexpectedError {
message: format!("{value}"),
source: value,
}
}
}
impl From<ApiError> for Error {
fn from(value: ApiError) -> Self {
Error::FlussAPIError { api_error: value }
}
}
impl From<ParseError> for Error {
fn from(value: ParseError) -> Self {
Error::IllegalArgument {
message: value.to_string(),
}
}
}