1pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
/// Error codes attached to every [`TranslatedError`].
///
/// Most codes are re-exported from `laminar_core::error_codes`, some of them
/// under a shorter alias; the two generic fallback codes are defined here.
pub mod codes {
    // General SQL errors (aliased to drop the `SQL_` prefix).
    pub use laminar_core::error_codes::SQL_COLUMN_NOT_FOUND as COLUMN_NOT_FOUND;
    pub use laminar_core::error_codes::SQL_PLANNING_FAILED as PLANNING_FAILED;
    pub use laminar_core::error_codes::SQL_TABLE_NOT_FOUND as TABLE_NOT_FOUND;
    pub use laminar_core::error_codes::SQL_TYPE_MISMATCH as TYPE_MISMATCH;
    pub use laminar_core::error_codes::SQL_UNSUPPORTED as UNSUPPORTED_SQL;

    // Window, watermark and late-data errors.
    pub use laminar_core::error_codes::LATE_DATA_REJECTED;
    pub use laminar_core::error_codes::WATERMARK_REQUIRED;
    pub use laminar_core::error_codes::WINDOW_INVALID;
    pub use laminar_core::error_codes::WINDOW_SIZE_INVALID;

    // Join errors.
    pub use laminar_core::error_codes::JOIN_KEY_MISSING;
    pub use laminar_core::error_codes::JOIN_TIME_BOUND_MISSING;
    pub use laminar_core::error_codes::JOIN_TYPE_UNSUPPORTED;
    pub use laminar_core::error_codes::TEMPORAL_JOIN_NO_PK;

    /// Fallback code used when no more specific pattern matches the message.
    pub const INTERNAL: &str = "LDB-9000";
    /// Code used for messages that contain `Execution error`.
    pub const EXECUTION_FAILED: &str = "LDB-9001";
}
51
/// A user-facing error produced by translating a raw engine error message.
///
/// Its `Display` implementation renders as `[code] message`, followed by
/// ` (hint: …)` when a hint is present.
#[derive(Debug, Clone)]
pub struct TranslatedError {
    /// Error code; the translation functions in this module fill it with one
    /// of the constants from [`codes`].
    pub code: &'static str,
    /// Human-readable description of what went wrong.
    pub message: String,
    /// Optional suggestion for how to fix the problem.
    pub hint: Option<String>,
}
62
63impl std::fmt::Display for TranslatedError {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "[{}] {}", self.code, self.message)?;
66 if let Some(hint) = &self.hint {
67 write!(f, " (hint: {hint})")?;
68 }
69 Ok(())
70 }
71}
72
73#[must_use]
78pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
79 closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
80}
81
/// Strips known engine-internal prefixes (`DataFusion error: `,
/// `Arrow error: `, `Schema error: `, `External error: `) from the start of
/// `msg` and returns the remaining slice.
///
/// Prefixes are removed repeatedly until none is left, so they are stripped
/// regardless of the order in which they are nested or how often they repeat
/// (e.g. `"External error: Schema error: …"`). A single ordered pass would
/// leave a prefix behind whenever the nesting order differs from the table.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];

    let mut rest = msg;
    loop {
        // Every successful strip shortens `rest`, so the loop terminates.
        let stripped = PREFIXES
            .iter()
            .find_map(|prefix| rest.strip_prefix(*prefix));
        match stripped {
            Some(shorter) => rest = shorter,
            None => return rest,
        }
    }
}
102
103fn extract_missing_column(msg: &str) -> Option<&str> {
105 let needle = "No field named ";
106 let idx = msg.find(needle)?;
107 let after = &msg[idx + needle.len()..];
108 extract_quoted(after)
109}
110
/// Extracts the table name from a "table '<name>' not found" style message.
///
/// The search for `table '` is ASCII-case-insensitive, and the returned name
/// keeps the casing of `msg`. The text after the closing quote must say that
/// the table is missing (`not found`, `doesn't exist` or `does not exist`).
/// Other messages that merely mention a quoted table, such as
/// `Table 'x' already exists`, yield `None` so they are not misreported as a
/// missing table.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const NEEDLE: &str = "table '";
    const MISSING_MARKERS: &[&str] = &["not found", "doesn't exist", "does not exist"];

    // ASCII lowercasing keeps byte offsets identical, so indices found in
    // `lower` are valid for slicing `msg`.
    let lower = msg.to_ascii_lowercase();
    let start = lower.find(NEEDLE)? + NEEDLE.len();
    let end = start + lower[start..].find('\'')?;

    // Everything after the closing quote (a one-byte ASCII character).
    let tail = &lower[end + 1..];
    if MISSING_MARKERS.iter().any(|marker| tail.contains(*marker)) {
        Some(&msg[start..end])
    } else {
        None
    }
}
120
/// Extracts an identifier from the start of `s`.
///
/// * If `s` begins with a single or double quote, returns the text up to the
///   matching closing quote (`None` when the quote is never closed).
/// * Otherwise returns the leading bare word, which ends at the first
///   whitespace, `,` or `)`. One trailing `.` is dropped, and an empty word
///   yields `None`.
fn extract_quoted(s: &str) -> Option<&str> {
    for quote in ['\'', '"'] {
        if let Some(inner) = s.strip_prefix(quote) {
            return inner.split_once(quote).map(|(name, _)| name);
        }
    }

    // `split` always yields at least one piece, so `unwrap_or` is only a formality.
    let bare = s
        .split(|c: char| c.is_whitespace() || matches!(c, ',' | ')'))
        .next()
        .unwrap_or(s);
    // Messages such as "No field named foo. Valid fields: …" end the name with a period.
    let bare = bare.strip_suffix('.').unwrap_or(bare);
    Some(bare).filter(|word| !word.is_empty())
}
143
/// Translates a raw DataFusion error message into a [`TranslatedError`].
///
/// Convenience wrapper around [`translate_datafusion_error_with_context`]
/// that passes no list of available columns, so column-not-found errors
/// carry no hint.
#[must_use]
pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
    translate_datafusion_error_with_context(msg, None)
}
156
157#[must_use]
160pub fn translate_datafusion_error_with_context(
161 msg: &str,
162 available_columns: Option<&[&str]>,
163) -> TranslatedError {
164 let clean = sanitize(msg);
165
166 if let Some(col) = extract_missing_column(clean) {
168 let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
169 Ok(actual) if actual != col => Some(format!(
170 "Column is named '{actual}' (case differs). \
171 Use \"{actual}\" or match the exact casing."
172 )),
173 Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
174 "Multiple columns match case-insensitively: {}. \
175 Use double quotes for exact match.",
176 matches.join(", ")
177 )),
178 _ => suggest_column(col, cols),
179 });
180 return TranslatedError {
181 code: codes::COLUMN_NOT_FOUND,
182 message: format!("Column '{col}' not found in query"),
183 hint,
184 };
185 }
186
187 if let Some(table) = extract_missing_table(clean) {
189 return TranslatedError {
190 code: codes::TABLE_NOT_FOUND,
191 message: format!("Table or source '{table}' not found"),
192 hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
193 };
194 }
195
196 if clean.contains("mismatch")
198 || clean.contains("must match")
199 || clean.contains("cannot be cast")
200 {
201 return TranslatedError {
202 code: codes::TYPE_MISMATCH,
203 message: format!("Type mismatch: {clean}"),
204 hint: Some("Check column types with DESCRIBE <table>".to_string()),
205 };
206 }
207
208 if let Some(translated) = check_window_errors(clean) {
210 return translated;
211 }
212
213 if let Some(translated) = check_join_errors(clean) {
215 return translated;
216 }
217
218 if clean.contains("Unsupported")
220 || clean.contains("NotImplemented")
221 || clean.contains("This feature is not implemented")
222 {
223 return TranslatedError {
224 code: codes::UNSUPPORTED_SQL,
225 message: format!("Unsupported SQL syntax: {clean}"),
226 hint: None,
227 };
228 }
229
230 if clean.starts_with("Plan(\"") {
232 let detail = clean
233 .strip_prefix("Plan(\"")
234 .and_then(|s| s.strip_suffix("\")"))
235 .unwrap_or(clean);
236 return TranslatedError {
237 code: codes::PLANNING_FAILED,
238 message: format!("Query planning failed: {detail}"),
239 hint: None,
240 };
241 }
242
243 if clean.contains("Error during planning") {
245 return TranslatedError {
246 code: codes::PLANNING_FAILED,
247 message: format!("Query planning failed: {clean}"),
248 hint: None,
249 };
250 }
251
252 if clean.contains("Execution error") {
254 return TranslatedError {
255 code: codes::EXECUTION_FAILED,
256 message: format!("Query execution failed: {clean}"),
257 hint: None,
258 };
259 }
260
261 TranslatedError {
263 code: codes::INTERNAL,
264 message: format!("Internal query error: {clean}"),
265 hint: Some("If this persists, file a bug report".to_string()),
266 }
267}
268
269fn check_window_errors(clean: &str) -> Option<TranslatedError> {
271 let lower = clean.to_ascii_lowercase();
272
273 if lower.starts_with("window error:") {
275 return Some(TranslatedError {
276 code: codes::WINDOW_INVALID,
277 message: format!("Invalid window specification: {clean}"),
278 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
279 });
280 }
281
282 if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
283 return Some(TranslatedError {
284 code: codes::WATERMARK_REQUIRED,
285 message: format!("Watermark required: {clean}"),
286 hint: Some(
287 "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
288 to the CREATE SOURCE statement"
289 .to_string(),
290 ),
291 });
292 }
293
294 if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
295 return Some(TranslatedError {
296 code: codes::WINDOW_INVALID,
297 message: format!("Invalid window specification: {clean}"),
298 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
299 });
300 }
301
302 if lower.contains("window")
303 && lower.contains("size")
304 && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
305 {
306 return Some(TranslatedError {
307 code: codes::WINDOW_SIZE_INVALID,
308 message: format!("Invalid window size: {clean}"),
309 hint: Some("Window size must be a positive interval".to_string()),
310 });
311 }
312
313 if lower.contains("late")
315 && (lower.contains("data") || lower.contains("event"))
316 && (lower.contains("rejected") || lower.contains("dropped"))
317 {
318 return Some(TranslatedError {
319 code: codes::LATE_DATA_REJECTED,
320 message: format!("Late data rejected: {clean}"),
321 hint: Some(
322 "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
323 or route late data to a side output"
324 .to_string(),
325 ),
326 });
327 }
328
329 None
330}
331
332fn check_join_errors(clean: &str) -> Option<TranslatedError> {
334 let lower = clean.to_ascii_lowercase();
335
336 if lower.starts_with("streaming sql error:") {
338 if lower.contains("using clause requires") {
339 return Some(TranslatedError {
340 code: codes::JOIN_KEY_MISSING,
341 message: format!("Join key error: {clean}"),
342 hint: Some(
343 "Ensure the USING clause references columns that exist \
344 in both sides of the join"
345 .to_string(),
346 ),
347 });
348 }
349 if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
350 return Some(TranslatedError {
351 code: codes::JOIN_TIME_BOUND_MISSING,
352 message: format!("Join time bound required: {clean}"),
353 hint: Some(
354 "Stream-stream joins require a time bound in the ON clause, e.g.: \
355 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
356 .to_string(),
357 ),
358 });
359 }
360 }
361
362 if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
363 return Some(TranslatedError {
364 code: codes::JOIN_KEY_MISSING,
365 message: format!("Join key error: {clean}"),
366 hint: Some(
367 "Ensure the ON clause references columns that exist \
368 in both sides of the join"
369 .to_string(),
370 ),
371 });
372 }
373
374 if lower.contains("join")
375 && (lower.contains("time bound") || lower.contains("interval"))
376 && lower.contains("required")
377 {
378 return Some(TranslatedError {
379 code: codes::JOIN_TIME_BOUND_MISSING,
380 message: format!("Join time bound required: {clean}"),
381 hint: Some(
382 "Stream-stream joins require a time bound in the ON clause, e.g.: \
383 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
384 .to_string(),
385 ),
386 });
387 }
388
389 if lower.contains("temporal") && lower.contains("primary key") {
390 return Some(TranslatedError {
391 code: codes::TEMPORAL_JOIN_NO_PK,
392 message: format!("Temporal join error: {clean}"),
393 hint: Some(
394 "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
395 ),
396 });
397 }
398
399 if (lower.contains("not supported for streaming")
401 || lower.contains("natural join not supported")
402 || lower.contains("cross join not supported")
403 || lower.contains("unsupported join"))
404 && lower.contains("join")
405 {
406 return Some(TranslatedError {
407 code: codes::JOIN_TYPE_UNSUPPORTED,
408 message: format!("Unsupported join type: {clean}"),
409 hint: Some(
410 "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
411 with time bounds. CROSS and NATURAL joins are not supported."
412 .to_string(),
413 ),
414 });
415 }
416
417 None
418}
419
/// Unit tests for the error translation functions, grouped by error category.
#[cfg(test)]
mod tests {
    use super::*;

    // -- Column not found ---------------------------------------------------

    #[test]
    fn test_column_not_found_single_quotes() {
        let t = translate_datafusion_error("No field named 'foo'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("foo"));
        assert!(t.message.contains("not found"));
        assert!(!t.message.contains("DataFusion"));
    }

    #[test]
    fn test_column_not_found_double_quotes() {
        let t = translate_datafusion_error("No field named \"bar\"");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("bar"));
    }

    #[test]
    fn test_column_not_found_with_prefix() {
        let t = translate_datafusion_error("Schema error: No field named 'baz'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("baz"));
    }

    // An unquoted name followed by a period must be reported without the period.
    #[test]
    fn test_column_not_found_bare_word_with_trailing_period() {
        let t = translate_datafusion_error("No field named ref_price.");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(
            t.message.contains("'ref_price'"),
            "should strip trailing period: {}",
            t.message
        );
        assert!(
            !t.message.contains("ref_price."),
            "should not include trailing period: {}",
            t.message
        );
    }

    // Same as above, but with the "Valid fields" list following the name.
    #[test]
    fn test_column_not_found_bare_word_with_valid_fields() {
        let t = translate_datafusion_error(
            "Schema error: No field named ref_price. Valid fields: symbol, event_time",
        );
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(
            t.message.contains("'ref_price'"),
            "should extract clean column name: {}",
            t.message
        );
    }

    // -- Table not found ----------------------------------------------------

    #[test]
    fn test_table_not_found() {
        let t = translate_datafusion_error("table 'orders' not found");
        assert_eq!(t.code, codes::TABLE_NOT_FOUND);
        assert!(t.message.contains("orders"));
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("SHOW TABLES"));
    }

    // -- Type mismatch ------------------------------------------------------

    #[test]
    fn test_type_mismatch() {
        let t = translate_datafusion_error("column types must match for UNION");
        assert_eq!(t.code, codes::TYPE_MISMATCH);
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("DESCRIBE"));
    }

    #[test]
    fn test_type_cannot_cast() {
        let t = translate_datafusion_error("cannot be cast to Int64");
        assert_eq!(t.code, codes::TYPE_MISMATCH);
    }

    // -- Unsupported SQL, planning, execution, fallback ---------------------

    #[test]
    fn test_unsupported_sql() {
        let t = translate_datafusion_error("This feature is not implemented: LATERAL JOIN");
        assert_eq!(t.code, codes::UNSUPPORTED_SQL);
        assert!(t.message.contains("LATERAL JOIN"));
    }

    // The column check runs before the `Plan("…")` check, so a plan error
    // that wraps a missing-field message is reported as column-not-found.
    #[test]
    fn test_plan_error_with_column_extracts_column() {
        let t = translate_datafusion_error("Plan(\"No field named 'x' in schema\")");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("'x'"));
    }

    #[test]
    fn test_plan_error_generic() {
        let t = translate_datafusion_error("Plan(\"aggregate function not found\")");
        assert_eq!(t.code, codes::PLANNING_FAILED);
        assert!(t.message.contains("aggregate function not found"));
    }

    #[test]
    fn test_error_during_planning() {
        let t = translate_datafusion_error("Error during planning: ambiguous reference 'id'");
        assert_eq!(t.code, codes::PLANNING_FAILED);
    }

    #[test]
    fn test_execution_error() {
        let t = translate_datafusion_error("Execution error: division by zero");
        assert_eq!(t.code, codes::EXECUTION_FAILED);
    }

    #[test]
    fn test_unknown_fallback() {
        let t = translate_datafusion_error("some totally unknown error");
        assert_eq!(t.code, codes::INTERNAL);
        assert!(t.message.contains("Internal query error"));
        assert!(t.hint.is_some());
    }

    // -- Prefix stripping and Display ---------------------------------------

    #[test]
    fn test_prefix_stripping() {
        let t = translate_datafusion_error("DataFusion error: Arrow error: No field named 'x'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("'x'"));
        assert!(!t.message.contains("DataFusion"));
        assert!(!t.message.contains("Arrow error"));
    }

    // Expects the table-not-found code to render as `LDB-1101`.
    #[test]
    fn test_display_with_hint() {
        let t = translate_datafusion_error("table 'foo' not found");
        let display = t.to_string();
        assert!(display.starts_with("[LDB-1101]"));
        assert!(display.contains("(hint:"));
    }

    #[test]
    fn test_display_without_hint() {
        let t = translate_datafusion_error("Execution error: oops");
        let display = t.to_string();
        assert!(display.starts_with("[LDB-9001]"));
        assert!(!display.contains("hint"));
    }

    // -- suggest_column -----------------------------------------------------

    #[test]
    fn test_suggest_column_found() {
        let result = suggest_column("user_ie", &["user_id", "email"]);
        assert_eq!(result, Some("Did you mean 'user_id'?".to_string()));
    }

    #[test]
    fn test_suggest_column_none() {
        let result = suggest_column("xyz", &["user_id", "email"]);
        assert_eq!(result, None);
    }

    // -- Column hints with available-column context -------------------------

    #[test]
    fn test_column_not_found_with_suggestion() {
        let cols = &["user_id", "email", "price"];
        let t = translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("user_ie"));
        assert!(t.hint.is_some());
        assert!(
            t.hint.as_ref().unwrap().contains("user_id"),
            "hint should suggest 'user_id': {:?}",
            t.hint
        );
    }

    // No available column is close enough to 'zzzzz', so no hint is expected.
    #[test]
    fn test_column_not_found_no_close_match() {
        let cols = &["user_id", "email"];
        let t = translate_datafusion_error_with_context("No field named 'zzzzz'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_none());
    }

    // Without a column list there is nothing to base a hint on.
    #[test]
    fn test_column_not_found_without_context() {
        let t = translate_datafusion_error("No field named 'foo'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_none());
    }

    #[test]
    fn test_column_not_found_case_insensitive_hint() {
        let cols = &["tradeId", "symbol", "lastPrice"];
        let t = translate_datafusion_error_with_context("No field named 'tradeid'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_some());
        let hint = t.hint.unwrap();
        assert!(
            hint.contains("tradeId"),
            "hint should mention actual name: {hint}"
        );
        assert!(hint.contains("case"), "hint should mention case: {hint}");
    }

    // Several columns differ from the input only by case.
    #[test]
    fn test_column_not_found_ambiguous_case_hint() {
        let cols = &["price", "Price", "PRICE"];
        let t = translate_datafusion_error_with_context("No field named 'pRiCe'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_some());
        let hint = t.hint.unwrap();
        assert!(
            hint.contains("Multiple"),
            "hint should mention ambiguity: {hint}"
        );
    }

    // -- Window and watermark errors ----------------------------------------

    #[test]
    fn test_watermark_required_error() {
        let t = translate_datafusion_error("Watermark required for EMIT ON WINDOW CLOSE");
        assert_eq!(t.code, codes::WATERMARK_REQUIRED);
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("WATERMARK FOR"));
    }

    #[test]
    fn test_window_invalid_error() {
        let t = translate_datafusion_error("Window type not supported for this operation");
        assert_eq!(t.code, codes::WINDOW_INVALID);
        assert!(t.hint.unwrap().contains("TUMBLE"));
    }

    // -- Join errors --------------------------------------------------------

    #[test]
    fn test_join_key_not_found_error() {
        let t = translate_datafusion_error("Join key 'user_id' not found in right table");
        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
        assert!(t.hint.unwrap().contains("ON clause"));
    }

    #[test]
    fn test_temporal_join_pk_error() {
        let t = translate_datafusion_error("Temporal join requires a primary key on right table");
        assert_eq!(t.code, codes::TEMPORAL_JOIN_NO_PK);
        assert!(t.hint.unwrap().contains("PRIMARY KEY"));
    }

    // -- Late data ----------------------------------------------------------

    #[test]
    fn test_late_data_rejected() {
        let t = translate_datafusion_error("late data rejected by window policy");
        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
        assert!(t.hint.unwrap().contains("lateness"));
    }

    #[test]
    fn test_late_event_dropped() {
        let t = translate_datafusion_error("late event dropped after window close");
        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
    }

    // -- "Window error:" prefix ---------------------------------------------

    #[test]
    fn test_window_error_prefix() {
        let t = translate_datafusion_error("Window error: CUMULATE requires step <= size");
        assert_eq!(t.code, codes::WINDOW_INVALID);
        assert!(t.hint.unwrap().contains("CUMULATE"));
    }

    // -- Unsupported join types ---------------------------------------------

    #[test]
    fn test_join_type_unsupported_cross() {
        let t = translate_datafusion_error("cross join not supported for streaming queries");
        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
        assert!(t.hint.unwrap().contains("CROSS"));
    }

    #[test]
    fn test_join_type_unsupported_natural() {
        let t = translate_datafusion_error("natural join not supported in streaming context");
        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
    }

    // -- "Streaming SQL error:" prefix --------------------------------------

    #[test]
    fn test_streaming_sql_error_using_clause() {
        let t = translate_datafusion_error(
            "Streaming SQL error: using clause requires matching columns",
        );
        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
    }

    #[test]
    fn test_streaming_sql_error_time_bound() {
        let t = translate_datafusion_error(
            "Streaming SQL error: cannot extract time bound from ON clause",
        );
        assert_eq!(t.code, codes::JOIN_TIME_BOUND_MISSING);
    }
}