Skip to main content

laminar_sql/error/
mod.rs

//! Error translation layer for DataFusion errors.
//!
//! Translates internal DataFusion/Arrow error messages into user-friendly
//! LaminarDB errors with structured error codes (`LDB-NNNN`) and hints.
//!
//! # Error Code Ranges
//!
//! | Range | Category |
//! |-------|----------|
//! | `LDB-1001`..`LDB-1099` | SQL syntax errors |
//! | `LDB-1100`..`LDB-1199` | Schema / column errors |
//! | `LDB-1200`..`LDB-1299` | Type errors |
//! | `LDB-2000`..`LDB-2099` | Window / watermark errors |
//! | `LDB-3000`..`LDB-3099` | Join errors |
//! | `LDB-9000`..`LDB-9099` | Internal errors |
16
17pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
21/// Structured error code constants.
22///
23/// Codes in the `LDB-1xxx`..`LDB-3xxx` ranges are re-exported from the
24/// canonical registry in `laminar_core::error_codes`. The `LDB-9xxx` codes
25/// are SQL-layer specific and defined here.
26pub mod codes {
27    // Re-export LDB-1xxx (SQL parsing & validation) from laminar-core.
28    pub use laminar_core::error_codes::SQL_COLUMN_NOT_FOUND as COLUMN_NOT_FOUND;
29    pub use laminar_core::error_codes::SQL_PLANNING_FAILED as PLANNING_FAILED;
30    pub use laminar_core::error_codes::SQL_TABLE_NOT_FOUND as TABLE_NOT_FOUND;
31    pub use laminar_core::error_codes::SQL_TYPE_MISMATCH as TYPE_MISMATCH;
32    pub use laminar_core::error_codes::SQL_UNSUPPORTED as UNSUPPORTED_SQL;
33
34    // Re-export LDB-2xxx (window / watermark) from laminar-core.
35    pub use laminar_core::error_codes::LATE_DATA_REJECTED;
36    pub use laminar_core::error_codes::WATERMARK_REQUIRED;
37    pub use laminar_core::error_codes::WINDOW_INVALID;
38    pub use laminar_core::error_codes::WINDOW_SIZE_INVALID;
39
40    // Re-export LDB-3xxx (join) from laminar-core.
41    pub use laminar_core::error_codes::JOIN_KEY_MISSING;
42    pub use laminar_core::error_codes::JOIN_TIME_BOUND_MISSING;
43    pub use laminar_core::error_codes::JOIN_TYPE_UNSUPPORTED;
44    pub use laminar_core::error_codes::TEMPORAL_JOIN_NO_PK;
45
46    /// Internal query error (unrecognized pattern).
47    pub const INTERNAL: &str = "LDB-9000";
48    /// Query execution failed.
49    pub const EXECUTION_FAILED: &str = "LDB-9001";
50}
51
/// A translated error with structured code, message, and optional hint.
///
/// Derives `PartialEq`/`Eq` so that two translations can be compared
/// directly (for example with `assert_eq!`) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranslatedError {
    /// Structured error code (e.g. `"LDB-1100"`).
    pub code: &'static str,
    /// User-friendly error message.
    pub message: String,
    /// Optional hint for fixing the error.
    pub hint: Option<String>,
}
62
63impl std::fmt::Display for TranslatedError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "[{}] {}", self.code, self.message)?;
66        if let Some(hint) = &self.hint {
67            write!(f, " (hint: {hint})")?;
68        }
69        Ok(())
70    }
71}
72
73/// Suggests a column name correction based on edit distance.
74///
75/// Returns a `"Did you mean '...'?"` string if a close match is found
76/// within 2 edits.
77#[must_use]
78pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
79    closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
80}
81
82// ---------------------------------------------------------------------------
83// Internal helpers
84// ---------------------------------------------------------------------------
85
/// Strips known DataFusion/Arrow wrapper prefixes from an error message.
///
/// Wrappers can nest in any order and more than once (for example
/// `"External error: Arrow error: ..."`), so prefixes are peeled off
/// repeatedly until the message no longer starts with any of them. Only
/// leading prefixes are removed; occurrences later in the message are kept.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];
    let mut rest = msg;
    // Every successful strip shortens `rest`, so this loop always terminates.
    while let Some(stripped) = PREFIXES
        .iter()
        .find_map(|prefix| rest.strip_prefix(*prefix))
    {
        rest = stripped;
    }
    rest
}
102
103/// Extracts a quoted name from a `"No field named ..."` message.
104fn extract_missing_column(msg: &str) -> Option<&str> {
105    let needle = "No field named ";
106    let idx = msg.find(needle)?;
107    let after = &msg[idx + needle.len()..];
108    extract_quoted(after)
109}
110
/// Pulls the table name out of a `"table '...' not found"` message.
///
/// The `table '` marker is matched ASCII-case-insensitively; the name itself
/// is returned with its original casing. Returns `None` when the marker is
/// absent or the closing quote is missing.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const MARKER: &str = "table '";
    // ASCII lowercasing keeps byte offsets, so the index found in the
    // lowered copy is valid for slicing the original message.
    let name_start = msg.to_ascii_lowercase().find(MARKER)? + MARKER.len();
    let (name, _) = msg[name_start..].split_once('\'')?;
    Some(name)
}
120
/// Extracts a name from the start of `s`, quoted or bare.
///
/// * If `s` starts with a single or double quote, returns the text up to the
///   matching closing quote (`None` if it is never closed).
/// * Otherwise returns the leading bare word, which ends at the first
///   whitespace, `,` or `)`. One trailing `.` is dropped because DataFusion
///   52.x ends the sentence right after the name (`"No field named col."`).
///   An empty word yields `None`.
fn extract_quoted(s: &str) -> Option<&str> {
    for quote in ['\'', '"'] {
        if let Some(inner) = s.strip_prefix(quote) {
            return inner.split_once(quote).map(|(name, _)| name);
        }
    }

    // `split` always yields at least one (possibly empty) piece.
    let token = s
        .split(|c: char| c.is_whitespace() || matches!(c, ',' | ')'))
        .next()
        .unwrap_or(s);
    let token = token.strip_suffix('.').unwrap_or(token);
    (!token.is_empty()).then_some(token)
}
143
144/// Translates a DataFusion error message into a user-friendly [`TranslatedError`].
145///
146/// Pattern-matches known DataFusion error formats and rewrites them with
147/// structured error codes and helpful messages. Unrecognized patterns fall
148/// back to `LDB-9000` with the message sanitized (internal prefixes stripped).
149///
150/// When `available_columns` is provided, column-not-found errors include a
151/// "Did you mean '...'?" hint based on edit distance.
152#[must_use]
153pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
154    translate_datafusion_error_with_context(msg, None)
155}
156
157/// Like [`translate_datafusion_error`] but accepts an optional list of
158/// available column names for typo suggestions.
159#[must_use]
160pub fn translate_datafusion_error_with_context(
161    msg: &str,
162    available_columns: Option<&[&str]>,
163) -> TranslatedError {
164    let clean = sanitize(msg);
165
166    // Column not found
167    if let Some(col) = extract_missing_column(clean) {
168        let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
169            Ok(actual) if actual != col => Some(format!(
170                "Column is named '{actual}' (case differs). \
171                         Use \"{actual}\" or match the exact casing."
172            )),
173            Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
174                "Multiple columns match case-insensitively: {}. \
175                         Use double quotes for exact match.",
176                matches.join(", ")
177            )),
178            _ => suggest_column(col, cols),
179        });
180        return TranslatedError {
181            code: codes::COLUMN_NOT_FOUND,
182            message: format!("Column '{col}' not found in query"),
183            hint,
184        };
185    }
186
187    // Table not found
188    if let Some(table) = extract_missing_table(clean) {
189        return TranslatedError {
190            code: codes::TABLE_NOT_FOUND,
191            message: format!("Table or source '{table}' not found"),
192            hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
193        };
194    }
195
196    // Type mismatch
197    if clean.contains("mismatch")
198        || clean.contains("must match")
199        || clean.contains("cannot be cast")
200    {
201        return TranslatedError {
202            code: codes::TYPE_MISMATCH,
203            message: format!("Type mismatch: {clean}"),
204            hint: Some("Check column types with DESCRIBE <table>".to_string()),
205        };
206    }
207
208    // Window / watermark errors
209    if let Some(translated) = check_window_errors(clean) {
210        return translated;
211    }
212
213    // Join errors
214    if let Some(translated) = check_join_errors(clean) {
215        return translated;
216    }
217
218    // Unsupported / not implemented
219    if clean.contains("Unsupported")
220        || clean.contains("NotImplemented")
221        || clean.contains("This feature is not implemented")
222    {
223        return TranslatedError {
224            code: codes::UNSUPPORTED_SQL,
225            message: format!("Unsupported SQL syntax: {clean}"),
226            hint: None,
227        };
228    }
229
230    // Planning error — Plan("...")
231    if clean.starts_with("Plan(\"") {
232        let detail = clean
233            .strip_prefix("Plan(\"")
234            .and_then(|s| s.strip_suffix("\")"))
235            .unwrap_or(clean);
236        return TranslatedError {
237            code: codes::PLANNING_FAILED,
238            message: format!("Query planning failed: {detail}"),
239            hint: None,
240        };
241    }
242
243    // Planning error — "Error during planning"
244    if clean.contains("Error during planning") {
245        return TranslatedError {
246            code: codes::PLANNING_FAILED,
247            message: format!("Query planning failed: {clean}"),
248            hint: None,
249        };
250    }
251
252    // Execution error
253    if clean.contains("Execution error") {
254        return TranslatedError {
255            code: codes::EXECUTION_FAILED,
256            message: format!("Query execution failed: {clean}"),
257            hint: None,
258        };
259    }
260
261    // Fallback — unknown pattern
262    TranslatedError {
263        code: codes::INTERNAL,
264        message: format!("Internal query error: {clean}"),
265        hint: Some("If this persists, file a bug report".to_string()),
266    }
267}
268
269/// Check for window/watermark-related error patterns.
270fn check_window_errors(clean: &str) -> Option<TranslatedError> {
271    let lower = clean.to_ascii_lowercase();
272
273    // "Window error:" prefix from parser — classify as WINDOW_INVALID
274    if lower.starts_with("window error:") {
275        return Some(TranslatedError {
276            code: codes::WINDOW_INVALID,
277            message: format!("Invalid window specification: {clean}"),
278            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
279        });
280    }
281
282    if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
283        return Some(TranslatedError {
284            code: codes::WATERMARK_REQUIRED,
285            message: format!("Watermark required: {clean}"),
286            hint: Some(
287                "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
288                 to the CREATE SOURCE statement"
289                    .to_string(),
290            ),
291        });
292    }
293
294    if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
295        return Some(TranslatedError {
296            code: codes::WINDOW_INVALID,
297            message: format!("Invalid window specification: {clean}"),
298            hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
299        });
300    }
301
302    if lower.contains("window")
303        && lower.contains("size")
304        && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
305    {
306        return Some(TranslatedError {
307            code: codes::WINDOW_SIZE_INVALID,
308            message: format!("Invalid window size: {clean}"),
309            hint: Some("Window size must be a positive interval".to_string()),
310        });
311    }
312
313    // Late data rejected/dropped
314    if lower.contains("late")
315        && (lower.contains("data") || lower.contains("event"))
316        && (lower.contains("rejected") || lower.contains("dropped"))
317    {
318        return Some(TranslatedError {
319            code: codes::LATE_DATA_REJECTED,
320            message: format!("Late data rejected: {clean}"),
321            hint: Some(
322                "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
323                 or route late data to a side output"
324                    .to_string(),
325            ),
326        });
327    }
328
329    None
330}
331
332/// Check for join-related error patterns.
333fn check_join_errors(clean: &str) -> Option<TranslatedError> {
334    let lower = clean.to_ascii_lowercase();
335
336    // "Streaming SQL error:" prefix — classify sub-patterns
337    if lower.starts_with("streaming sql error:") {
338        if lower.contains("using clause requires") {
339            return Some(TranslatedError {
340                code: codes::JOIN_KEY_MISSING,
341                message: format!("Join key error: {clean}"),
342                hint: Some(
343                    "Ensure the USING clause references columns that exist \
344                     in both sides of the join"
345                        .to_string(),
346                ),
347            });
348        }
349        if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
350            return Some(TranslatedError {
351                code: codes::JOIN_TIME_BOUND_MISSING,
352                message: format!("Join time bound required: {clean}"),
353                hint: Some(
354                    "Stream-stream joins require a time bound in the ON clause, e.g.: \
355                     AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
356                        .to_string(),
357                ),
358            });
359        }
360    }
361
362    if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
363        return Some(TranslatedError {
364            code: codes::JOIN_KEY_MISSING,
365            message: format!("Join key error: {clean}"),
366            hint: Some(
367                "Ensure the ON clause references columns that exist \
368                 in both sides of the join"
369                    .to_string(),
370            ),
371        });
372    }
373
374    if lower.contains("join")
375        && (lower.contains("time bound") || lower.contains("interval"))
376        && lower.contains("required")
377    {
378        return Some(TranslatedError {
379            code: codes::JOIN_TIME_BOUND_MISSING,
380            message: format!("Join time bound required: {clean}"),
381            hint: Some(
382                "Stream-stream joins require a time bound in the ON clause, e.g.: \
383                 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
384                    .to_string(),
385            ),
386        });
387    }
388
389    if lower.contains("temporal") && lower.contains("primary key") {
390        return Some(TranslatedError {
391            code: codes::TEMPORAL_JOIN_NO_PK,
392            message: format!("Temporal join error: {clean}"),
393            hint: Some(
394                "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
395            ),
396        });
397    }
398
399    // Unsupported join types for streaming
400    if (lower.contains("not supported for streaming")
401        || lower.contains("natural join not supported")
402        || lower.contains("cross join not supported")
403        || lower.contains("unsupported join"))
404        && lower.contains("join")
405    {
406        return Some(TranslatedError {
407            code: codes::JOIN_TYPE_UNSUPPORTED,
408            message: format!("Unsupported join type: {clean}"),
409            hint: Some(
410                "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
411                 with time bounds. CROSS and NATURAL joins are not supported."
412                    .to_string(),
413            ),
414        });
415    }
416
417    None
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the hint text, failing the test when the error carries none.
    fn hint_of(err: &TranslatedError) -> &str {
        err.hint
            .as_deref()
            .expect("translated error should carry a hint")
    }

    /// Translates `msg` without column context and returns only the code.
    fn code_for(msg: &str) -> &'static str {
        translate_datafusion_error(msg).code
    }

    // -- translate_datafusion_error tests --

    #[test]
    fn test_column_not_found_single_quotes() {
        let err = translate_datafusion_error("No field named 'foo'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        for expected in ["foo", "not found"] {
            assert!(err.message.contains(expected));
        }
        assert!(!err.message.contains("DataFusion"));
    }

    #[test]
    fn test_column_not_found_double_quotes() {
        let err = translate_datafusion_error("No field named \"bar\"");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("bar"));
    }

    #[test]
    fn test_column_not_found_with_prefix() {
        let err = translate_datafusion_error("Schema error: No field named 'baz'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("baz"));
    }

    #[test]
    fn test_column_not_found_bare_word_with_trailing_period() {
        let err = translate_datafusion_error("No field named ref_price.");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let message = &err.message;
        assert!(
            message.contains("'ref_price'"),
            "should strip trailing period: {message}"
        );
        assert!(
            !message.contains("ref_price."),
            "should not include trailing period: {message}"
        );
    }

    #[test]
    fn test_column_not_found_bare_word_with_valid_fields() {
        let err = translate_datafusion_error(
            "Schema error: No field named ref_price. Valid fields: symbol, event_time",
        );
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let message = &err.message;
        assert!(
            message.contains("'ref_price'"),
            "should extract clean column name: {message}"
        );
    }

    #[test]
    fn test_table_not_found() {
        let err = translate_datafusion_error("table 'orders' not found");
        assert_eq!(err.code, codes::TABLE_NOT_FOUND);
        assert!(err.message.contains("orders"));
        assert!(hint_of(&err).contains("SHOW TABLES"));
    }

    #[test]
    fn test_type_mismatch() {
        let err = translate_datafusion_error("column types must match for UNION");
        assert_eq!(err.code, codes::TYPE_MISMATCH);
        assert!(hint_of(&err).contains("DESCRIBE"));
    }

    #[test]
    fn test_type_cannot_cast() {
        assert_eq!(code_for("cannot be cast to Int64"), codes::TYPE_MISMATCH);
    }

    #[test]
    fn test_unsupported_sql() {
        let err = translate_datafusion_error("This feature is not implemented: LATERAL JOIN");
        assert_eq!(err.code, codes::UNSUPPORTED_SQL);
        assert!(err.message.contains("LATERAL JOIN"));
    }

    #[test]
    fn test_plan_error_with_column_extracts_column() {
        // A Plan error that wraps a "No field named" message should get the
        // more specific column-not-found code, not the generic planning one.
        let err = translate_datafusion_error("Plan(\"No field named 'x' in schema\")");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("'x'"));
    }

    #[test]
    fn test_plan_error_generic() {
        let err = translate_datafusion_error("Plan(\"aggregate function not found\")");
        assert_eq!(err.code, codes::PLANNING_FAILED);
        assert!(err.message.contains("aggregate function not found"));
    }

    #[test]
    fn test_error_during_planning() {
        assert_eq!(
            code_for("Error during planning: ambiguous reference 'id'"),
            codes::PLANNING_FAILED
        );
    }

    #[test]
    fn test_execution_error() {
        assert_eq!(
            code_for("Execution error: division by zero"),
            codes::EXECUTION_FAILED
        );
    }

    #[test]
    fn test_unknown_fallback() {
        let err = translate_datafusion_error("some totally unknown error");
        assert_eq!(err.code, codes::INTERNAL);
        assert!(err.message.contains("Internal query error"));
        assert!(err.hint.is_some());
    }

    #[test]
    fn test_prefix_stripping() {
        let err = translate_datafusion_error("DataFusion error: Arrow error: No field named 'x'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("'x'"));
        for leaked in ["DataFusion", "Arrow error"] {
            assert!(!err.message.contains(leaked));
        }
    }

    #[test]
    fn test_display_with_hint() {
        let rendered = translate_datafusion_error("table 'foo' not found").to_string();
        assert!(rendered.starts_with("[LDB-1101]"));
        assert!(rendered.contains("(hint:"));
    }

    #[test]
    fn test_display_without_hint() {
        let rendered = translate_datafusion_error("Execution error: oops").to_string();
        assert!(rendered.starts_with("[LDB-9001]"));
        assert!(!rendered.contains("hint"));
    }

    // -- suggest_column tests --

    #[test]
    fn test_suggest_column_found() {
        assert_eq!(
            suggest_column("user_ie", &["user_id", "email"]),
            Some("Did you mean 'user_id'?".to_string())
        );
    }

    #[test]
    fn test_suggest_column_none() {
        assert_eq!(suggest_column("xyz", &["user_id", "email"]), None);
    }

    // -- translate_datafusion_error_with_context tests --

    #[test]
    fn test_column_not_found_with_suggestion() {
        let cols: &[&str] = &["user_id", "email", "price"];
        let err = translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("user_ie"));
        assert!(
            hint_of(&err).contains("user_id"),
            "hint should suggest 'user_id': {:?}",
            err.hint
        );
    }

    #[test]
    fn test_column_not_found_no_close_match() {
        let cols: &[&str] = &["user_id", "email"];
        let err = translate_datafusion_error_with_context("No field named 'zzzzz'", Some(cols));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_without_context() {
        let err = translate_datafusion_error("No field named 'foo'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        // No columns were provided, so no hint can be produced.
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_case_insensitive_hint() {
        let cols: &[&str] = &["tradeId", "symbol", "lastPrice"];
        let err = translate_datafusion_error_with_context("No field named 'tradeid'", Some(cols));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(
            hint.contains("tradeId"),
            "hint should mention actual name: {hint}"
        );
        assert!(hint.contains("case"), "hint should mention case: {hint}");
    }

    #[test]
    fn test_column_not_found_ambiguous_case_hint() {
        let cols: &[&str] = &["price", "Price", "PRICE"];
        let err = translate_datafusion_error_with_context("No field named 'pRiCe'", Some(cols));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(
            hint.contains("Multiple"),
            "hint should mention ambiguity: {hint}"
        );
    }

    // -- window error code tests --

    #[test]
    fn test_watermark_required_error() {
        let err = translate_datafusion_error("Watermark required for EMIT ON WINDOW CLOSE");
        assert_eq!(err.code, codes::WATERMARK_REQUIRED);
        assert!(hint_of(&err).contains("WATERMARK FOR"));
    }

    #[test]
    fn test_window_invalid_error() {
        let err = translate_datafusion_error("Window type not supported for this operation");
        assert_eq!(err.code, codes::WINDOW_INVALID);
        assert!(hint_of(&err).contains("TUMBLE"));
    }

    // -- join error code tests --

    #[test]
    fn test_join_key_not_found_error() {
        let err = translate_datafusion_error("Join key 'user_id' not found in right table");
        assert_eq!(err.code, codes::JOIN_KEY_MISSING);
        assert!(hint_of(&err).contains("ON clause"));
    }

    #[test]
    fn test_temporal_join_pk_error() {
        let err =
            translate_datafusion_error("Temporal join requires a primary key on right table");
        assert_eq!(err.code, codes::TEMPORAL_JOIN_NO_PK);
        assert!(hint_of(&err).contains("PRIMARY KEY"));
    }

    // -- LDB-2004 LATE_DATA_REJECTED tests --

    #[test]
    fn test_late_data_rejected() {
        let err = translate_datafusion_error("late data rejected by window policy");
        assert_eq!(err.code, codes::LATE_DATA_REJECTED);
        assert!(hint_of(&err).contains("lateness"));
    }

    #[test]
    fn test_late_event_dropped() {
        assert_eq!(
            code_for("late event dropped after window close"),
            codes::LATE_DATA_REJECTED
        );
    }

    // -- "Window error:" prefix test --

    #[test]
    fn test_window_error_prefix() {
        let err = translate_datafusion_error("Window error: CUMULATE requires step <= size");
        assert_eq!(err.code, codes::WINDOW_INVALID);
        assert!(hint_of(&err).contains("CUMULATE"));
    }

    // -- LDB-3004 JOIN_TYPE_UNSUPPORTED tests --

    #[test]
    fn test_join_type_unsupported_cross() {
        let err = translate_datafusion_error("cross join not supported for streaming queries");
        assert_eq!(err.code, codes::JOIN_TYPE_UNSUPPORTED);
        assert!(hint_of(&err).contains("CROSS"));
    }

    #[test]
    fn test_join_type_unsupported_natural() {
        assert_eq!(
            code_for("natural join not supported in streaming context"),
            codes::JOIN_TYPE_UNSUPPORTED
        );
    }

    // -- "Streaming SQL error:" prefix tests --

    #[test]
    fn test_streaming_sql_error_using_clause() {
        assert_eq!(
            code_for("Streaming SQL error: using clause requires matching columns"),
            codes::JOIN_KEY_MISSING
        );
    }

    #[test]
    fn test_streaming_sql_error_time_bound() {
        assert_eq!(
            code_for("Streaming SQL error: cannot extract time bound from ON clause"),
            codes::JOIN_TIME_BOUND_MISSING
        );
    }
}