1use laminar_core::error_codes;
4
5#[derive(Debug, thiserror::Error)]
7pub enum DbError {
8 Sql(#[from] laminar_sql::Error),
10
11 Engine(#[from] laminar_core::Error),
13
14 Streaming(#[from] laminar_core::streaming::StreamingError),
16
17 DataFusion(#[from] datafusion_common::DataFusionError),
19
20 SourceNotFound(String),
22
23 SinkNotFound(String),
25
26 QueryNotFound(String),
28
29 SourceAlreadyExists(String),
31
32 SinkAlreadyExists(String),
34
35 StreamNotFound(String),
37
38 StreamAlreadyExists(String),
40
41 TableNotFound(String),
43
44 TableAlreadyExists(String),
46
47 InsertError(String),
49
50 SchemaMismatch(String),
52
53 InvalidOperation(String),
55
56 SqlParse(#[from] laminar_sql::parser::ParseError),
58
59 Shutdown,
61
62 Checkpoint(String),
64
65 UnresolvedConfigVar(String),
67
68 Connector(String),
70
71 Pipeline(String),
73
74 QueryPipeline {
77 context: String,
79 translated: String,
82 },
83
84 MaterializedView(String),
86
87 Storage(String),
89
90 Config(String),
92
93 Unsupported(String),
95}
96
97impl DbError {
98 pub fn query_pipeline(
103 context: impl Into<String>,
104 df_error: &datafusion_common::DataFusionError,
105 ) -> Self {
106 let translated = laminar_sql::error::translate_datafusion_error(&df_error.to_string());
107 Self::QueryPipeline {
108 context: context.into(),
109 translated: translated.to_string(),
110 }
111 }
112
113 pub fn query_pipeline_with_columns(
116 context: impl Into<String>,
117 df_error: &datafusion_common::DataFusionError,
118 available_columns: &[&str],
119 ) -> Self {
120 let translated = laminar_sql::error::translate_datafusion_error_with_context(
121 &df_error.to_string(),
122 Some(available_columns),
123 );
124 Self::QueryPipeline {
125 context: context.into(),
126 translated: translated.to_string(),
127 }
128 }
129
130 pub fn query_pipeline_arrow(
132 context: impl Into<String>,
133 arrow_error: &arrow::error::ArrowError,
134 ) -> Self {
135 let translated = laminar_sql::error::translate_datafusion_error(&arrow_error.to_string());
136 Self::QueryPipeline {
137 context: context.into(),
138 translated: translated.to_string(),
139 }
140 }
141
142 #[must_use]
147 pub fn code(&self) -> &'static str {
148 match self {
149 Self::Sql(_) | Self::SqlParse(_) => error_codes::SQL_UNSUPPORTED,
150 Self::Engine(_) | Self::Streaming(_) => error_codes::INTERNAL,
151 Self::DataFusion(_) => error_codes::QUERY_EXECUTION_FAILED,
152 Self::SourceNotFound(_) => error_codes::SOURCE_NOT_FOUND,
153 Self::SinkNotFound(_) => error_codes::SINK_NOT_FOUND,
154 Self::QueryNotFound(_) | Self::StreamNotFound(_) | Self::TableNotFound(_) => {
155 error_codes::SQL_TABLE_NOT_FOUND
156 }
157 Self::SourceAlreadyExists(_)
158 | Self::StreamAlreadyExists(_)
159 | Self::TableAlreadyExists(_) => error_codes::SOURCE_ALREADY_EXISTS,
160 Self::SinkAlreadyExists(_) => error_codes::SINK_ALREADY_EXISTS,
161 Self::InsertError(_) => error_codes::CONNECTOR_WRITE_ERROR,
162 Self::SchemaMismatch(_) => error_codes::SCHEMA_MISMATCH,
163 Self::InvalidOperation(_) | Self::Unsupported(_) => error_codes::INVALID_OPERATION,
164 Self::Shutdown => error_codes::SHUTDOWN,
165 Self::Checkpoint(_) => error_codes::CHECKPOINT_FAILED,
166 Self::UnresolvedConfigVar(_) => error_codes::UNRESOLVED_CONFIG_VAR,
167 Self::Connector(_) => error_codes::CONNECTOR_CONNECTION_FAILED,
168 Self::Pipeline(_) => error_codes::PIPELINE_ERROR,
169 Self::QueryPipeline { .. } => error_codes::QUERY_PIPELINE_ERROR,
170 Self::MaterializedView(_) => error_codes::MATERIALIZED_VIEW_ERROR,
171 Self::Storage(_) => error_codes::WAL_ERROR,
172 Self::Config(_) => error_codes::INVALID_CONFIG,
173 }
174 }
175
176 #[must_use]
178 pub fn is_transient(&self) -> bool {
179 matches!(
180 self,
181 Self::Streaming(_) | Self::Connector(_) | Self::Checkpoint(_)
182 )
183 }
184}
185
186impl std::fmt::Display for DbError {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 match self {
189 Self::Sql(e) => write!(f, "SQL error: {e}"),
190 Self::Engine(e) => write!(f, "Engine error: {e}"),
191 Self::Streaming(e) => write!(f, "Streaming error: {e}"),
192 Self::DataFusion(e) => {
193 let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
194 write!(f, "{translated}")
195 }
196 Self::SourceNotFound(name) => {
197 write!(f, "[{}] Source '{name}' not found", self.code())
198 }
199 Self::SinkNotFound(name) => {
200 write!(f, "[{}] Sink '{name}' not found", self.code())
201 }
202 Self::QueryNotFound(name) => {
203 write!(f, "[{}] Query '{name}' not found", self.code())
204 }
205 Self::SourceAlreadyExists(name) => {
206 write!(f, "[{}] Source '{name}' already exists", self.code())
207 }
208 Self::SinkAlreadyExists(name) => {
209 write!(f, "[{}] Sink '{name}' already exists", self.code())
210 }
211 Self::StreamNotFound(name) => {
212 write!(f, "[{}] Stream '{name}' not found", self.code())
213 }
214 Self::StreamAlreadyExists(name) => {
215 write!(f, "[{}] Stream '{name}' already exists", self.code())
216 }
217 Self::TableNotFound(name) => {
218 write!(f, "[{}] Table '{name}' not found", self.code())
219 }
220 Self::TableAlreadyExists(name) => {
221 write!(f, "[{}] Table '{name}' already exists", self.code())
222 }
223 Self::InsertError(msg) => {
224 write!(f, "[{}] Insert error: {msg}", self.code())
225 }
226 Self::SchemaMismatch(msg) => {
227 write!(f, "[{}] Schema mismatch: {msg}", self.code())
228 }
229 Self::InvalidOperation(msg) => {
230 write!(f, "[{}] Invalid operation: {msg}", self.code())
231 }
232 Self::SqlParse(e) => write!(f, "SQL parse error: {e}"),
233 Self::Shutdown => {
234 write!(f, "[{}] Database is shut down", self.code())
235 }
236 Self::Checkpoint(msg) => {
237 write!(f, "[{}] Checkpoint error: {msg}", self.code())
238 }
239 Self::UnresolvedConfigVar(msg) => {
240 write!(f, "[{}] Unresolved config variable: {msg}", self.code())
241 }
242 Self::Connector(msg) => {
243 write!(f, "[{}] Connector error: {msg}", self.code())
244 }
245 Self::Pipeline(msg) => {
246 write!(f, "[{}] Pipeline error: {msg}", self.code())
247 }
248 Self::QueryPipeline {
249 context,
250 translated,
251 } => write!(f, "Stream '{context}': {translated}"),
252 Self::MaterializedView(msg) => {
253 write!(f, "[{}] Materialized view error: {msg}", self.code())
254 }
255 Self::Storage(msg) => {
256 write!(f, "[{}] Storage error: {msg}", self.code())
257 }
258 Self::Config(msg) => {
259 write!(f, "[{}] Config error: {msg}", self.code())
260 }
261 Self::Unsupported(msg) => {
262 write!(f, "[{}] Unsupported: {msg}", self.code())
263 }
264 }
265 }
266}