Skip to main content

laminar_sql/error/
mod.rs

1//! Error translation layer for DataFusion errors.
2//!
3//! Translates internal DataFusion/Arrow error messages into user-friendly
4//! LaminarDB errors with structured error codes (`LDB-NNNN`) and hints.
5//!
6//! # Error Code Ranges
7//!
8//! | Range | Category |
9//! |-------|----------|
10//! | `LDB-1001`..`LDB-1099` | SQL syntax errors |
11//! | `LDB-1100`..`LDB-1199` | Schema / column errors |
12//! | `LDB-1200`..`LDB-1299` | Type errors |
13//! | `LDB-2000`..`LDB-2099` | Window / watermark errors |
14//! | `LDB-3000`..`LDB-3099` | Join errors |
15//! | `LDB-9000`..`LDB-9099` | Internal errors |
16
17pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
21/// Structured error code constants.
22///
23/// Codes in the `LDB-1xxx`..`LDB-3xxx` ranges are re-exported from the
24/// canonical registry in `laminar_core::error_codes`. The `LDB-9xxx` codes
25/// are SQL-layer specific and defined here.
26pub mod codes {
27    // Re-export LDB-1xxx (SQL parsing & validation) from laminar-core.
28    pub use laminar_core::error_codes::SQL_COLUMN_NOT_FOUND as COLUMN_NOT_FOUND;
29    pub use laminar_core::error_codes::SQL_PLANNING_FAILED as PLANNING_FAILED;
30    pub use laminar_core::error_codes::SQL_TABLE_NOT_FOUND as TABLE_NOT_FOUND;
31    pub use laminar_core::error_codes::SQL_TYPE_MISMATCH as TYPE_MISMATCH;
32    pub use laminar_core::error_codes::SQL_UNSUPPORTED as UNSUPPORTED_SQL;
33
34    // Re-export LDB-2xxx (window / watermark) from laminar-core.
35    pub use laminar_core::error_codes::LATE_DATA_REJECTED;
36    pub use laminar_core::error_codes::WATERMARK_REQUIRED;
37    pub use laminar_core::error_codes::WINDOW_INVALID;
38    pub use laminar_core::error_codes::WINDOW_SIZE_INVALID;
39
40    // Re-export LDB-3xxx (join) from laminar-core.
41    pub use laminar_core::error_codes::JOIN_KEY_MISSING;
42    pub use laminar_core::error_codes::JOIN_TIME_BOUND_MISSING;
43    pub use laminar_core::error_codes::JOIN_TYPE_UNSUPPORTED;
44    pub use laminar_core::error_codes::TEMPORAL_JOIN_NO_PK;
45
46    /// Internal query error (unrecognized pattern).
47    pub const INTERNAL: &str = "LDB-9000";
48    /// Query execution failed.
49    pub const EXECUTION_FAILED: &str = "LDB-9001";
50}
51
52/// A translated error with structured code, message, and optional hint.
53#[derive(Debug, Clone)]
54pub struct TranslatedError {
55    /// Structured error code (e.g. `"LDB-1100"`).
56    pub code: &'static str,
57    /// User-friendly error message.
58    pub message: String,
59    /// Optional hint for fixing the error.
60    pub hint: Option<String>,
61}
62
63impl std::fmt::Display for TranslatedError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "[{}] {}", self.code, self.message)?;
66        if let Some(hint) = &self.hint {
67            write!(f, " (hint: {hint})")?;
68        }
69        Ok(())
70    }
71}
72
73/// Suggests a column name correction based on edit distance.
74///
75/// Returns a `"Did you mean '...'?"` string if a close match is found
76/// within 2 edits.
77#[must_use]
78pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
79    closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
80}
81
82// ---------------------------------------------------------------------------
83// Internal helpers
84// ---------------------------------------------------------------------------
85
86/// Strips known DataFusion/Arrow prefixes from an error message.
87fn sanitize(msg: &str) -> &str {
88    const PREFIXES: &[&str] = &[
89        "DataFusion error: ",
90        "Arrow error: ",
91        "Schema error: ",
92        "External error: ",
93    ];
94    let mut s = msg;
95    for prefix in PREFIXES {
96        if let Some(rest) = s.strip_prefix(prefix) {
97            s = rest;
98        }
99    }
100    s
101}
102
103/// Extracts a quoted name from a `"No field named ..."` message.
104fn extract_missing_column(msg: &str) -> Option<&str> {
105    let needle = "No field named ";
106    let idx = msg.find(needle)?;
107    let after = &msg[idx + needle.len()..];
108    extract_quoted(after)
109}
110
111/// Extracts a table name from a `"table '...' not found"` message.
112fn extract_missing_table(msg: &str) -> Option<&str> {
113    // DataFusion uses lowercase "table" in its messages
114    let lower = msg.to_ascii_lowercase();
115    let needle = "table '";
116    let idx = lower.find(needle)?;
117    let after = &msg[idx + needle.len()..];
118    after.find('\'').map(|end| &after[..end])
119}
120
121/// Extracts content from single or double quotes at the start of a string.
122fn extract_quoted(s: &str) -> Option<&str> {
123    if let Some(rest) = s.strip_prefix('\'') {
124        rest.find('\'').map(|end| &rest[..end])
125    } else if let Some(rest) = s.strip_prefix('"') {
126        rest.find('"').map(|end| &rest[..end])
127    } else {
128        // Bare word — up to whitespace or punctuation
129        let end = s.find(|c: char| c.is_whitespace() || c == ',' || c == ')');
130        let word = match end {
131            Some(i) => &s[..i],
132            None => s,
133        };
134        if word.is_empty() {
135            None
136        } else {
137            Some(word)
138        }
139    }
140}
141
142/// Translates a DataFusion error message into a user-friendly [`TranslatedError`].
143///
144/// Pattern-matches known DataFusion error formats and rewrites them with
145/// structured error codes and helpful messages. Unrecognized patterns fall
146/// back to `LDB-9000` with the message sanitized (internal prefixes stripped).
147///
148/// When `available_columns` is provided, column-not-found errors include a
149/// "Did you mean '...'?" hint based on edit distance.
150#[must_use]
151pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
152    translate_datafusion_error_with_context(msg, None)
153}
154
155/// Like [`translate_datafusion_error`] but accepts an optional list of
156/// available column names for typo suggestions.
157#[must_use]
158pub fn translate_datafusion_error_with_context(
159    msg: &str,
160    available_columns: Option<&[&str]>,
161) -> TranslatedError {
162    let clean = sanitize(msg);
163
164    // Column not found
165    if let Some(col) = extract_missing_column(clean) {
166        let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
167            Ok(actual) if actual != col => Some(format!(
168                "Column is named '{actual}' (case differs). \
169                         Use \"{actual}\" or match the exact casing."
170            )),
171            Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
172                "Multiple columns match case-insensitively: {}. \
173                         Use double quotes for exact match.",
174                matches.join(", ")
175            )),
176            _ => suggest_column(col, cols),
177        });
178        return TranslatedError {
179            code: codes::COLUMN_NOT_FOUND,
180            message: format!("Column '{col}' not found in query"),
181            hint,
182        };
183    }
184
185    // Table not found
186    if let Some(table) = extract_missing_table(clean) {
187        return TranslatedError {
188            code: codes::TABLE_NOT_FOUND,
189            message: format!("Table or source '{table}' not found"),
190            hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
191        };
192    }
193
194    // Type mismatch
195    if clean.contains("mismatch")
196        || clean.contains("must match")
197        || clean.contains("cannot be cast")
198    {
199        return TranslatedError {
200            code: codes::TYPE_MISMATCH,
201            message: format!("Type mismatch: {clean}"),
202            hint: Some("Check column types with DESCRIBE <table>".to_string()),
203        };
204    }
205
206    // Window / watermark errors
207    if let Some(translated) = check_window_errors(clean) {
208        return translated;
209    }
210
211    // Join errors
212    if let Some(translated) = check_join_errors(clean) {
213        return translated;
214    }
215
216    // Unsupported / not implemented
217    if clean.contains("Unsupported")
218        || clean.contains("NotImplemented")
219        || clean.contains("This feature is not implemented")
220    {
221        return TranslatedError {
222            code: codes::UNSUPPORTED_SQL,
223            message: format!("Unsupported SQL syntax: {clean}"),
224            hint: None,
225        };
226    }
227
228    // Planning error — Plan("...")
229    if clean.starts_with("Plan(\"") {
230        let detail = clean
231            .strip_prefix("Plan(\"")
232            .and_then(|s| s.strip_suffix("\")"))
233            .unwrap_or(clean);
234        return TranslatedError {
235            code: codes::PLANNING_FAILED,
236            message: format!("Query planning failed: {detail}"),
237            hint: None,
238        };
239    }
240
241    // Planning error — "Error during planning"
242    if clean.contains("Error during planning") {
243        return TranslatedError {
244            code: codes::PLANNING_FAILED,
245            message: format!("Query planning failed: {clean}"),
246            hint: None,
247        };
248    }
249
250    // Execution error
251    if clean.contains("Execution error") {
252        return TranslatedError {
253            code: codes::EXECUTION_FAILED,
254            message: format!("Query execution failed: {clean}"),
255            hint: None,
256        };
257    }
258
259    // Fallback — unknown pattern
260    TranslatedError {
261        code: codes::INTERNAL,
262        message: format!("Internal query error: {clean}"),
263        hint: Some("If this persists, file a bug report".to_string()),
264    }
265}
266
267/// Check for window/watermark-related error patterns.
268fn check_window_errors(clean: &str) -> Option<TranslatedError> {
269    let lower = clean.to_ascii_lowercase();
270
271    // "Window error:" prefix from parser — classify as WINDOW_INVALID
272    if lower.starts_with("window error:") {
273        return Some(TranslatedError {
274            code: codes::WINDOW_INVALID,
275            message: format!("Invalid window specification: {clean}"),
276            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
277        });
278    }
279
280    if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
281        return Some(TranslatedError {
282            code: codes::WATERMARK_REQUIRED,
283            message: format!("Watermark required: {clean}"),
284            hint: Some(
285                "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
286                 to the CREATE SOURCE statement"
287                    .to_string(),
288            ),
289        });
290    }
291
292    if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
293        return Some(TranslatedError {
294            code: codes::WINDOW_INVALID,
295            message: format!("Invalid window specification: {clean}"),
296            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
297        });
298    }
299
300    if lower.contains("window")
301        && lower.contains("size")
302        && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
303    {
304        return Some(TranslatedError {
305            code: codes::WINDOW_SIZE_INVALID,
306            message: format!("Invalid window size: {clean}"),
307            hint: Some("Window size must be a positive interval".to_string()),
308        });
309    }
310
311    // Late data rejected/dropped
312    if lower.contains("late")
313        && (lower.contains("data") || lower.contains("event"))
314        && (lower.contains("rejected") || lower.contains("dropped"))
315    {
316        return Some(TranslatedError {
317            code: codes::LATE_DATA_REJECTED,
318            message: format!("Late data rejected: {clean}"),
319            hint: Some(
320                "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
321                 or route late data to a side output"
322                    .to_string(),
323            ),
324        });
325    }
326
327    None
328}
329
330/// Check for join-related error patterns.
331fn check_join_errors(clean: &str) -> Option<TranslatedError> {
332    let lower = clean.to_ascii_lowercase();
333
334    // "Streaming SQL error:" prefix — classify sub-patterns
335    if lower.starts_with("streaming sql error:") {
336        if lower.contains("using clause requires") {
337            return Some(TranslatedError {
338                code: codes::JOIN_KEY_MISSING,
339                message: format!("Join key error: {clean}"),
340                hint: Some(
341                    "Ensure the USING clause references columns that exist \
342                     in both sides of the join"
343                        .to_string(),
344                ),
345            });
346        }
347        if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
348            return Some(TranslatedError {
349                code: codes::JOIN_TIME_BOUND_MISSING,
350                message: format!("Join time bound required: {clean}"),
351                hint: Some(
352                    "Stream-stream joins require a time bound in the ON clause, e.g.: \
353                     AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
354                        .to_string(),
355                ),
356            });
357        }
358    }
359
360    if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
361        return Some(TranslatedError {
362            code: codes::JOIN_KEY_MISSING,
363            message: format!("Join key error: {clean}"),
364            hint: Some(
365                "Ensure the ON clause references columns that exist \
366                 in both sides of the join"
367                    .to_string(),
368            ),
369        });
370    }
371
372    if lower.contains("join")
373        && (lower.contains("time bound") || lower.contains("interval"))
374        && lower.contains("required")
375    {
376        return Some(TranslatedError {
377            code: codes::JOIN_TIME_BOUND_MISSING,
378            message: format!("Join time bound required: {clean}"),
379            hint: Some(
380                "Stream-stream joins require a time bound in the ON clause, e.g.: \
381                 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
382                    .to_string(),
383            ),
384        });
385    }
386
387    if lower.contains("temporal") && lower.contains("primary key") {
388        return Some(TranslatedError {
389            code: codes::TEMPORAL_JOIN_NO_PK,
390            message: format!("Temporal join error: {clean}"),
391            hint: Some(
392                "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
393            ),
394        });
395    }
396
397    // Unsupported join types for streaming
398    if (lower.contains("not supported for streaming")
399        || lower.contains("natural join not supported")
400        || lower.contains("cross join not supported")
401        || lower.contains("unsupported join"))
402        && lower.contains("join")
403    {
404        return Some(TranslatedError {
405            code: codes::JOIN_TYPE_UNSUPPORTED,
406            message: format!("Unsupported join type: {clean}"),
407            hint: Some(
408                "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
409                 with time bounds. CROSS and NATURAL joins are not supported."
410                    .to_string(),
411            ),
412        });
413    }
414
415    None
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421
422    // -- translate_datafusion_error tests --
423
424    #[test]
425    fn test_column_not_found_single_quotes() {
426        let t = translate_datafusion_error("No field named 'foo'");
427        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
428        assert!(t.message.contains("foo"));
429        assert!(t.message.contains("not found"));
430        assert!(!t.message.contains("DataFusion"));
431    }
432
433    #[test]
434    fn test_column_not_found_double_quotes() {
435        let t = translate_datafusion_error("No field named \"bar\"");
436        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
437        assert!(t.message.contains("bar"));
438    }
439
440    #[test]
441    fn test_column_not_found_with_prefix() {
442        let t = translate_datafusion_error("Schema error: No field named 'baz'");
443        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
444        assert!(t.message.contains("baz"));
445    }
446
447    #[test]
448    fn test_table_not_found() {
449        let t = translate_datafusion_error("table 'orders' not found");
450        assert_eq!(t.code, codes::TABLE_NOT_FOUND);
451        assert!(t.message.contains("orders"));
452        assert!(t.hint.is_some());
453        assert!(t.hint.unwrap().contains("SHOW TABLES"));
454    }
455
456    #[test]
457    fn test_type_mismatch() {
458        let t = translate_datafusion_error("column types must match for UNION");
459        assert_eq!(t.code, codes::TYPE_MISMATCH);
460        assert!(t.hint.is_some());
461        assert!(t.hint.unwrap().contains("DESCRIBE"));
462    }
463
464    #[test]
465    fn test_type_cannot_cast() {
466        let t = translate_datafusion_error("cannot be cast to Int64");
467        assert_eq!(t.code, codes::TYPE_MISMATCH);
468    }
469
470    #[test]
471    fn test_unsupported_sql() {
472        let t = translate_datafusion_error("This feature is not implemented: LATERAL JOIN");
473        assert_eq!(t.code, codes::UNSUPPORTED_SQL);
474        assert!(t.message.contains("LATERAL JOIN"));
475    }
476
477    #[test]
478    fn test_plan_error_with_column_extracts_column() {
479        // When a Plan error wraps a "No field named" message, the more
480        // specific column-not-found code is preferred over generic planning.
481        let t = translate_datafusion_error("Plan(\"No field named 'x' in schema\")");
482        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
483        assert!(t.message.contains("'x'"));
484    }
485
486    #[test]
487    fn test_plan_error_generic() {
488        let t = translate_datafusion_error("Plan(\"aggregate function not found\")");
489        assert_eq!(t.code, codes::PLANNING_FAILED);
490        assert!(t.message.contains("aggregate function not found"));
491    }
492
493    #[test]
494    fn test_error_during_planning() {
495        let t = translate_datafusion_error("Error during planning: ambiguous reference 'id'");
496        assert_eq!(t.code, codes::PLANNING_FAILED);
497    }
498
499    #[test]
500    fn test_execution_error() {
501        let t = translate_datafusion_error("Execution error: division by zero");
502        assert_eq!(t.code, codes::EXECUTION_FAILED);
503    }
504
505    #[test]
506    fn test_unknown_fallback() {
507        let t = translate_datafusion_error("some totally unknown error");
508        assert_eq!(t.code, codes::INTERNAL);
509        assert!(t.message.contains("Internal query error"));
510        assert!(t.hint.is_some());
511    }
512
513    #[test]
514    fn test_prefix_stripping() {
515        let t = translate_datafusion_error("DataFusion error: Arrow error: No field named 'x'");
516        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
517        assert!(t.message.contains("'x'"));
518        assert!(!t.message.contains("DataFusion"));
519        assert!(!t.message.contains("Arrow error"));
520    }
521
522    #[test]
523    fn test_display_with_hint() {
524        let t = translate_datafusion_error("table 'foo' not found");
525        let display = t.to_string();
526        assert!(display.starts_with("[LDB-1101]"));
527        assert!(display.contains("(hint:"));
528    }
529
530    #[test]
531    fn test_display_without_hint() {
532        let t = translate_datafusion_error("Execution error: oops");
533        let display = t.to_string();
534        assert!(display.starts_with("[LDB-9001]"));
535        assert!(!display.contains("hint"));
536    }
537
538    // -- suggest_column tests --
539
540    #[test]
541    fn test_suggest_column_found() {
542        let result = suggest_column("user_ie", &["user_id", "email"]);
543        assert_eq!(result, Some("Did you mean 'user_id'?".to_string()));
544    }
545
546    #[test]
547    fn test_suggest_column_none() {
548        let result = suggest_column("xyz", &["user_id", "email"]);
549        assert_eq!(result, None);
550    }
551
552    // -- translate_datafusion_error_with_context tests --
553
554    #[test]
555    fn test_column_not_found_with_suggestion() {
556        let cols = &["user_id", "email", "price"];
557        let t = translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols));
558        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
559        assert!(t.message.contains("user_ie"));
560        assert!(t.hint.is_some());
561        assert!(
562            t.hint.as_ref().unwrap().contains("user_id"),
563            "hint should suggest 'user_id': {:?}",
564            t.hint
565        );
566    }
567
568    #[test]
569    fn test_column_not_found_no_close_match() {
570        let cols = &["user_id", "email"];
571        let t = translate_datafusion_error_with_context("No field named 'zzzzz'", Some(cols));
572        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
573        assert!(t.hint.is_none());
574    }
575
576    #[test]
577    fn test_column_not_found_without_context() {
578        let t = translate_datafusion_error("No field named 'foo'");
579        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
580        assert!(t.hint.is_none()); // no columns provided
581    }
582
583    #[test]
584    fn test_column_not_found_case_insensitive_hint() {
585        let cols = &["tradeId", "symbol", "lastPrice"];
586        let t = translate_datafusion_error_with_context("No field named 'tradeid'", Some(cols));
587        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
588        assert!(t.hint.is_some());
589        let hint = t.hint.unwrap();
590        assert!(
591            hint.contains("tradeId"),
592            "hint should mention actual name: {hint}"
593        );
594        assert!(hint.contains("case"), "hint should mention case: {hint}");
595    }
596
597    #[test]
598    fn test_column_not_found_ambiguous_case_hint() {
599        let cols = &["price", "Price", "PRICE"];
600        let t = translate_datafusion_error_with_context("No field named 'pRiCe'", Some(cols));
601        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
602        assert!(t.hint.is_some());
603        let hint = t.hint.unwrap();
604        assert!(
605            hint.contains("Multiple"),
606            "hint should mention ambiguity: {hint}"
607        );
608    }
609
610    // -- window error code tests --
611
612    #[test]
613    fn test_watermark_required_error() {
614        let t = translate_datafusion_error("Watermark required for EMIT ON WINDOW CLOSE");
615        assert_eq!(t.code, codes::WATERMARK_REQUIRED);
616        assert!(t.hint.is_some());
617        assert!(t.hint.unwrap().contains("WATERMARK FOR"));
618    }
619
620    #[test]
621    fn test_window_invalid_error() {
622        let t = translate_datafusion_error("Window type not supported for this operation");
623        assert_eq!(t.code, codes::WINDOW_INVALID);
624        assert!(t.hint.unwrap().contains("TUMBLE"));
625    }
626
627    // -- join error code tests --
628
629    #[test]
630    fn test_join_key_not_found_error() {
631        let t = translate_datafusion_error("Join key 'user_id' not found in right table");
632        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
633        assert!(t.hint.unwrap().contains("ON clause"));
634    }
635
636    #[test]
637    fn test_temporal_join_pk_error() {
638        let t = translate_datafusion_error("Temporal join requires a primary key on right table");
639        assert_eq!(t.code, codes::TEMPORAL_JOIN_NO_PK);
640        assert!(t.hint.unwrap().contains("PRIMARY KEY"));
641    }
642
643    // -- LDB-2004 LATE_DATA_REJECTED tests --
644
645    #[test]
646    fn test_late_data_rejected() {
647        let t = translate_datafusion_error("late data rejected by window policy");
648        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
649        assert!(t.hint.unwrap().contains("lateness"));
650    }
651
652    #[test]
653    fn test_late_event_dropped() {
654        let t = translate_datafusion_error("late event dropped after window close");
655        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
656    }
657
658    // -- "Window error:" prefix test --
659
660    #[test]
661    fn test_window_error_prefix() {
662        let t = translate_datafusion_error("Window error: CUMULATE requires step <= size");
663        assert_eq!(t.code, codes::WINDOW_INVALID);
664        assert!(t.hint.unwrap().contains("CUMULATE"));
665    }
666
667    // -- LDB-3004 JOIN_TYPE_UNSUPPORTED tests --
668
669    #[test]
670    fn test_join_type_unsupported_cross() {
671        let t = translate_datafusion_error("cross join not supported for streaming queries");
672        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
673        assert!(t.hint.unwrap().contains("CROSS"));
674    }
675
676    #[test]
677    fn test_join_type_unsupported_natural() {
678        let t = translate_datafusion_error("natural join not supported in streaming context");
679        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
680    }
681
682    // -- "Streaming SQL error:" prefix tests --
683
684    #[test]
685    fn test_streaming_sql_error_using_clause() {
686        let t = translate_datafusion_error(
687            "Streaming SQL error: using clause requires matching columns",
688        );
689        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
690    }
691
692    #[test]
693    fn test_streaming_sql_error_time_bound() {
694        let t = translate_datafusion_error(
695            "Streaming SQL error: cannot extract time bound from ON clause",
696        );
697        assert_eq!(t.code, codes::JOIN_TIME_BOUND_MISSING);
698    }
699}