Skip to main content

laminar_db/
error.rs

1//! Error types for the `LaminarDB` facade.
2
3use laminar_core::error_codes;
4
5/// Errors from database operations.
6#[derive(Debug, thiserror::Error)]
7pub enum DbError {
8    /// SQL parse error
9    Sql(#[from] laminar_sql::Error),
10
11    /// Core engine error
12    Engine(#[from] laminar_core::Error),
13
14    /// Streaming API error
15    Streaming(#[from] laminar_core::streaming::StreamingError),
16
17    /// `DataFusion` error (translated to user-friendly messages on display)
18    DataFusion(#[from] datafusion_common::DataFusionError),
19
20    /// Source not found
21    SourceNotFound(String),
22
23    /// Sink not found
24    SinkNotFound(String),
25
26    /// Query not found
27    QueryNotFound(String),
28
29    /// Source already exists
30    SourceAlreadyExists(String),
31
32    /// Sink already exists
33    SinkAlreadyExists(String),
34
35    /// Stream not found
36    StreamNotFound(String),
37
38    /// Stream already exists
39    StreamAlreadyExists(String),
40
41    /// Table not found
42    TableNotFound(String),
43
44    /// Table already exists
45    TableAlreadyExists(String),
46
47    /// Insert error
48    InsertError(String),
49
50    /// Schema mismatch between Rust type and SQL definition
51    SchemaMismatch(String),
52
53    /// Invalid SQL statement for the operation
54    InvalidOperation(String),
55
56    /// SQL parse error (from streaming parser)
57    SqlParse(#[from] laminar_sql::parser::ParseError),
58
59    /// Database is shut down
60    Shutdown,
61
62    /// Checkpoint error
63    Checkpoint(String),
64
65    /// Unresolved config variable
66    UnresolvedConfigVar(String),
67
68    /// Connector error
69    Connector(String),
70
71    /// Pipeline error (start/shutdown lifecycle)
72    Pipeline(String),
73
74    /// Query pipeline error — wraps a `DataFusion` error with stream context.
75    /// Unlike `Pipeline`, this variant is translated to user-friendly messages.
76    QueryPipeline {
77        /// The stream or query name where the error occurred.
78        context: String,
79        /// The translated error message (already processed through
80        /// `translate_datafusion_error`).
81        translated: String,
82    },
83
84    /// Materialized view error
85    MaterializedView(String),
86
87    /// Storage backend error.
88    Storage(String),
89
90    /// Configuration / profile validation error
91    Config(String),
92
93    /// Operation is not yet implemented.
94    Unsupported(String),
95}
96
97impl DbError {
98    /// Create a `QueryPipeline` error from a `DataFusion` error with stream context.
99    ///
100    /// The `DataFusion` error is translated to a user-friendly message with
101    /// structured error codes. The raw `DataFusion` internals are never exposed.
102    pub fn query_pipeline(
103        context: impl Into<String>,
104        df_error: &datafusion_common::DataFusionError,
105    ) -> Self {
106        let translated = laminar_sql::error::translate_datafusion_error(&df_error.to_string());
107        Self::QueryPipeline {
108            context: context.into(),
109            translated: translated.to_string(),
110        }
111    }
112
113    /// Create a `QueryPipeline` error from a `DataFusion` error with stream
114    /// context and available column names for typo suggestions.
115    pub fn query_pipeline_with_columns(
116        context: impl Into<String>,
117        df_error: &datafusion_common::DataFusionError,
118        available_columns: &[&str],
119    ) -> Self {
120        let translated = laminar_sql::error::translate_datafusion_error_with_context(
121            &df_error.to_string(),
122            Some(available_columns),
123        );
124        Self::QueryPipeline {
125            context: context.into(),
126            translated: translated.to_string(),
127        }
128    }
129
130    /// Create a `QueryPipeline` error from an Arrow error with stream context.
131    pub fn query_pipeline_arrow(
132        context: impl Into<String>,
133        arrow_error: &arrow::error::ArrowError,
134    ) -> Self {
135        let translated = laminar_sql::error::translate_datafusion_error(&arrow_error.to_string());
136        Self::QueryPipeline {
137            context: context.into(),
138            translated: translated.to_string(),
139        }
140    }
141
142    /// Returns the structured `LDB-NNNN` error code for this error.
143    ///
144    /// Every `DbError` variant maps to a stable error code that can be used
145    /// for programmatic handling, log searching, and metrics.
146    #[must_use]
147    pub fn code(&self) -> &'static str {
148        match self {
149            Self::Sql(_) | Self::SqlParse(_) => error_codes::SQL_UNSUPPORTED,
150            Self::Engine(_) | Self::Streaming(_) => error_codes::INTERNAL,
151            Self::DataFusion(_) => error_codes::QUERY_EXECUTION_FAILED,
152            Self::SourceNotFound(_) => error_codes::SOURCE_NOT_FOUND,
153            Self::SinkNotFound(_) => error_codes::SINK_NOT_FOUND,
154            Self::QueryNotFound(_) | Self::StreamNotFound(_) | Self::TableNotFound(_) => {
155                error_codes::SQL_TABLE_NOT_FOUND
156            }
157            Self::SourceAlreadyExists(_)
158            | Self::StreamAlreadyExists(_)
159            | Self::TableAlreadyExists(_) => error_codes::SOURCE_ALREADY_EXISTS,
160            Self::SinkAlreadyExists(_) => error_codes::SINK_ALREADY_EXISTS,
161            Self::InsertError(_) => error_codes::CONNECTOR_WRITE_ERROR,
162            Self::SchemaMismatch(_) => error_codes::SCHEMA_MISMATCH,
163            Self::InvalidOperation(_) | Self::Unsupported(_) => error_codes::INVALID_OPERATION,
164            Self::Shutdown => error_codes::SHUTDOWN,
165            Self::Checkpoint(_) => error_codes::CHECKPOINT_FAILED,
166            Self::UnresolvedConfigVar(_) => error_codes::UNRESOLVED_CONFIG_VAR,
167            Self::Connector(_) => error_codes::CONNECTOR_CONNECTION_FAILED,
168            Self::Pipeline(_) => error_codes::PIPELINE_ERROR,
169            Self::QueryPipeline { .. } => error_codes::QUERY_PIPELINE_ERROR,
170            Self::MaterializedView(_) => error_codes::MATERIALIZED_VIEW_ERROR,
171            Self::Storage(_) => error_codes::WAL_ERROR,
172            Self::Config(_) => error_codes::INVALID_CONFIG,
173        }
174    }
175
176    /// Whether this error is transient (retryable).
177    #[must_use]
178    pub fn is_transient(&self) -> bool {
179        matches!(
180            self,
181            Self::Streaming(_) | Self::Connector(_) | Self::Checkpoint(_)
182        )
183    }
184}
185
186impl std::fmt::Display for DbError {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        match self {
189            Self::Sql(e) => write!(f, "SQL error: {e}"),
190            Self::Engine(e) => write!(f, "Engine error: {e}"),
191            Self::Streaming(e) => write!(f, "Streaming error: {e}"),
192            Self::DataFusion(e) => {
193                let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
194                write!(f, "{translated}")
195            }
196            Self::SourceNotFound(name) => {
197                write!(f, "[{}] Source '{name}' not found", self.code())
198            }
199            Self::SinkNotFound(name) => {
200                write!(f, "[{}] Sink '{name}' not found", self.code())
201            }
202            Self::QueryNotFound(name) => {
203                write!(f, "[{}] Query '{name}' not found", self.code())
204            }
205            Self::SourceAlreadyExists(name) => {
206                write!(f, "[{}] Source '{name}' already exists", self.code())
207            }
208            Self::SinkAlreadyExists(name) => {
209                write!(f, "[{}] Sink '{name}' already exists", self.code())
210            }
211            Self::StreamNotFound(name) => {
212                write!(f, "[{}] Stream '{name}' not found", self.code())
213            }
214            Self::StreamAlreadyExists(name) => {
215                write!(f, "[{}] Stream '{name}' already exists", self.code())
216            }
217            Self::TableNotFound(name) => {
218                write!(f, "[{}] Table '{name}' not found", self.code())
219            }
220            Self::TableAlreadyExists(name) => {
221                write!(f, "[{}] Table '{name}' already exists", self.code())
222            }
223            Self::InsertError(msg) => {
224                write!(f, "[{}] Insert error: {msg}", self.code())
225            }
226            Self::SchemaMismatch(msg) => {
227                write!(f, "[{}] Schema mismatch: {msg}", self.code())
228            }
229            Self::InvalidOperation(msg) => {
230                write!(f, "[{}] Invalid operation: {msg}", self.code())
231            }
232            Self::SqlParse(e) => write!(f, "SQL parse error: {e}"),
233            Self::Shutdown => {
234                write!(f, "[{}] Database is shut down", self.code())
235            }
236            Self::Checkpoint(msg) => {
237                write!(f, "[{}] Checkpoint error: {msg}", self.code())
238            }
239            Self::UnresolvedConfigVar(msg) => {
240                write!(f, "[{}] Unresolved config variable: {msg}", self.code())
241            }
242            Self::Connector(msg) => {
243                write!(f, "[{}] Connector error: {msg}", self.code())
244            }
245            Self::Pipeline(msg) => {
246                write!(f, "[{}] Pipeline error: {msg}", self.code())
247            }
248            Self::QueryPipeline {
249                context,
250                translated,
251            } => write!(f, "Stream '{context}': {translated}"),
252            Self::MaterializedView(msg) => {
253                write!(f, "[{}] Materialized view error: {msg}", self.code())
254            }
255            Self::Storage(msg) => {
256                write!(f, "[{}] Storage error: {msg}", self.code())
257            }
258            Self::Config(msg) => {
259                write!(f, "[{}] Config error: {msg}", self.code())
260            }
261            Self::Unsupported(msg) => {
262                write!(f, "[{}] Unsupported: {msg}", self.code())
263            }
264        }
265    }
266}