use laminar_core::error_codes;
/// Error type covering SQL, engine, streaming, connector and checkpoint
/// failures, plus missing or duplicate sources, sinks, streams and tables.
///
/// `Debug` and `thiserror::Error` are derived. Variants whose payload is marked
/// `#[from]` can be created from the wrapped error through `From` (and so with
/// the `?` operator). No variant carries an `#[error(...)]` attribute: the
/// message text comes from the hand-written `Display` impl in this file, and
/// [`DbError::code`] maps every variant to a constant from
/// `laminar_core::error_codes`.
#[derive(Debug, thiserror::Error)]
pub enum DbError {
/// Wraps a `laminar_sql::Error`; displayed as `SQL error: <inner>`.
Sql(#[from] laminar_sql::Error),
/// Wraps a `laminar_core::Error`; displayed as `Engine error: <inner>`.
Engine(#[from] laminar_core::Error),
/// Wraps a `StreamingError`; displayed as `Streaming error: <inner>`.
/// Reported as transient by [`DbError::is_transient`].
Streaming(#[from] laminar_core::streaming::StreamingError),
/// Wraps a `DataFusionError`. `Display` does not print the inner error
/// directly; it prints the result of
/// `laminar_sql::error::translate_datafusion_error` applied to its message.
DataFusion(#[from] datafusion_common::DataFusionError),
/// Displayed as `Source '<payload>' not found`.
SourceNotFound(String),
/// Displayed as `Sink '<payload>' not found`.
SinkNotFound(String),
/// Displayed as `Query '<payload>' not found`.
QueryNotFound(String),
/// Displayed as `Source '<payload>' already exists`.
SourceAlreadyExists(String),
/// Displayed as `Sink '<payload>' already exists`.
SinkAlreadyExists(String),
/// Displayed as `Stream '<payload>' not found`.
StreamNotFound(String),
/// Displayed as `Stream '<payload>' already exists`.
StreamAlreadyExists(String),
/// Displayed as `Table '<payload>' not found`.
TableNotFound(String),
/// Displayed as `Table '<payload>' already exists`.
TableAlreadyExists(String),
/// Free-form message; displayed as `Insert error: <message>`.
InsertError(String),
/// Free-form message; displayed as `Schema mismatch: <message>`.
SchemaMismatch(String),
/// Free-form message; displayed as `Invalid operation: <message>`.
InvalidOperation(String),
/// Wraps a `laminar_sql::parser::ParseError`; displayed as
/// `SQL parse error: <inner>`.
SqlParse(#[from] laminar_sql::parser::ParseError),
/// Displayed as `Database is shut down`.
Shutdown,
/// Free-form message; displayed as `Checkpoint error: <message>`.
/// Reported as transient by [`DbError::is_transient`].
Checkpoint(String),
/// Wraps a `CheckpointStoreError`; displayed as
/// `Checkpoint store error: <inner>`. Reported as transient by
/// [`DbError::is_transient`].
CheckpointStore(#[from] laminar_storage::checkpoint_store::CheckpointStoreError),
/// Displayed as `Unresolved config variable: <payload>`.
UnresolvedConfigVar(String),
/// Free-form message; displayed as `Connector error: <message>`.
/// Always reported as transient by [`DbError::is_transient`].
Connector(String),
/// Wraps a `ConnectorError`; displayed as `Connector error: <inner>`.
/// [`DbError::is_transient`] defers to the inner error's own
/// `is_transient()`.
ConnectorOp(#[from] laminar_connectors::error::ConnectorError),
/// Free-form message; displayed as `Pipeline error: <message>`.
Pipeline(String),
/// Produced by the `query_pipeline*` constructors on this type; displayed
/// as `Stream '<context>': <translated>`.
QueryPipeline {
/// Caller-supplied label, printed inside `Stream '...'`.
context: String,
/// String form of the translated DataFusion/Arrow error message.
translated: String,
},
/// Free-form message; displayed as `Materialized view error: <message>`.
MaterializedView(String),
/// Free-form message; displayed as `Storage error: <message>`.
/// [`DbError::code`] reports it as `error_codes::WAL_ERROR`.
Storage(String),
/// Free-form message; displayed as `Config error: <message>`.
Config(String),
/// Free-form message; displayed as `Unsupported: <message>`. Shares the
/// `error_codes::INVALID_OPERATION` code with `InvalidOperation`.
Unsupported(String),
}
impl DbError {
pub fn query_pipeline(
context: impl Into<String>,
df_error: &datafusion_common::DataFusionError,
) -> Self {
let translated = laminar_sql::error::translate_datafusion_error(&df_error.to_string());
Self::QueryPipeline {
context: context.into(),
translated: translated.to_string(),
}
}
pub fn query_pipeline_with_columns(
context: impl Into<String>,
df_error: &datafusion_common::DataFusionError,
available_columns: &[&str],
) -> Self {
let translated = laminar_sql::error::translate_datafusion_error_with_context(
&df_error.to_string(),
Some(available_columns),
);
Self::QueryPipeline {
context: context.into(),
translated: translated.to_string(),
}
}
pub fn query_pipeline_arrow(
context: impl Into<String>,
arrow_error: &arrow::error::ArrowError,
) -> Self {
let translated = laminar_sql::error::translate_datafusion_error(&arrow_error.to_string());
Self::QueryPipeline {
context: context.into(),
translated: translated.to_string(),
}
}
#[must_use]
pub fn code(&self) -> &'static str {
match self {
Self::Sql(_) | Self::SqlParse(_) => error_codes::SQL_UNSUPPORTED,
Self::Engine(_) | Self::Streaming(_) => error_codes::INTERNAL,
Self::DataFusion(_) => error_codes::QUERY_EXECUTION_FAILED,
Self::SourceNotFound(_) => error_codes::SOURCE_NOT_FOUND,
Self::SinkNotFound(_) => error_codes::SINK_NOT_FOUND,
Self::QueryNotFound(_) | Self::StreamNotFound(_) | Self::TableNotFound(_) => {
error_codes::SQL_TABLE_NOT_FOUND
}
Self::SourceAlreadyExists(_)
| Self::StreamAlreadyExists(_)
| Self::TableAlreadyExists(_) => error_codes::SOURCE_ALREADY_EXISTS,
Self::SinkAlreadyExists(_) => error_codes::SINK_ALREADY_EXISTS,
Self::InsertError(_) => error_codes::CONNECTOR_WRITE_ERROR,
Self::SchemaMismatch(_) => error_codes::SCHEMA_MISMATCH,
Self::InvalidOperation(_) | Self::Unsupported(_) => error_codes::INVALID_OPERATION,
Self::Shutdown => error_codes::SHUTDOWN,
Self::Checkpoint(_) | Self::CheckpointStore(_) => error_codes::CHECKPOINT_FAILED,
Self::UnresolvedConfigVar(_) => error_codes::UNRESOLVED_CONFIG_VAR,
Self::Connector(_) | Self::ConnectorOp(_) => error_codes::CONNECTOR_CONNECTION_FAILED,
Self::Pipeline(_) => error_codes::PIPELINE_ERROR,
Self::QueryPipeline { .. } => error_codes::QUERY_PIPELINE_ERROR,
Self::MaterializedView(_) => error_codes::MATERIALIZED_VIEW_ERROR,
Self::Storage(_) => error_codes::WAL_ERROR,
Self::Config(_) => error_codes::INVALID_CONFIG,
}
}
#[must_use]
pub fn is_transient(&self) -> bool {
match self {
Self::Streaming(_)
| Self::Connector(_)
| Self::Checkpoint(_)
| Self::CheckpointStore(_) => true,
Self::ConnectorOp(e) => e.is_transient(),
_ => false,
}
}
}
impl std::fmt::Display for DbError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sql(e) => write!(f, "SQL error: {e}"),
Self::Engine(e) => write!(f, "Engine error: {e}"),
Self::Streaming(e) => write!(f, "Streaming error: {e}"),
Self::DataFusion(e) => {
let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
write!(f, "{translated}")
}
Self::SourceNotFound(name) => {
write!(f, "[{}] Source '{name}' not found", self.code())
}
Self::SinkNotFound(name) => {
write!(f, "[{}] Sink '{name}' not found", self.code())
}
Self::QueryNotFound(name) => {
write!(f, "[{}] Query '{name}' not found", self.code())
}
Self::SourceAlreadyExists(name) => {
write!(f, "[{}] Source '{name}' already exists", self.code())
}
Self::SinkAlreadyExists(name) => {
write!(f, "[{}] Sink '{name}' already exists", self.code())
}
Self::StreamNotFound(name) => {
write!(f, "[{}] Stream '{name}' not found", self.code())
}
Self::StreamAlreadyExists(name) => {
write!(f, "[{}] Stream '{name}' already exists", self.code())
}
Self::TableNotFound(name) => {
write!(f, "[{}] Table '{name}' not found", self.code())
}
Self::TableAlreadyExists(name) => {
write!(f, "[{}] Table '{name}' already exists", self.code())
}
Self::InsertError(msg) => {
write!(f, "[{}] Insert error: {msg}", self.code())
}
Self::SchemaMismatch(msg) => {
write!(f, "[{}] Schema mismatch: {msg}", self.code())
}
Self::InvalidOperation(msg) => {
write!(f, "[{}] Invalid operation: {msg}", self.code())
}
Self::SqlParse(e) => write!(f, "SQL parse error: {e}"),
Self::Shutdown => {
write!(f, "[{}] Database is shut down", self.code())
}
Self::Checkpoint(msg) => {
write!(f, "[{}] Checkpoint error: {msg}", self.code())
}
Self::CheckpointStore(e) => {
write!(f, "[{}] Checkpoint store error: {e}", self.code())
}
Self::UnresolvedConfigVar(msg) => {
write!(f, "[{}] Unresolved config variable: {msg}", self.code())
}
Self::Connector(msg) => {
write!(f, "[{}] Connector error: {msg}", self.code())
}
Self::ConnectorOp(e) => {
write!(f, "[{}] Connector error: {e}", self.code())
}
Self::Pipeline(msg) => {
write!(f, "[{}] Pipeline error: {msg}", self.code())
}
Self::QueryPipeline {
context,
translated,
} => write!(f, "Stream '{context}': {translated}"),
Self::MaterializedView(msg) => {
write!(f, "[{}] Materialized view error: {msg}", self.code())
}
Self::Storage(msg) => {
write!(f, "[{}] Storage error: {msg}", self.code())
}
Self::Config(msg) => {
write!(f, "[{}] Config error: {msg}", self.code())
}
Self::Unsupported(msg) => {
write!(f, "[{}] Unsupported: {msg}", self.code())
}
}
}
}