Skip to main content

laminar_sql/error/
mod.rs

1//! Error translation layer for DataFusion errors.
2//!
3//! Translates internal DataFusion/Arrow error messages into user-friendly
4//! LaminarDB errors with structured error codes (`LDB-NNNN`) and hints.
5//!
6//! # Error Code Ranges
7//!
8//! | Range | Category |
9//! |-------|----------|
10//! | `LDB-1001`..`LDB-1099` | SQL syntax errors |
11//! | `LDB-1100`..`LDB-1199` | Schema / column errors |
12//! | `LDB-1200`..`LDB-1299` | Type errors |
13//! | `LDB-2000`..`LDB-2099` | Window / watermark errors |
14//! | `LDB-3000`..`LDB-3099` | Join errors |
15//! | `LDB-9000`..`LDB-9099` | Internal errors |
16
17pub mod suggest;
18
19use suggest::closest_match;
20
/// Structured error code constants.
///
/// Every constant is an `LDB-NNNN` string. The numeric ranges of each
/// category are noted in the section comments below.
pub mod codes {
    // --- SQL syntax / planning errors (1001-1099) ---

    /// Unsupported SQL syntax.
    pub const UNSUPPORTED_SQL: &str = "LDB-1001";
    /// Query planning failed.
    pub const PLANNING_FAILED: &str = "LDB-1002";

    // --- Schema / column errors (1100-1199) ---

    /// Column not found.
    pub const COLUMN_NOT_FOUND: &str = "LDB-1100";
    /// Table or source not found.
    pub const TABLE_NOT_FOUND: &str = "LDB-1101";

    // --- Type errors (1200-1299) ---

    /// Type mismatch.
    pub const TYPE_MISMATCH: &str = "LDB-1200";

    // --- Window / watermark errors (2000-2099) ---

    /// A watermark is required for this operation.
    pub const WATERMARK_REQUIRED: &str = "LDB-2001";
    /// The window specification is invalid.
    pub const WINDOW_INVALID: &str = "LDB-2002";
    /// The window size is not a positive value.
    pub const WINDOW_SIZE_INVALID: &str = "LDB-2003";
    /// Late data was rejected by the window policy.
    pub const LATE_DATA_REJECTED: &str = "LDB-2004";

    // --- Join errors (3000-3099) ---

    /// A join key column is missing or invalid.
    pub const JOIN_KEY_MISSING: &str = "LDB-3001";
    /// A stream-stream join lacks the required time bound.
    pub const JOIN_TIME_BOUND_MISSING: &str = "LDB-3002";
    /// The right-side table of a temporal join has no primary key.
    pub const TEMPORAL_JOIN_NO_PK: &str = "LDB-3003";
    /// The join type is not supported for streaming queries.
    pub const JOIN_TYPE_UNSUPPORTED: &str = "LDB-3004";

    // --- Internal errors (9000-9099) ---

    /// Internal query error (unrecognized pattern).
    pub const INTERNAL: &str = "LDB-9000";
    /// Query execution failed.
    pub const EXECUTION_FAILED: &str = "LDB-9001";
}
59
/// A translated error with structured code, message, and optional hint.
///
/// Produced by [`translate_datafusion_error`] and
/// [`translate_datafusion_error_with_context`]. Values compare by field, so
/// callers can match a translation against an expected error directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranslatedError {
    /// Structured error code (e.g. `"LDB-1100"`), one of the [`codes`] constants.
    pub code: &'static str,
    /// User-friendly error message.
    pub message: String,
    /// Optional hint for fixing the error.
    pub hint: Option<String>,
}
70
71impl std::fmt::Display for TranslatedError {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "[{}] {}", self.code, self.message)?;
74        if let Some(hint) = &self.hint {
75            write!(f, " (hint: {hint})")?;
76        }
77        Ok(())
78    }
79}
80
81/// Suggests a column name correction based on edit distance.
82///
83/// Returns a `"Did you mean '...'?"` string if a close match is found
84/// within 2 edits.
85#[must_use]
86pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
87    closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
88}
89
90// ---------------------------------------------------------------------------
91// Internal helpers
92// ---------------------------------------------------------------------------
93
/// Strips known DataFusion/Arrow wrapper prefixes from the front of an error message.
///
/// Prefixes are removed repeatedly and in any order. Nested wrappers such as
/// `"External error: Schema error: ..."` or
/// `"Arrow error: DataFusion error: ..."` are therefore fully unwrapped, and
/// later pattern checks (e.g. `starts_with("Plan(\"")`) see the real message.
/// The result is a sub-slice of `msg`; nothing is allocated.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];
    let mut s = msg;
    // Every successful strip shortens `s` (all prefixes are non-empty), so
    // the loop terminates.
    while let Some(rest) = PREFIXES.iter().find_map(|prefix| s.strip_prefix(prefix)) {
        s = rest;
    }
    s
}
110
111/// Extracts a quoted name from a `"No field named ..."` message.
112fn extract_missing_column(msg: &str) -> Option<&str> {
113    let needle = "No field named ";
114    let idx = msg.find(needle)?;
115    let after = &msg[idx + needle.len()..];
116    extract_quoted(after)
117}
118
/// Extracts the table name from a `"table '<name>' not found"` message.
///
/// The keyword `table` and the `not found` suffix are matched
/// case-insensitively; the returned name keeps the original casing.
///
/// A quoted table that is *not* followed by `not found` is skipped, and
/// later occurrences are tried. Messages such as `"Table 'x' already exists"`
/// therefore return `None` instead of being misreported as a missing table.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const NEEDLE: &str = "table '";
    // `to_ascii_lowercase` only rewrites ASCII bytes, so every byte offset in
    // `lower` is also a valid offset into `msg`.
    let lower = msg.to_ascii_lowercase();
    let mut from = 0;
    while let Some(pos) = lower[from..].find(NEEDLE) {
        let start = from + pos + NEEDLE.len();
        // An unterminated quote means there is no name to extract.
        let end = start + msg[start..].find('\'')?;
        if lower[end + 1..].trim_start().starts_with("not found") {
            return Some(&msg[start..end]);
        }
        from = end + 1;
    }
    None
}
128
/// Extracts a name from the start of `s`.
///
/// - `'name'...` or `"name"...`: returns the text between the matching
///   quotes, or `None` if the closing quote is missing.
/// - Otherwise: returns a bare word running up to the first whitespace, `,`
///   or `)`. Trailing `.` characters are dropped, so DataFusion messages such
///   as `No field named foo. Valid fields are ...` yield `foo`. Qualified
///   names like `t.foo` keep their inner dot.
///
/// Returns `None` when the bare word is empty.
fn extract_quoted(s: &str) -> Option<&str> {
    for quote in ['\'', '"'] {
        if let Some(rest) = s.strip_prefix(quote) {
            return rest.find(quote).map(|end| &rest[..end]);
        }
    }

    let end = s
        .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
        .unwrap_or(s.len());
    // Drop sentence-ending periods that are not part of the identifier.
    let word = s[..end].trim_end_matches('.');
    if word.is_empty() {
        None
    } else {
        Some(word)
    }
}
149
150/// Translates a DataFusion error message into a user-friendly [`TranslatedError`].
151///
152/// Pattern-matches known DataFusion error formats and rewrites them with
153/// structured error codes and helpful messages. Unrecognized patterns fall
154/// back to `LDB-9000` with the message sanitized (internal prefixes stripped).
155///
156/// When `available_columns` is provided, column-not-found errors include a
157/// "Did you mean '...'?" hint based on edit distance.
158#[must_use]
159pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
160    translate_datafusion_error_with_context(msg, None)
161}
162
163/// Like [`translate_datafusion_error`] but accepts an optional list of
164/// available column names for typo suggestions.
165#[must_use]
166pub fn translate_datafusion_error_with_context(
167    msg: &str,
168    available_columns: Option<&[&str]>,
169) -> TranslatedError {
170    let clean = sanitize(msg);
171
172    // Column not found
173    if let Some(col) = extract_missing_column(clean) {
174        let hint = available_columns.and_then(|cols| suggest_column(col, cols));
175        return TranslatedError {
176            code: codes::COLUMN_NOT_FOUND,
177            message: format!("Column '{col}' not found in query"),
178            hint,
179        };
180    }
181
182    // Table not found
183    if let Some(table) = extract_missing_table(clean) {
184        return TranslatedError {
185            code: codes::TABLE_NOT_FOUND,
186            message: format!("Table or source '{table}' not found"),
187            hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
188        };
189    }
190
191    // Type mismatch
192    if clean.contains("mismatch")
193        || clean.contains("must match")
194        || clean.contains("cannot be cast")
195    {
196        return TranslatedError {
197            code: codes::TYPE_MISMATCH,
198            message: format!("Type mismatch: {clean}"),
199            hint: Some("Check column types with DESCRIBE <table>".to_string()),
200        };
201    }
202
203    // Window / watermark errors
204    if let Some(translated) = check_window_errors(clean) {
205        return translated;
206    }
207
208    // Join errors
209    if let Some(translated) = check_join_errors(clean) {
210        return translated;
211    }
212
213    // Unsupported / not implemented
214    if clean.contains("Unsupported")
215        || clean.contains("NotImplemented")
216        || clean.contains("This feature is not implemented")
217    {
218        return TranslatedError {
219            code: codes::UNSUPPORTED_SQL,
220            message: format!("Unsupported SQL syntax: {clean}"),
221            hint: None,
222        };
223    }
224
225    // Planning error — Plan("...")
226    if clean.starts_with("Plan(\"") {
227        let detail = clean
228            .strip_prefix("Plan(\"")
229            .and_then(|s| s.strip_suffix("\")"))
230            .unwrap_or(clean);
231        return TranslatedError {
232            code: codes::PLANNING_FAILED,
233            message: format!("Query planning failed: {detail}"),
234            hint: None,
235        };
236    }
237
238    // Planning error — "Error during planning"
239    if clean.contains("Error during planning") {
240        return TranslatedError {
241            code: codes::PLANNING_FAILED,
242            message: format!("Query planning failed: {clean}"),
243            hint: None,
244        };
245    }
246
247    // Execution error
248    if clean.contains("Execution error") {
249        return TranslatedError {
250            code: codes::EXECUTION_FAILED,
251            message: format!("Query execution failed: {clean}"),
252            hint: None,
253        };
254    }
255
256    // Fallback — unknown pattern
257    TranslatedError {
258        code: codes::INTERNAL,
259        message: format!("Internal query error: {clean}"),
260        hint: Some("If this persists, file a bug report".to_string()),
261    }
262}
263
264/// Check for window/watermark-related error patterns.
265fn check_window_errors(clean: &str) -> Option<TranslatedError> {
266    let lower = clean.to_ascii_lowercase();
267
268    // "Window error:" prefix from parser — classify as WINDOW_INVALID
269    if lower.starts_with("window error:") {
270        return Some(TranslatedError {
271            code: codes::WINDOW_INVALID,
272            message: format!("Invalid window specification: {clean}"),
273            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
274        });
275    }
276
277    if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
278        return Some(TranslatedError {
279            code: codes::WATERMARK_REQUIRED,
280            message: format!("Watermark required: {clean}"),
281            hint: Some(
282                "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
283                 to the CREATE SOURCE statement"
284                    .to_string(),
285            ),
286        });
287    }
288
289    if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
290        return Some(TranslatedError {
291            code: codes::WINDOW_INVALID,
292            message: format!("Invalid window specification: {clean}"),
293            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
294        });
295    }
296
297    if lower.contains("window")
298        && lower.contains("size")
299        && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
300    {
301        return Some(TranslatedError {
302            code: codes::WINDOW_SIZE_INVALID,
303            message: format!("Invalid window size: {clean}"),
304            hint: Some("Window size must be a positive interval".to_string()),
305        });
306    }
307
308    // Late data rejected/dropped
309    if lower.contains("late")
310        && (lower.contains("data") || lower.contains("event"))
311        && (lower.contains("rejected") || lower.contains("dropped"))
312    {
313        return Some(TranslatedError {
314            code: codes::LATE_DATA_REJECTED,
315            message: format!("Late data rejected: {clean}"),
316            hint: Some(
317                "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
318                 or route late data to a side output"
319                    .to_string(),
320            ),
321        });
322    }
323
324    None
325}
326
327/// Check for join-related error patterns.
328fn check_join_errors(clean: &str) -> Option<TranslatedError> {
329    let lower = clean.to_ascii_lowercase();
330
331    // "Streaming SQL error:" prefix — classify sub-patterns
332    if lower.starts_with("streaming sql error:") {
333        if lower.contains("using clause requires") {
334            return Some(TranslatedError {
335                code: codes::JOIN_KEY_MISSING,
336                message: format!("Join key error: {clean}"),
337                hint: Some(
338                    "Ensure the USING clause references columns that exist \
339                     in both sides of the join"
340                        .to_string(),
341                ),
342            });
343        }
344        if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
345            return Some(TranslatedError {
346                code: codes::JOIN_TIME_BOUND_MISSING,
347                message: format!("Join time bound required: {clean}"),
348                hint: Some(
349                    "Stream-stream joins require a time bound in the ON clause, e.g.: \
350                     AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
351                        .to_string(),
352                ),
353            });
354        }
355    }
356
357    if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
358        return Some(TranslatedError {
359            code: codes::JOIN_KEY_MISSING,
360            message: format!("Join key error: {clean}"),
361            hint: Some(
362                "Ensure the ON clause references columns that exist \
363                 in both sides of the join"
364                    .to_string(),
365            ),
366        });
367    }
368
369    if lower.contains("join")
370        && (lower.contains("time bound") || lower.contains("interval"))
371        && lower.contains("required")
372    {
373        return Some(TranslatedError {
374            code: codes::JOIN_TIME_BOUND_MISSING,
375            message: format!("Join time bound required: {clean}"),
376            hint: Some(
377                "Stream-stream joins require a time bound in the ON clause, e.g.: \
378                 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
379                    .to_string(),
380            ),
381        });
382    }
383
384    if lower.contains("temporal") && lower.contains("primary key") {
385        return Some(TranslatedError {
386            code: codes::TEMPORAL_JOIN_NO_PK,
387            message: format!("Temporal join error: {clean}"),
388            hint: Some(
389                "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
390            ),
391        });
392    }
393
394    // Unsupported join types for streaming
395    if (lower.contains("not supported for streaming")
396        || lower.contains("natural join not supported")
397        || lower.contains("cross join not supported")
398        || lower.contains("unsupported join"))
399        && lower.contains("join")
400    {
401        return Some(TranslatedError {
402            code: codes::JOIN_TYPE_UNSUPPORTED,
403            message: format!("Unsupported join type: {clean}"),
404            hint: Some(
405                "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
406                 with time bounds. CROSS and NATURAL joins are not supported."
407                    .to_string(),
408            ),
409        });
410    }
411
412    None
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Translates `input` without column context and asserts two things: the
    /// resulting code, and that every entry of `in_message` occurs in the
    /// message. Returns the translated error for further checks.
    fn assert_translates(input: &str, code: &str, in_message: &[&str]) -> TranslatedError {
        let t = translate_datafusion_error(input);
        assert_eq!(t.code, code, "wrong code for {input:?}");
        for needle in in_message {
            assert!(
                t.message.contains(needle),
                "message {:?} lacks {needle:?}",
                t.message
            );
        }
        t
    }

    /// Returns the hint text, failing the test if there is none.
    fn hint_text(t: &TranslatedError) -> &str {
        t.hint.as_deref().expect("expected a hint")
    }

    // -- column / table errors --

    #[test]
    fn column_not_found_is_translated() {
        let t = assert_translates(
            "No field named 'foo'",
            codes::COLUMN_NOT_FOUND,
            &["foo", "not found"],
        );
        assert!(!t.message.contains("DataFusion"));
        // No column list was supplied, so nothing can be suggested.
        assert!(t.hint.is_none());

        assert_translates("No field named \"bar\"", codes::COLUMN_NOT_FOUND, &["bar"]);
        assert_translates(
            "Schema error: No field named 'baz'",
            codes::COLUMN_NOT_FOUND,
            &["baz"],
        );
        // A Plan(...) wrapper still prefers the specific column code.
        assert_translates(
            "Plan(\"No field named 'x' in schema\")",
            codes::COLUMN_NOT_FOUND,
            &["'x'"],
        );
    }

    #[test]
    fn nested_prefixes_are_stripped() {
        let t = assert_translates(
            "DataFusion error: Arrow error: No field named 'x'",
            codes::COLUMN_NOT_FOUND,
            &["'x'"],
        );
        assert!(!t.message.contains("DataFusion"));
        assert!(!t.message.contains("Arrow error"));
    }

    #[test]
    fn table_not_found_is_translated() {
        let t = assert_translates("table 'orders' not found", codes::TABLE_NOT_FOUND, &["orders"]);
        assert!(hint_text(&t).contains("SHOW TABLES"));
    }

    // -- type / syntax / planning / execution --

    #[test]
    fn type_mismatch_is_translated() {
        let t = assert_translates("column types must match for UNION", codes::TYPE_MISMATCH, &[]);
        assert!(hint_text(&t).contains("DESCRIBE"));
        assert_translates("cannot be cast to Int64", codes::TYPE_MISMATCH, &[]);
    }

    #[test]
    fn unsupported_planning_and_execution_are_translated() {
        assert_translates(
            "This feature is not implemented: LATERAL JOIN",
            codes::UNSUPPORTED_SQL,
            &["LATERAL JOIN"],
        );
        assert_translates(
            "Plan(\"aggregate function not found\")",
            codes::PLANNING_FAILED,
            &["aggregate function not found"],
        );
        assert_translates(
            "Error during planning: ambiguous reference 'id'",
            codes::PLANNING_FAILED,
            &[],
        );
        assert_translates("Execution error: division by zero", codes::EXECUTION_FAILED, &[]);
    }

    #[test]
    fn unknown_message_falls_back_to_internal() {
        let t = assert_translates(
            "some totally unknown error",
            codes::INTERNAL,
            &["Internal query error"],
        );
        assert!(t.hint.is_some());
    }

    // -- Display --

    #[test]
    fn display_includes_code_and_optional_hint() {
        let with_hint = translate_datafusion_error("table 'foo' not found").to_string();
        assert!(with_hint.starts_with("[LDB-1101]"));
        assert!(with_hint.contains("(hint:"));

        let without_hint = translate_datafusion_error("Execution error: oops").to_string();
        assert!(without_hint.starts_with("[LDB-9001]"));
        assert!(!without_hint.contains("hint"));
    }

    // -- suggestions --

    #[test]
    fn suggest_column_uses_edit_distance() {
        assert_eq!(
            suggest_column("user_ie", &["user_id", "email"]),
            Some("Did you mean 'user_id'?".to_string())
        );
        assert_eq!(suggest_column("xyz", &["user_id", "email"]), None);
    }

    #[test]
    fn context_columns_drive_hints() {
        let t = translate_datafusion_error_with_context(
            "No field named 'user_ie'",
            Some(&["user_id", "email", "price"][..]),
        );
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("user_ie"));
        assert!(
            hint_text(&t).contains("user_id"),
            "hint should suggest 'user_id': {:?}",
            t.hint
        );

        let t = translate_datafusion_error_with_context(
            "No field named 'zzzzz'",
            Some(&["user_id", "email"][..]),
        );
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_none());
    }

    // -- window / watermark --

    #[test]
    fn window_errors_are_translated() {
        let t = assert_translates(
            "Watermark required for EMIT ON WINDOW CLOSE",
            codes::WATERMARK_REQUIRED,
            &[],
        );
        assert!(hint_text(&t).contains("WATERMARK FOR"));

        let t = assert_translates(
            "Window type not supported for this operation",
            codes::WINDOW_INVALID,
            &[],
        );
        assert!(hint_text(&t).contains("TUMBLE"));

        let t = assert_translates(
            "Window error: CUMULATE requires step <= size",
            codes::WINDOW_INVALID,
            &[],
        );
        assert!(hint_text(&t).contains("CUMULATE"));
    }

    #[test]
    fn late_data_is_translated() {
        let t = assert_translates(
            "late data rejected by window policy",
            codes::LATE_DATA_REJECTED,
            &[],
        );
        assert!(hint_text(&t).contains("lateness"));
        assert_translates(
            "late event dropped after window close",
            codes::LATE_DATA_REJECTED,
            &[],
        );
    }

    // -- joins --

    #[test]
    fn join_errors_are_translated() {
        let t = assert_translates(
            "Join key 'user_id' not found in right table",
            codes::JOIN_KEY_MISSING,
            &[],
        );
        assert!(hint_text(&t).contains("ON clause"));

        let t = assert_translates(
            "Temporal join requires a primary key on right table",
            codes::TEMPORAL_JOIN_NO_PK,
            &[],
        );
        assert!(hint_text(&t).contains("PRIMARY KEY"));

        let t = assert_translates(
            "cross join not supported for streaming queries",
            codes::JOIN_TYPE_UNSUPPORTED,
            &[],
        );
        assert!(hint_text(&t).contains("CROSS"));
        assert_translates(
            "natural join not supported in streaming context",
            codes::JOIN_TYPE_UNSUPPORTED,
            &[],
        );
    }

    #[test]
    fn streaming_sql_prefix_is_classified() {
        assert_translates(
            "Streaming SQL error: using clause requires matching columns",
            codes::JOIN_KEY_MISSING,
            &[],
        );
        assert_translates(
            "Streaming SQL error: cannot extract time bound from ON clause",
            codes::JOIN_TIME_BOUND_MISSING,
            &[],
        );
    }
}