use std::backtrace::{Backtrace, BacktraceStatus};
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use chrono::{DateTime, TimeZone as _, Utc};
/// Result alias used by this module: a [`std::result::Result`] whose error
/// type is fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Coarse category attached to every [`Error`].
///
/// The type is `Copy` and can be compared with `==`. It is marked
/// `#[non_exhaustive]`, so matches written in other crates need a wildcard
/// arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorKind {
    /// A precondition did not hold.
    ///
    /// NOTE(review): no code in this file raises this kind; the meaning is
    /// taken from the variant name only -- confirm against the raising sites.
    PreconditionFailed,
    /// Failure with no more specific category. In this file it is used by the
    /// conversions from errors such as `std::io::Error` and `reqwest::Error`,
    /// and for an ambiguous timestamp in `timestamp_ms_to_utc`.
    Unexpected,
    /// Input data could not be parsed or converted. In this file it is used
    /// by `ensure_data_valid!` and by the conversions from errors such as
    /// `serde_json::Error` and `url::ParseError`.
    DataInvalid,
    /// NOTE(review): not raised in this file; presumably a namespace with the
    /// requested name already exists -- confirm against the raising sites.
    NamespaceAlreadyExists,
    /// NOTE(review): not raised in this file; presumably a table with the
    /// requested name already exists -- confirm against the raising sites.
    TableAlreadyExists,
    /// NOTE(review): not raised in this file; presumably the requested
    /// namespace does not exist -- confirm against the raising sites.
    NamespaceNotFound,
    /// NOTE(review): not raised in this file; presumably the requested table
    /// does not exist -- confirm against the raising sites.
    TableNotFound,
    /// NOTE(review): not raised in this file; presumably the requested
    /// feature is not supported -- confirm against the raising sites.
    FeatureUnsupported,
    /// NOTE(review): not raised in this file; presumably a catalog commit
    /// conflicted with another change -- confirm against the raising sites.
    CatalogCommitConflicts,
}
impl ErrorKind {
    /// Returns the name of this kind as a `&'static str`.
    ///
    /// This is a named wrapper around the `From<ErrorKind> for &'static str`
    /// conversion, convenient where the target type cannot be inferred.
    pub fn into_static(self) -> &'static str {
        <&'static str>::from(self)
    }
}
impl From<ErrorKind> for &'static str {
    /// Converts a kind into a static string spelling out the variant name.
    ///
    /// The arms follow the declaration order of [`ErrorKind`]. The match has
    /// no wildcard arm, so adding a variant without a name here is a compile
    /// error.
    fn from(kind: ErrorKind) -> &'static str {
        match kind {
            ErrorKind::PreconditionFailed => "PreconditionFailed",
            ErrorKind::Unexpected => "Unexpected",
            ErrorKind::DataInvalid => "DataInvalid",
            ErrorKind::NamespaceAlreadyExists => "NamespaceAlreadyExists",
            ErrorKind::TableAlreadyExists => "TableAlreadyExists",
            ErrorKind::NamespaceNotFound => "NamespaceNotFound",
            ErrorKind::TableNotFound => "TableNotFound",
            ErrorKind::FeatureUnsupported => "FeatureUnsupported",
            ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
        }
    }
}
impl Display for ErrorKind {
    /// Writes the kind's static name, exactly as returned by `into_static`.
    ///
    /// Width, fill and alignment flags on the formatter are not applied; the
    /// name is always written verbatim.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let name = self.into_static();
        f.write_str(name)
    }
}
/// Error value produced by this module.
///
/// It combines an [`ErrorKind`] category with a message, ordered key/value
/// context, an optional underlying cause, a backtrace and a retry flag.
/// Values are created with `Error::new` and extended through the `with_*`
/// builder methods.
pub struct Error {
    /// Category of the failure; returned by `Error::kind`.
    kind: ErrorKind,
    /// Human-readable description. The formatters print it after ` => ` and
    /// skip it when it is empty.
    message: String,
    /// Key/value pairs in the order they were added by `with_context`.
    context: Vec<(&'static str, String)>,
    /// Underlying cause, if one was attached with `with_source`. Exposed via
    /// `std::error::Error::source`.
    source: Option<anyhow::Error>,
    /// Backtrace obtained from `Backtrace::capture()` in `Error::new`.
    backtrace: Backtrace,
    /// Retry flag; `false` unless changed with `with_retryable`.
    /// NOTE(review): presumably tells callers the failed operation may be
    /// retried -- confirm with the code that reads `retryable()`.
    retryable: bool,
}
impl Display for Error {
    /// Renders the error on a single line.
    ///
    /// Layout: `<kind>`, then `, context: { k: v, ... }` when context exists,
    /// then ` => <message>` when the message is non-empty, then
    /// `, source: <source>` when a source is attached. The backtrace is never
    /// printed here.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.kind)?;

        // The first pair opens the braces; later pairs are comma-separated.
        if let Some(((first_key, first_value), remaining)) = self.context.split_first() {
            write!(f, ", context: {{ {first_key}: {first_value}")?;
            for (key, value) in remaining {
                write!(f, ", {key}: {value}")?;
            }
            f.write_str(" }")?;
        }

        if !self.message.is_empty() {
            write!(f, " => {}", self.message)?;
        }

        match &self.source {
            Some(cause) => write!(f, ", source: {cause}"),
            None => Ok(()),
        }
    }
}
impl Debug for Error {
    /// Renders the error for diagnostics.
    ///
    /// With the alternate flag (`{:#?}`) the output is a field-by-field struct
    /// dump covering every field of [`Error`], including `retryable`.
    ///
    /// Without it (`{:?}`) the output is a multi-line report: the kind and
    /// message on the first line, then optional `Context:` and `Source:`
    /// sections, then a `Backtrace:` section only when a backtrace was
    /// actually captured.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        if f.alternate() {
            return f
                .debug_struct("Error")
                .field("kind", &self.kind)
                .field("message", &self.message)
                .field("context", &self.context)
                .field("source", &self.source)
                .field("backtrace", &self.backtrace)
                // Previously missing, so the dump did not match the struct.
                .field("retryable", &self.retryable)
                .finish();
        }

        write!(f, "{}", self.kind)?;
        if !self.message.is_empty() {
            write!(f, " => {}", self.message)?;
        }
        writeln!(f)?;

        if !self.context.is_empty() {
            writeln!(f)?;
            writeln!(f, "Context:")?;
            for (k, v) in &self.context {
                writeln!(f, " {k}: {v}")?;
            }
        }

        if let Some(source) = &self.source {
            writeln!(f)?;
            // `{:#}` asks the source for its alternate display form.
            writeln!(f, "Source: {source:#}")?;
        }

        // Disabled or unsupported backtraces would only add noise.
        if self.backtrace.status() == BacktraceStatus::Captured {
            writeln!(f)?;
            writeln!(f, "Backtrace:")?;
            writeln!(f, "{}", self.backtrace)?;
        }

        Ok(())
    }
}
impl std::error::Error for Error {
    /// Returns the attached source error as a trait object, or `None` when
    /// no source was set.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause = self.source.as_ref()?;
        Some(cause.as_ref())
    }
}
impl Error {
pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
Self {
kind,
message: message.into(),
context: Vec::default(),
source: None,
backtrace: Backtrace::capture(),
retryable: false,
}
}
pub fn with_retryable(mut self, retryable: bool) -> Self {
self.retryable = retryable;
self
}
pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
self.context.push((key, value.into()));
self
}
pub fn with_source(mut self, src: impl Into<anyhow::Error>) -> Self {
debug_assert!(self.source.is_none(), "the source error has been set");
self.source = Some(src.into());
self
}
#[cfg(test)]
fn with_backtrace(mut self, backtrace: Backtrace) -> Self {
self.backtrace = backtrace;
self
}
pub fn backtrace(&self) -> &Backtrace {
&self.backtrace
}
pub fn kind(&self) -> ErrorKind {
self.kind
}
pub fn retryable(&self) -> bool {
self.retryable
}
#[inline]
pub fn message(&self) -> &str {
self.message.as_str()
}
}
/// Generates `impl From<$foreign> for crate::error::Error`.
///
/// The generated conversion builds an error of kind `$kind` with message
/// `$text` and attaches the converted value as its source, so `?` can be used
/// on results carrying the foreign error type.
macro_rules! define_from_err {
    ($foreign: path, $kind: path, $text: expr) => {
        impl std::convert::From<$foreign> for crate::error::Error {
            fn from(cause: $foreign) -> Self {
                let wrapped = crate::error::Error::new($kind, $text);
                wrapped.with_source(cause)
            }
        }
    };
}
// Each invocation below generates a `From<T> for Error` impl that records the
// listed kind and message and keeps the original error as the source.

// Invalid UTF-8 is reported as `Unexpected`.
define_from_err!(
    std::str::Utf8Error,
    ErrorKind::Unexpected,
    "handling invalid utf-8 characters"
);
// Integer parse failures are reported as `Unexpected`.
define_from_err!(
    core::num::ParseIntError,
    ErrorKind::Unexpected,
    "parsing integer from string"
);
// A failed slice-to-array conversion is reported as `DataInvalid`.
define_from_err!(
    std::array::TryFromSliceError,
    ErrorKind::DataInvalid,
    "failed to convert byte slice to array"
);
// A failed integer conversion is reported as `DataInvalid`.
define_from_err!(
    std::num::TryFromIntError,
    ErrorKind::DataInvalid,
    "failed to convert integer"
);
// A `chrono` parse failure is reported as `DataInvalid`.
define_from_err!(
    chrono::ParseError,
    ErrorKind::DataInvalid,
    "Failed to parse string to date or time"
);
// Generates `From<uuid::Error> for Error`: the failure is reported as
// `DataInvalid` and the original `uuid::Error` is kept as the source.
define_from_err!(
    uuid::Error,
    ErrorKind::DataInvalid,
    "Failed to convert between uuid and iceberg value"
);
// Each invocation below generates a `From<T> for Error` impl that records the
// listed kind and message and keeps the original error as the source.

// `apache_avro` errors are reported as `DataInvalid`.
define_from_err!(
    apache_avro::Error,
    ErrorKind::DataInvalid,
    "Failure in conversion with avro"
);
// URL parse failures are reported as `DataInvalid`.
define_from_err!(
    url::ParseError,
    ErrorKind::DataInvalid,
    "Failed to parse url"
);
// `reqwest` errors are reported as `Unexpected`.
define_from_err!(
    reqwest::Error,
    ErrorKind::Unexpected,
    "Failed to execute http request"
);
// `serde_json` errors are reported as `DataInvalid`.
define_from_err!(
    serde_json::Error,
    ErrorKind::DataInvalid,
    "Failed to parse json string"
);
// NOTE(review): this conversion applies to every `ParquetError`, while the
// message only mentions reading -- confirm the wording is intended.
define_from_err!(
    parquet::errors::ParquetError,
    ErrorKind::Unexpected,
    "Failed to read a Parquet file"
);
// Channel send failures are reported as `Unexpected`.
define_from_err!(
    futures::channel::mpsc::SendError,
    ErrorKind::Unexpected,
    "Failed to send a message to a channel"
);
// Arrow schema errors are reported as `Unexpected`.
define_from_err!(
    arrow_schema::ArrowError,
    ErrorKind::Unexpected,
    "Arrow Schema Error"
);
// I/O errors are reported as `Unexpected`.
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
match Utc.timestamp_millis_opt(timestamp_ms) {
chrono::LocalResult::Single(t) => Ok(t),
chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
ErrorKind::Unexpected,
"Ambiguous timestamp with two possible results",
)),
chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, "Invalid timestamp")),
}
.map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
}
/// Returns early from the enclosing function with an
/// `ErrorKind::DataInvalid` error when `$cond` evaluates to `false`.
///
/// The error message is built with `format!` from the format literal and any
/// arguments after it. The arguments are optional, so both
/// `ensure_data_valid!(ok, "plain message")` and
/// `ensure_data_valid!(ok, "bad value {}", v)` are accepted. The message is
/// only formatted when the condition fails.
///
/// The enclosing function must return a `Result` whose error type is
/// `$crate::error::Error`, since the macro expands to a bare `return Err(..)`.
#[macro_export]
macro_rules! ensure_data_valid {
    ($cond: expr, $fmt: literal $(, $($arg:tt)*)?) => {
        if !$cond {
            return Err($crate::error::Error::new(
                $crate::error::ErrorKind::DataInvalid,
                format!($fmt $(, $($arg)*)?),
            ));
        }
    };
}
// Unit tests for the `Display` and `Debug` renderings of `Error`.
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use pretty_assertions::assert_eq;
    use super::*;
    // Builds a fully populated error (message, two context entries, a source)
    // whose backtrace is replaced by a disabled one, so no backtrace section
    // is rendered.
    fn generate_error_with_backtrace_disabled() -> Error {
        Error::new(
            ErrorKind::Unexpected,
            "something wrong happened".to_string(),
        )
        .with_context("path", "/path/to/file".to_string())
        .with_context("called", "send_async".to_string())
        .with_source(anyhow!("networking error"))
        .with_backtrace(Backtrace::disabled())
    }
    // Same error as above, but with a backtrace obtained from
    // `Backtrace::force_capture()`.
    fn generate_error_with_backtrace_enabled() -> Error {
        Error::new(
            ErrorKind::Unexpected,
            "something wrong happened".to_string(),
        )
        .with_context("path", "/path/to/file".to_string())
        .with_context("called", "send_async".to_string())
        .with_source(anyhow!("networking error"))
        .with_backtrace(Backtrace::force_capture())
    }
    // `Display` prints a single line and never includes the backtrace.
    #[test]
    fn test_error_display_without_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }
    // The `Display` output is expected to be the same when a backtrace is
    // attached.
    #[test]
    fn test_error_display_with_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_enabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }
    // Non-alternate `Debug` prints the multi-line report without a
    // `Backtrace:` section when the backtrace is disabled.
    #[test]
    fn test_error_debug_without_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected => something wrong happened
Context:
path: /path/to/file
called: send_async
Source: networking error
"#
        )
    }
    // Only the leading part of the report is compared; the text after `0:` is
    // not checked. Note that the slice below panics (instead of failing the
    // assertion) if the output is shorter than `expected`.
    #[test]
    fn test_error_debug_with_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_enabled());
        let expected = r#"Unexpected => something wrong happened
Context:
path: /path/to/file
called: send_async
Source: networking error
Backtrace:
0:"#;
        assert_eq!(&s[..expected.len()], expected,);
    }
}