use thiserror::Error;
pub mod codes {
pub const CONNECTION_FAILED: i32 = 100;
pub const CONNECTION_CLOSED: i32 = 101;
pub const CONNECTION_IN_USE: i32 = 102;
pub const TABLE_NOT_FOUND: i32 = 200;
pub const TABLE_EXISTS: i32 = 201;
pub const SCHEMA_MISMATCH: i32 = 202;
pub const INVALID_SCHEMA: i32 = 203;
pub const INGESTION_FAILED: i32 = 300;
pub const WRITER_CLOSED: i32 = 301;
pub const BATCH_SCHEMA_MISMATCH: i32 = 302;
pub const QUERY_FAILED: i32 = 400;
pub const SQL_PARSE_ERROR: i32 = 401;
pub const QUERY_CANCELLED: i32 = 402;
pub const SUBSCRIPTION_FAILED: i32 = 500;
pub const SUBSCRIPTION_CLOSED: i32 = 501;
pub const SUBSCRIPTION_TIMEOUT: i32 = 502;
pub const INTERNAL_ERROR: i32 = 900;
pub const SHUTDOWN: i32 = 901;
}
#[derive(Debug, Clone, Error)]
pub enum ApiError {
#[error("Connection error ({code}): {message}")]
Connection {
code: i32,
message: String,
},
#[error("Schema error ({code}): {message}")]
Schema {
code: i32,
message: String,
},
#[error("Ingestion error ({code}): {message}")]
Ingestion {
code: i32,
message: String,
},
#[error("Query error ({code}): {message}")]
Query {
code: i32,
message: String,
},
#[error("Subscription error ({code}): {message}")]
Subscription {
code: i32,
message: String,
},
#[error("Internal error ({code}): {message}")]
Internal {
code: i32,
message: String,
},
}
macro_rules! api_error_msg {
($name:ident, $variant:ident, $code:expr) => {
pub fn $name(message: impl Into<String>) -> Self {
Self::$variant {
code: $code,
message: message.into(),
}
}
};
}
macro_rules! api_error_fixed {
($name:ident, $variant:ident, $code:expr, $msg:literal) => {
#[must_use]
pub fn $name() -> Self {
Self::$variant {
code: $code,
message: $msg.into(),
}
}
};
}
impl ApiError {
#[must_use]
pub fn code(&self) -> i32 {
match self {
Self::Connection { code, .. }
| Self::Schema { code, .. }
| Self::Ingestion { code, .. }
| Self::Query { code, .. }
| Self::Subscription { code, .. }
| Self::Internal { code, .. } => *code,
}
}
#[must_use]
pub fn message(&self) -> &str {
match self {
Self::Connection { message, .. }
| Self::Schema { message, .. }
| Self::Ingestion { message, .. }
| Self::Query { message, .. }
| Self::Subscription { message, .. }
| Self::Internal { message, .. } => message,
}
}
api_error_msg!(connection, Connection, codes::CONNECTION_FAILED);
api_error_msg!(schema_mismatch, Schema, codes::SCHEMA_MISMATCH);
api_error_msg!(ingestion, Ingestion, codes::INGESTION_FAILED);
api_error_msg!(query, Query, codes::QUERY_FAILED);
api_error_msg!(sql_parse, Query, codes::SQL_PARSE_ERROR);
api_error_msg!(subscription, Subscription, codes::SUBSCRIPTION_FAILED);
api_error_msg!(internal, Internal, codes::INTERNAL_ERROR);
api_error_fixed!(
subscription_closed,
Subscription,
codes::SUBSCRIPTION_CLOSED,
"Subscription closed"
);
api_error_fixed!(
subscription_timeout,
Subscription,
codes::SUBSCRIPTION_TIMEOUT,
"Subscription timeout"
);
api_error_fixed!(shutdown, Internal, codes::SHUTDOWN, "Database is shut down");
#[must_use]
pub fn table_not_found(table: &str) -> Self {
Self::Schema {
code: codes::TABLE_NOT_FOUND,
message: format!("Table not found: {table}"),
}
}
#[must_use]
pub fn table_exists(table: &str) -> Self {
Self::Schema {
code: codes::TABLE_EXISTS,
message: format!("Table already exists: {table}"),
}
}
}
impl From<crate::DbError> for ApiError {
fn from(e: crate::DbError) -> Self {
use crate::DbError;
match e {
DbError::SourceNotFound(name)
| DbError::TableNotFound(name)
| DbError::StreamNotFound(name)
| DbError::SinkNotFound(name)
| DbError::QueryNotFound(name) => Self::table_not_found(&name),
DbError::SourceAlreadyExists(name)
| DbError::TableAlreadyExists(name)
| DbError::StreamAlreadyExists(name)
| DbError::SinkAlreadyExists(name) => Self::table_exists(&name),
DbError::SchemaMismatch(msg) => Self::schema_mismatch(msg),
DbError::InsertError(msg) => Self::ingestion(msg),
DbError::Sql(e) => Self::sql_parse(e.to_string()),
DbError::SqlParse(e) => Self::sql_parse(e.to_string()),
DbError::Streaming(e) => Self::ingestion(e.to_string()),
DbError::Shutdown => Self::shutdown(),
DbError::InvalidOperation(msg) => Self::Query {
code: codes::QUERY_FAILED,
message: msg,
},
DbError::DataFusion(e) => {
let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
Self::query(translated.to_string())
}
DbError::Engine(e) => Self::internal(format!("Engine error: {e}")),
DbError::Connector(msg) => Self::internal(format!("Connector error: {msg}")),
DbError::Pipeline(msg) => Self::internal(format!("Pipeline error: {msg}")),
DbError::QueryPipeline {
context,
translated,
} => Self::query(format!("Stream '{context}': {translated}")),
DbError::MaterializedView(msg) => {
Self::query(format!("Materialized view error: {msg}"))
}
other => Self::internal(other.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_error_codes() {
let err = ApiError::table_not_found("missing");
assert_eq!(err.code(), codes::TABLE_NOT_FOUND);
assert!(err.message().contains("missing"));
let err = ApiError::shutdown();
assert_eq!(err.code(), codes::SHUTDOWN);
}
#[test]
fn test_error_display() {
let err = ApiError::connection("failed to connect");
let s = err.to_string();
assert!(s.contains("100"));
assert!(s.contains("failed to connect"));
}
#[test]
fn test_error_conversion() {
let db_err = crate::DbError::SourceNotFound("foo".into());
let api_err: ApiError = db_err.into();
assert_eq!(api_err.code(), codes::TABLE_NOT_FOUND);
assert!(api_err.message().contains("foo"));
}
#[test]
fn test_datafusion_error_becomes_query_not_internal() {
let datafusion_err =
datafusion_common::DataFusionError::Plan("No field named 'foo'".to_string());
let db_error = crate::DbError::DataFusion(datafusion_err);
let api_err: ApiError = db_error.into();
assert_eq!(api_err.code(), codes::QUERY_FAILED);
assert!(
api_err.message().contains("foo"),
"message was: {}",
api_err.message()
);
assert!(
!api_err.message().contains("DataFusion"),
"message was: {}",
api_err.message()
);
}
#[test]
fn test_query_pipeline_error_becomes_query_not_internal() {
let db_err = crate::DbError::QueryPipeline {
context: "my_stream".to_string(),
translated: "[LDB-1100] Column 'foo' not found in query".to_string(),
};
let api_err: ApiError = db_err.into();
assert_eq!(api_err.code(), codes::QUERY_FAILED);
assert!(
api_err.message().contains("my_stream"),
"message should include stream name: {}",
api_err.message()
);
assert!(
api_err.message().contains("LDB-1100"),
"message should include error code: {}",
api_err.message()
);
}
#[test]
fn test_materialized_view_error_becomes_query() {
let db_err = crate::DbError::MaterializedView("view failed".into());
let api_err: ApiError = db_err.into();
assert_eq!(api_err.code(), codes::QUERY_FAILED);
assert!(api_err.message().contains("view failed"));
}
}