Skip to main content

laminar_sql/error/
mod.rs

1//! Error translation layer for DataFusion errors.
2//!
3//! Translates internal DataFusion/Arrow error messages into user-friendly
4//! LaminarDB errors with structured error codes (`LDB-NNNN`) and hints.
5//!
6//! # Error Code Ranges
7//!
8//! | Range | Category |
9//! |-------|----------|
10//! | `LDB-1001`..`LDB-1099` | SQL syntax errors |
11//! | `LDB-1100`..`LDB-1199` | Schema / column errors |
12//! | `LDB-1200`..`LDB-1299` | Type errors |
13//! | `LDB-2000`..`LDB-2099` | Window / watermark errors |
14//! | `LDB-3000`..`LDB-3099` | Join errors |
15//! | `LDB-9000`..`LDB-9099` | Internal errors |
16
17pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
21/// Structured error code constants.
22pub mod codes {
23    /// Unsupported SQL syntax.
24    pub const UNSUPPORTED_SQL: &str = "LDB-1001";
25    /// Query planning failed.
26    pub const PLANNING_FAILED: &str = "LDB-1002";
27    /// Column not found.
28    pub const COLUMN_NOT_FOUND: &str = "LDB-1100";
29    /// Table or source not found.
30    pub const TABLE_NOT_FOUND: &str = "LDB-1101";
31    /// Type mismatch.
32    pub const TYPE_MISMATCH: &str = "LDB-1200";
33
34    // Window / watermark errors (2000-2099)
35    /// Watermark required for this operation.
36    pub const WATERMARK_REQUIRED: &str = "LDB-2001";
37    /// Invalid window specification.
38    pub const WINDOW_INVALID: &str = "LDB-2002";
39    /// Window size must be positive.
40    pub const WINDOW_SIZE_INVALID: &str = "LDB-2003";
41    /// Late data rejected by window policy.
42    pub const LATE_DATA_REJECTED: &str = "LDB-2004";
43
44    // Join errors (3000-3099)
45    /// Join key column not found or invalid.
46    pub const JOIN_KEY_MISSING: &str = "LDB-3001";
47    /// Time bound required for stream-stream join.
48    pub const JOIN_TIME_BOUND_MISSING: &str = "LDB-3002";
49    /// Temporal join requires a primary key on the right-side table.
50    pub const TEMPORAL_JOIN_NO_PK: &str = "LDB-3003";
51    /// Unsupported join type for streaming queries.
52    pub const JOIN_TYPE_UNSUPPORTED: &str = "LDB-3004";
53
54    /// Internal query error (unrecognized pattern).
55    pub const INTERNAL: &str = "LDB-9000";
56    /// Query execution failed.
57    pub const EXECUTION_FAILED: &str = "LDB-9001";
58}
59
60/// A translated error with structured code, message, and optional hint.
61#[derive(Debug, Clone)]
62pub struct TranslatedError {
63    /// Structured error code (e.g. `"LDB-1100"`).
64    pub code: &'static str,
65    /// User-friendly error message.
66    pub message: String,
67    /// Optional hint for fixing the error.
68    pub hint: Option<String>,
69}
70
71impl std::fmt::Display for TranslatedError {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "[{}] {}", self.code, self.message)?;
74        if let Some(hint) = &self.hint {
75            write!(f, " (hint: {hint})")?;
76        }
77        Ok(())
78    }
79}
80
81/// Suggests a column name correction based on edit distance.
82///
83/// Returns a `"Did you mean '...'?"` string if a close match is found
84/// within 2 edits.
85#[must_use]
86pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
87    closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
88}
89
90// ---------------------------------------------------------------------------
91// Internal helpers
92// ---------------------------------------------------------------------------
93
94/// Strips known DataFusion/Arrow prefixes from an error message.
95fn sanitize(msg: &str) -> &str {
96    const PREFIXES: &[&str] = &[
97        "DataFusion error: ",
98        "Arrow error: ",
99        "Schema error: ",
100        "External error: ",
101    ];
102    let mut s = msg;
103    for prefix in PREFIXES {
104        if let Some(rest) = s.strip_prefix(prefix) {
105            s = rest;
106        }
107    }
108    s
109}
110
111/// Extracts a quoted name from a `"No field named ..."` message.
112fn extract_missing_column(msg: &str) -> Option<&str> {
113    let needle = "No field named ";
114    let idx = msg.find(needle)?;
115    let after = &msg[idx + needle.len()..];
116    extract_quoted(after)
117}
118
119/// Extracts a table name from a `"table '...' not found"` message.
120fn extract_missing_table(msg: &str) -> Option<&str> {
121    // DataFusion uses lowercase "table" in its messages
122    let lower = msg.to_ascii_lowercase();
123    let needle = "table '";
124    let idx = lower.find(needle)?;
125    let after = &msg[idx + needle.len()..];
126    after.find('\'').map(|end| &after[..end])
127}
128
129/// Extracts content from single or double quotes at the start of a string.
130fn extract_quoted(s: &str) -> Option<&str> {
131    if let Some(rest) = s.strip_prefix('\'') {
132        rest.find('\'').map(|end| &rest[..end])
133    } else if let Some(rest) = s.strip_prefix('"') {
134        rest.find('"').map(|end| &rest[..end])
135    } else {
136        // Bare word — up to whitespace or punctuation
137        let end = s.find(|c: char| c.is_whitespace() || c == ',' || c == ')');
138        let word = match end {
139            Some(i) => &s[..i],
140            None => s,
141        };
142        if word.is_empty() {
143            None
144        } else {
145            Some(word)
146        }
147    }
148}
149
150/// Translates a DataFusion error message into a user-friendly [`TranslatedError`].
151///
152/// Pattern-matches known DataFusion error formats and rewrites them with
153/// structured error codes and helpful messages. Unrecognized patterns fall
154/// back to `LDB-9000` with the message sanitized (internal prefixes stripped).
155///
156/// When `available_columns` is provided, column-not-found errors include a
157/// "Did you mean '...'?" hint based on edit distance.
158#[must_use]
159pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
160    translate_datafusion_error_with_context(msg, None)
161}
162
163/// Like [`translate_datafusion_error`] but accepts an optional list of
164/// available column names for typo suggestions.
165#[must_use]
166pub fn translate_datafusion_error_with_context(
167    msg: &str,
168    available_columns: Option<&[&str]>,
169) -> TranslatedError {
170    let clean = sanitize(msg);
171
172    // Column not found
173    if let Some(col) = extract_missing_column(clean) {
174        let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
175            Ok(actual) if actual != col => Some(format!(
176                "Column is named '{actual}' (case differs). \
177                         Use \"{actual}\" or match the exact casing."
178            )),
179            Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
180                "Multiple columns match case-insensitively: {}. \
181                         Use double quotes for exact match.",
182                matches.join(", ")
183            )),
184            _ => suggest_column(col, cols),
185        });
186        return TranslatedError {
187            code: codes::COLUMN_NOT_FOUND,
188            message: format!("Column '{col}' not found in query"),
189            hint,
190        };
191    }
192
193    // Table not found
194    if let Some(table) = extract_missing_table(clean) {
195        return TranslatedError {
196            code: codes::TABLE_NOT_FOUND,
197            message: format!("Table or source '{table}' not found"),
198            hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
199        };
200    }
201
202    // Type mismatch
203    if clean.contains("mismatch")
204        || clean.contains("must match")
205        || clean.contains("cannot be cast")
206    {
207        return TranslatedError {
208            code: codes::TYPE_MISMATCH,
209            message: format!("Type mismatch: {clean}"),
210            hint: Some("Check column types with DESCRIBE <table>".to_string()),
211        };
212    }
213
214    // Window / watermark errors
215    if let Some(translated) = check_window_errors(clean) {
216        return translated;
217    }
218
219    // Join errors
220    if let Some(translated) = check_join_errors(clean) {
221        return translated;
222    }
223
224    // Unsupported / not implemented
225    if clean.contains("Unsupported")
226        || clean.contains("NotImplemented")
227        || clean.contains("This feature is not implemented")
228    {
229        return TranslatedError {
230            code: codes::UNSUPPORTED_SQL,
231            message: format!("Unsupported SQL syntax: {clean}"),
232            hint: None,
233        };
234    }
235
236    // Planning error — Plan("...")
237    if clean.starts_with("Plan(\"") {
238        let detail = clean
239            .strip_prefix("Plan(\"")
240            .and_then(|s| s.strip_suffix("\")"))
241            .unwrap_or(clean);
242        return TranslatedError {
243            code: codes::PLANNING_FAILED,
244            message: format!("Query planning failed: {detail}"),
245            hint: None,
246        };
247    }
248
249    // Planning error — "Error during planning"
250    if clean.contains("Error during planning") {
251        return TranslatedError {
252            code: codes::PLANNING_FAILED,
253            message: format!("Query planning failed: {clean}"),
254            hint: None,
255        };
256    }
257
258    // Execution error
259    if clean.contains("Execution error") {
260        return TranslatedError {
261            code: codes::EXECUTION_FAILED,
262            message: format!("Query execution failed: {clean}"),
263            hint: None,
264        };
265    }
266
267    // Fallback — unknown pattern
268    TranslatedError {
269        code: codes::INTERNAL,
270        message: format!("Internal query error: {clean}"),
271        hint: Some("If this persists, file a bug report".to_string()),
272    }
273}
274
275/// Check for window/watermark-related error patterns.
276fn check_window_errors(clean: &str) -> Option<TranslatedError> {
277    let lower = clean.to_ascii_lowercase();
278
279    // "Window error:" prefix from parser — classify as WINDOW_INVALID
280    if lower.starts_with("window error:") {
281        return Some(TranslatedError {
282            code: codes::WINDOW_INVALID,
283            message: format!("Invalid window specification: {clean}"),
284            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
285        });
286    }
287
288    if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
289        return Some(TranslatedError {
290            code: codes::WATERMARK_REQUIRED,
291            message: format!("Watermark required: {clean}"),
292            hint: Some(
293                "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
294                 to the CREATE SOURCE statement"
295                    .to_string(),
296            ),
297        });
298    }
299
300    if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
301        return Some(TranslatedError {
302            code: codes::WINDOW_INVALID,
303            message: format!("Invalid window specification: {clean}"),
304            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
305        });
306    }
307
308    if lower.contains("window")
309        && lower.contains("size")
310        && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
311    {
312        return Some(TranslatedError {
313            code: codes::WINDOW_SIZE_INVALID,
314            message: format!("Invalid window size: {clean}"),
315            hint: Some("Window size must be a positive interval".to_string()),
316        });
317    }
318
319    // Late data rejected/dropped
320    if lower.contains("late")
321        && (lower.contains("data") || lower.contains("event"))
322        && (lower.contains("rejected") || lower.contains("dropped"))
323    {
324        return Some(TranslatedError {
325            code: codes::LATE_DATA_REJECTED,
326            message: format!("Late data rejected: {clean}"),
327            hint: Some(
328                "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
329                 or route late data to a side output"
330                    .to_string(),
331            ),
332        });
333    }
334
335    None
336}
337
338/// Check for join-related error patterns.
339fn check_join_errors(clean: &str) -> Option<TranslatedError> {
340    let lower = clean.to_ascii_lowercase();
341
342    // "Streaming SQL error:" prefix — classify sub-patterns
343    if lower.starts_with("streaming sql error:") {
344        if lower.contains("using clause requires") {
345            return Some(TranslatedError {
346                code: codes::JOIN_KEY_MISSING,
347                message: format!("Join key error: {clean}"),
348                hint: Some(
349                    "Ensure the USING clause references columns that exist \
350                     in both sides of the join"
351                        .to_string(),
352                ),
353            });
354        }
355        if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
356            return Some(TranslatedError {
357                code: codes::JOIN_TIME_BOUND_MISSING,
358                message: format!("Join time bound required: {clean}"),
359                hint: Some(
360                    "Stream-stream joins require a time bound in the ON clause, e.g.: \
361                     AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
362                        .to_string(),
363                ),
364            });
365        }
366    }
367
368    if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
369        return Some(TranslatedError {
370            code: codes::JOIN_KEY_MISSING,
371            message: format!("Join key error: {clean}"),
372            hint: Some(
373                "Ensure the ON clause references columns that exist \
374                 in both sides of the join"
375                    .to_string(),
376            ),
377        });
378    }
379
380    if lower.contains("join")
381        && (lower.contains("time bound") || lower.contains("interval"))
382        && lower.contains("required")
383    {
384        return Some(TranslatedError {
385            code: codes::JOIN_TIME_BOUND_MISSING,
386            message: format!("Join time bound required: {clean}"),
387            hint: Some(
388                "Stream-stream joins require a time bound in the ON clause, e.g.: \
389                 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
390                    .to_string(),
391            ),
392        });
393    }
394
395    if lower.contains("temporal") && lower.contains("primary key") {
396        return Some(TranslatedError {
397            code: codes::TEMPORAL_JOIN_NO_PK,
398            message: format!("Temporal join error: {clean}"),
399            hint: Some(
400                "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
401            ),
402        });
403    }
404
405    // Unsupported join types for streaming
406    if (lower.contains("not supported for streaming")
407        || lower.contains("natural join not supported")
408        || lower.contains("cross join not supported")
409        || lower.contains("unsupported join"))
410        && lower.contains("join")
411    {
412        return Some(TranslatedError {
413            code: codes::JOIN_TYPE_UNSUPPORTED,
414            message: format!("Unsupported join type: {clean}"),
415            hint: Some(
416                "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
417                 with time bounds. CROSS and NATURAL joins are not supported."
418                    .to_string(),
419            ),
420        });
421    }
422
423    None
424}
425
426#[cfg(test)]
427mod tests {
428    use super::*;
429
430    // -- translate_datafusion_error tests --
431
432    #[test]
433    fn test_column_not_found_single_quotes() {
434        let t = translate_datafusion_error("No field named 'foo'");
435        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
436        assert!(t.message.contains("foo"));
437        assert!(t.message.contains("not found"));
438        assert!(!t.message.contains("DataFusion"));
439    }
440
441    #[test]
442    fn test_column_not_found_double_quotes() {
443        let t = translate_datafusion_error("No field named \"bar\"");
444        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
445        assert!(t.message.contains("bar"));
446    }
447
448    #[test]
449    fn test_column_not_found_with_prefix() {
450        let t = translate_datafusion_error("Schema error: No field named 'baz'");
451        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
452        assert!(t.message.contains("baz"));
453    }
454
455    #[test]
456    fn test_table_not_found() {
457        let t = translate_datafusion_error("table 'orders' not found");
458        assert_eq!(t.code, codes::TABLE_NOT_FOUND);
459        assert!(t.message.contains("orders"));
460        assert!(t.hint.is_some());
461        assert!(t.hint.unwrap().contains("SHOW TABLES"));
462    }
463
464    #[test]
465    fn test_type_mismatch() {
466        let t = translate_datafusion_error("column types must match for UNION");
467        assert_eq!(t.code, codes::TYPE_MISMATCH);
468        assert!(t.hint.is_some());
469        assert!(t.hint.unwrap().contains("DESCRIBE"));
470    }
471
472    #[test]
473    fn test_type_cannot_cast() {
474        let t = translate_datafusion_error("cannot be cast to Int64");
475        assert_eq!(t.code, codes::TYPE_MISMATCH);
476    }
477
478    #[test]
479    fn test_unsupported_sql() {
480        let t = translate_datafusion_error("This feature is not implemented: LATERAL JOIN");
481        assert_eq!(t.code, codes::UNSUPPORTED_SQL);
482        assert!(t.message.contains("LATERAL JOIN"));
483    }
484
485    #[test]
486    fn test_plan_error_with_column_extracts_column() {
487        // When a Plan error wraps a "No field named" message, the more
488        // specific column-not-found code is preferred over generic planning.
489        let t = translate_datafusion_error("Plan(\"No field named 'x' in schema\")");
490        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
491        assert!(t.message.contains("'x'"));
492    }
493
494    #[test]
495    fn test_plan_error_generic() {
496        let t = translate_datafusion_error("Plan(\"aggregate function not found\")");
497        assert_eq!(t.code, codes::PLANNING_FAILED);
498        assert!(t.message.contains("aggregate function not found"));
499    }
500
501    #[test]
502    fn test_error_during_planning() {
503        let t = translate_datafusion_error("Error during planning: ambiguous reference 'id'");
504        assert_eq!(t.code, codes::PLANNING_FAILED);
505    }
506
507    #[test]
508    fn test_execution_error() {
509        let t = translate_datafusion_error("Execution error: division by zero");
510        assert_eq!(t.code, codes::EXECUTION_FAILED);
511    }
512
513    #[test]
514    fn test_unknown_fallback() {
515        let t = translate_datafusion_error("some totally unknown error");
516        assert_eq!(t.code, codes::INTERNAL);
517        assert!(t.message.contains("Internal query error"));
518        assert!(t.hint.is_some());
519    }
520
521    #[test]
522    fn test_prefix_stripping() {
523        let t = translate_datafusion_error("DataFusion error: Arrow error: No field named 'x'");
524        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
525        assert!(t.message.contains("'x'"));
526        assert!(!t.message.contains("DataFusion"));
527        assert!(!t.message.contains("Arrow error"));
528    }
529
530    #[test]
531    fn test_display_with_hint() {
532        let t = translate_datafusion_error("table 'foo' not found");
533        let display = t.to_string();
534        assert!(display.starts_with("[LDB-1101]"));
535        assert!(display.contains("(hint:"));
536    }
537
538    #[test]
539    fn test_display_without_hint() {
540        let t = translate_datafusion_error("Execution error: oops");
541        let display = t.to_string();
542        assert!(display.starts_with("[LDB-9001]"));
543        assert!(!display.contains("hint"));
544    }
545
546    // -- suggest_column tests --
547
548    #[test]
549    fn test_suggest_column_found() {
550        let result = suggest_column("user_ie", &["user_id", "email"]);
551        assert_eq!(result, Some("Did you mean 'user_id'?".to_string()));
552    }
553
554    #[test]
555    fn test_suggest_column_none() {
556        let result = suggest_column("xyz", &["user_id", "email"]);
557        assert_eq!(result, None);
558    }
559
560    // -- translate_datafusion_error_with_context tests --
561
562    #[test]
563    fn test_column_not_found_with_suggestion() {
564        let cols = &["user_id", "email", "price"];
565        let t = translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols));
566        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
567        assert!(t.message.contains("user_ie"));
568        assert!(t.hint.is_some());
569        assert!(
570            t.hint.as_ref().unwrap().contains("user_id"),
571            "hint should suggest 'user_id': {:?}",
572            t.hint
573        );
574    }
575
576    #[test]
577    fn test_column_not_found_no_close_match() {
578        let cols = &["user_id", "email"];
579        let t = translate_datafusion_error_with_context("No field named 'zzzzz'", Some(cols));
580        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
581        assert!(t.hint.is_none());
582    }
583
584    #[test]
585    fn test_column_not_found_without_context() {
586        let t = translate_datafusion_error("No field named 'foo'");
587        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
588        assert!(t.hint.is_none()); // no columns provided
589    }
590
591    #[test]
592    fn test_column_not_found_case_insensitive_hint() {
593        let cols = &["tradeId", "symbol", "lastPrice"];
594        let t = translate_datafusion_error_with_context("No field named 'tradeid'", Some(cols));
595        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
596        assert!(t.hint.is_some());
597        let hint = t.hint.unwrap();
598        assert!(
599            hint.contains("tradeId"),
600            "hint should mention actual name: {hint}"
601        );
602        assert!(hint.contains("case"), "hint should mention case: {hint}");
603    }
604
605    #[test]
606    fn test_column_not_found_ambiguous_case_hint() {
607        let cols = &["price", "Price", "PRICE"];
608        let t = translate_datafusion_error_with_context("No field named 'pRiCe'", Some(cols));
609        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
610        assert!(t.hint.is_some());
611        let hint = t.hint.unwrap();
612        assert!(
613            hint.contains("Multiple"),
614            "hint should mention ambiguity: {hint}"
615        );
616    }
617
618    // -- window error code tests --
619
620    #[test]
621    fn test_watermark_required_error() {
622        let t = translate_datafusion_error("Watermark required for EMIT ON WINDOW CLOSE");
623        assert_eq!(t.code, codes::WATERMARK_REQUIRED);
624        assert!(t.hint.is_some());
625        assert!(t.hint.unwrap().contains("WATERMARK FOR"));
626    }
627
628    #[test]
629    fn test_window_invalid_error() {
630        let t = translate_datafusion_error("Window type not supported for this operation");
631        assert_eq!(t.code, codes::WINDOW_INVALID);
632        assert!(t.hint.unwrap().contains("TUMBLE"));
633    }
634
635    // -- join error code tests --
636
637    #[test]
638    fn test_join_key_not_found_error() {
639        let t = translate_datafusion_error("Join key 'user_id' not found in right table");
640        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
641        assert!(t.hint.unwrap().contains("ON clause"));
642    }
643
644    #[test]
645    fn test_temporal_join_pk_error() {
646        let t = translate_datafusion_error("Temporal join requires a primary key on right table");
647        assert_eq!(t.code, codes::TEMPORAL_JOIN_NO_PK);
648        assert!(t.hint.unwrap().contains("PRIMARY KEY"));
649    }
650
651    // -- LDB-2004 LATE_DATA_REJECTED tests --
652
653    #[test]
654    fn test_late_data_rejected() {
655        let t = translate_datafusion_error("late data rejected by window policy");
656        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
657        assert!(t.hint.unwrap().contains("lateness"));
658    }
659
660    #[test]
661    fn test_late_event_dropped() {
662        let t = translate_datafusion_error("late event dropped after window close");
663        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
664    }
665
666    // -- "Window error:" prefix test --
667
668    #[test]
669    fn test_window_error_prefix() {
670        let t = translate_datafusion_error("Window error: CUMULATE requires step <= size");
671        assert_eq!(t.code, codes::WINDOW_INVALID);
672        assert!(t.hint.unwrap().contains("CUMULATE"));
673    }
674
675    // -- LDB-3004 JOIN_TYPE_UNSUPPORTED tests --
676
677    #[test]
678    fn test_join_type_unsupported_cross() {
679        let t = translate_datafusion_error("cross join not supported for streaming queries");
680        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
681        assert!(t.hint.unwrap().contains("CROSS"));
682    }
683
684    #[test]
685    fn test_join_type_unsupported_natural() {
686        let t = translate_datafusion_error("natural join not supported in streaming context");
687        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
688    }
689
690    // -- "Streaming SQL error:" prefix tests --
691
692    #[test]
693    fn test_streaming_sql_error_using_clause() {
694        let t = translate_datafusion_error(
695            "Streaming SQL error: using clause requires matching columns",
696        );
697        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
698    }
699
700    #[test]
701    fn test_streaming_sql_error_time_bound() {
702        let t = translate_datafusion_error(
703            "Streaming SQL error: cannot extract time bound from ON clause",
704        );
705        assert_eq!(t.code, codes::JOIN_TIME_BOUND_MISSING);
706    }
707}