1pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
21pub mod codes {
27 pub use laminar_core::error_codes::SQL_COLUMN_NOT_FOUND as COLUMN_NOT_FOUND;
29 pub use laminar_core::error_codes::SQL_PLANNING_FAILED as PLANNING_FAILED;
30 pub use laminar_core::error_codes::SQL_TABLE_NOT_FOUND as TABLE_NOT_FOUND;
31 pub use laminar_core::error_codes::SQL_TYPE_MISMATCH as TYPE_MISMATCH;
32 pub use laminar_core::error_codes::SQL_UNSUPPORTED as UNSUPPORTED_SQL;
33
34 pub use laminar_core::error_codes::LATE_DATA_REJECTED;
36 pub use laminar_core::error_codes::WATERMARK_REQUIRED;
37 pub use laminar_core::error_codes::WINDOW_INVALID;
38 pub use laminar_core::error_codes::WINDOW_SIZE_INVALID;
39
40 pub use laminar_core::error_codes::JOIN_KEY_MISSING;
42 pub use laminar_core::error_codes::JOIN_TIME_BOUND_MISSING;
43 pub use laminar_core::error_codes::JOIN_TYPE_UNSUPPORTED;
44 pub use laminar_core::error_codes::TEMPORAL_JOIN_NO_PK;
45
46 pub const INTERNAL: &str = "LDB-9000";
48 pub const EXECUTION_FAILED: &str = "LDB-9001";
50}
51
/// An engine error rewritten into a user-facing form.
///
/// Rendered by `Display` as `[<code>] <message>`, followed by
/// ` (hint: <hint>)` when a hint is present.
#[derive(Debug, Clone)]
pub struct TranslatedError {
    /// Stable error code; in this module always one of the `codes` constants.
    pub code: &'static str,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional remediation advice appended after the message.
    pub hint: Option<String>,
}

impl std::fmt::Display for TranslatedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (code, message) = (self.code, &self.message);
        match self.hint.as_deref() {
            Some(hint) => write!(f, "[{code}] {message} (hint: {hint})"),
            None => write!(f, "[{code}] {message}"),
        }
    }
}
72
73#[must_use]
78pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
79 closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
80}
81
/// Strips wrapper prefixes such as `"DataFusion error: "` from an error message.
///
/// Nested errors can stack several of these prefixes, in any order and even
/// repeated (e.g. `"Arrow error: DataFusion error: ..."`). Prefixes are
/// therefore removed repeatedly until none of them remains at the start.
/// The returned slice borrows from `msg`; nothing is allocated.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];
    let mut s = msg;
    // Every successful strip shortens `s`, so this loop always terminates.
    'strip: loop {
        for prefix in PREFIXES {
            if let Some(rest) = s.strip_prefix(prefix) {
                s = rest;
                continue 'strip;
            }
        }
        break s;
    }
}
102
103fn extract_missing_column(msg: &str) -> Option<&str> {
105 let needle = "No field named ";
106 let idx = msg.find(needle)?;
107 let after = &msg[idx + needle.len()..];
108 extract_quoted(after)
109}
110
/// Extracts the table name from a "table '<name>' not found" style message.
///
/// The `table '` marker is matched ASCII case-insensitively, while the
/// returned name keeps its original casing from `msg`. A quoted name is only
/// accepted when the text right after its closing quote reports the table as
/// missing (`not found`, `does not exist`, `doesn't exist`). Without that
/// check, messages such as "Table 'orders' already exists" would be reported
/// as a missing table.
///
/// Every `table '` occurrence is tried in order; the first one that
/// satisfies the check wins.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const NEEDLE: &str = "table '";
    const MISSING_MARKERS: &[&str] = &["not found", "does not exist", "doesn't exist"];

    // ASCII lowercasing preserves byte offsets, so positions found in `lower`
    // are valid char boundaries in `msg` as well.
    let lower = msg.to_ascii_lowercase();
    lower.match_indices(NEEDLE).find_map(|(idx, _)| {
        let start = idx + NEEDLE.len();
        let name_len = msg[start..].find('\'')?;
        let name = &msg[start..start + name_len];
        let tail = lower[start + name_len + 1..].trim_start();
        MISSING_MARKERS
            .iter()
            .any(|marker| tail.starts_with(marker))
            .then_some(name)
    })
}
120
/// Extracts an identifier from the start of `s`.
///
/// * `'name'` or `"name"` yields the text between the matching quotes
///   (`None` if the closing quote is missing).
/// * Otherwise it yields the bare word up to the first whitespace, `,` or `)`,
///   with any trailing `.` removed. Sentence-style messages such as
///   `No field named foo. Valid fields are ...` end the name with a period
///   that is not part of it. Interior dots (e.g. `t.foo`) are kept.
///
/// Returns `None` when no non-empty identifier is found.
fn extract_quoted(s: &str) -> Option<&str> {
    if let Some(rest) = s.strip_prefix('\'') {
        return rest.find('\'').map(|end| &rest[..end]);
    }
    if let Some(rest) = s.strip_prefix('"') {
        return rest.find('"').map(|end| &rest[..end]);
    }
    let end = s
        .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
        .unwrap_or(s.len());
    let word = s[..end].trim_end_matches('.');
    (!word.is_empty()).then_some(word)
}
141
142#[must_use]
151pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
152 translate_datafusion_error_with_context(msg, None)
153}
154
155#[must_use]
158pub fn translate_datafusion_error_with_context(
159 msg: &str,
160 available_columns: Option<&[&str]>,
161) -> TranslatedError {
162 let clean = sanitize(msg);
163
164 if let Some(col) = extract_missing_column(clean) {
166 let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
167 Ok(actual) if actual != col => Some(format!(
168 "Column is named '{actual}' (case differs). \
169 Use \"{actual}\" or match the exact casing."
170 )),
171 Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
172 "Multiple columns match case-insensitively: {}. \
173 Use double quotes for exact match.",
174 matches.join(", ")
175 )),
176 _ => suggest_column(col, cols),
177 });
178 return TranslatedError {
179 code: codes::COLUMN_NOT_FOUND,
180 message: format!("Column '{col}' not found in query"),
181 hint,
182 };
183 }
184
185 if let Some(table) = extract_missing_table(clean) {
187 return TranslatedError {
188 code: codes::TABLE_NOT_FOUND,
189 message: format!("Table or source '{table}' not found"),
190 hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
191 };
192 }
193
194 if clean.contains("mismatch")
196 || clean.contains("must match")
197 || clean.contains("cannot be cast")
198 {
199 return TranslatedError {
200 code: codes::TYPE_MISMATCH,
201 message: format!("Type mismatch: {clean}"),
202 hint: Some("Check column types with DESCRIBE <table>".to_string()),
203 };
204 }
205
206 if let Some(translated) = check_window_errors(clean) {
208 return translated;
209 }
210
211 if let Some(translated) = check_join_errors(clean) {
213 return translated;
214 }
215
216 if clean.contains("Unsupported")
218 || clean.contains("NotImplemented")
219 || clean.contains("This feature is not implemented")
220 {
221 return TranslatedError {
222 code: codes::UNSUPPORTED_SQL,
223 message: format!("Unsupported SQL syntax: {clean}"),
224 hint: None,
225 };
226 }
227
228 if clean.starts_with("Plan(\"") {
230 let detail = clean
231 .strip_prefix("Plan(\"")
232 .and_then(|s| s.strip_suffix("\")"))
233 .unwrap_or(clean);
234 return TranslatedError {
235 code: codes::PLANNING_FAILED,
236 message: format!("Query planning failed: {detail}"),
237 hint: None,
238 };
239 }
240
241 if clean.contains("Error during planning") {
243 return TranslatedError {
244 code: codes::PLANNING_FAILED,
245 message: format!("Query planning failed: {clean}"),
246 hint: None,
247 };
248 }
249
250 if clean.contains("Execution error") {
252 return TranslatedError {
253 code: codes::EXECUTION_FAILED,
254 message: format!("Query execution failed: {clean}"),
255 hint: None,
256 };
257 }
258
259 TranslatedError {
261 code: codes::INTERNAL,
262 message: format!("Internal query error: {clean}"),
263 hint: Some("If this persists, file a bug report".to_string()),
264 }
265}
266
267fn check_window_errors(clean: &str) -> Option<TranslatedError> {
269 let lower = clean.to_ascii_lowercase();
270
271 if lower.starts_with("window error:") {
273 return Some(TranslatedError {
274 code: codes::WINDOW_INVALID,
275 message: format!("Invalid window specification: {clean}"),
276 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
277 });
278 }
279
280 if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
281 return Some(TranslatedError {
282 code: codes::WATERMARK_REQUIRED,
283 message: format!("Watermark required: {clean}"),
284 hint: Some(
285 "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
286 to the CREATE SOURCE statement"
287 .to_string(),
288 ),
289 });
290 }
291
292 if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
293 return Some(TranslatedError {
294 code: codes::WINDOW_INVALID,
295 message: format!("Invalid window specification: {clean}"),
296 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
297 });
298 }
299
300 if lower.contains("window")
301 && lower.contains("size")
302 && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
303 {
304 return Some(TranslatedError {
305 code: codes::WINDOW_SIZE_INVALID,
306 message: format!("Invalid window size: {clean}"),
307 hint: Some("Window size must be a positive interval".to_string()),
308 });
309 }
310
311 if lower.contains("late")
313 && (lower.contains("data") || lower.contains("event"))
314 && (lower.contains("rejected") || lower.contains("dropped"))
315 {
316 return Some(TranslatedError {
317 code: codes::LATE_DATA_REJECTED,
318 message: format!("Late data rejected: {clean}"),
319 hint: Some(
320 "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
321 or route late data to a side output"
322 .to_string(),
323 ),
324 });
325 }
326
327 None
328}
329
330fn check_join_errors(clean: &str) -> Option<TranslatedError> {
332 let lower = clean.to_ascii_lowercase();
333
334 if lower.starts_with("streaming sql error:") {
336 if lower.contains("using clause requires") {
337 return Some(TranslatedError {
338 code: codes::JOIN_KEY_MISSING,
339 message: format!("Join key error: {clean}"),
340 hint: Some(
341 "Ensure the USING clause references columns that exist \
342 in both sides of the join"
343 .to_string(),
344 ),
345 });
346 }
347 if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
348 return Some(TranslatedError {
349 code: codes::JOIN_TIME_BOUND_MISSING,
350 message: format!("Join time bound required: {clean}"),
351 hint: Some(
352 "Stream-stream joins require a time bound in the ON clause, e.g.: \
353 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
354 .to_string(),
355 ),
356 });
357 }
358 }
359
360 if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
361 return Some(TranslatedError {
362 code: codes::JOIN_KEY_MISSING,
363 message: format!("Join key error: {clean}"),
364 hint: Some(
365 "Ensure the ON clause references columns that exist \
366 in both sides of the join"
367 .to_string(),
368 ),
369 });
370 }
371
372 if lower.contains("join")
373 && (lower.contains("time bound") || lower.contains("interval"))
374 && lower.contains("required")
375 {
376 return Some(TranslatedError {
377 code: codes::JOIN_TIME_BOUND_MISSING,
378 message: format!("Join time bound required: {clean}"),
379 hint: Some(
380 "Stream-stream joins require a time bound in the ON clause, e.g.: \
381 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
382 .to_string(),
383 ),
384 });
385 }
386
387 if lower.contains("temporal") && lower.contains("primary key") {
388 return Some(TranslatedError {
389 code: codes::TEMPORAL_JOIN_NO_PK,
390 message: format!("Temporal join error: {clean}"),
391 hint: Some(
392 "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
393 ),
394 });
395 }
396
397 if (lower.contains("not supported for streaming")
399 || lower.contains("natural join not supported")
400 || lower.contains("cross join not supported")
401 || lower.contains("unsupported join"))
402 && lower.contains("join")
403 {
404 return Some(TranslatedError {
405 code: codes::JOIN_TYPE_UNSUPPORTED,
406 message: format!("Unsupported join type: {clean}"),
407 hint: Some(
408 "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
409 with time bounds. CROSS and NATURAL joins are not supported."
410 .to_string(),
411 ),
412 });
413 }
414
415 None
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for translating `msg` without any column context.
    fn translate(msg: &str) -> TranslatedError {
        translate_datafusion_error(msg)
    }

    /// Returns the hint of `err`, failing the test when there is none.
    fn hint_of(err: &TranslatedError) -> &str {
        err.hint.as_deref().expect("expected a hint")
    }

    // ---- column / table / type errors ------------------------------------

    #[test]
    fn test_column_not_found_single_quotes() {
        let err = translate("No field named 'foo'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        for fragment in ["foo", "not found"] {
            assert!(
                err.message.contains(fragment),
                "missing {fragment:?} in {:?}",
                err.message
            );
        }
        assert!(!err.message.contains("DataFusion"));
    }

    #[test]
    fn test_column_not_found_double_quotes() {
        let err = translate("No field named \"bar\"");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("bar"));
    }

    #[test]
    fn test_column_not_found_with_prefix() {
        let err = translate("Schema error: No field named 'baz'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("baz"));
    }

    #[test]
    fn test_table_not_found() {
        let err = translate("table 'orders' not found");
        assert_eq!(err.code, codes::TABLE_NOT_FOUND);
        assert!(err.message.contains("orders"));
        assert!(hint_of(&err).contains("SHOW TABLES"));
    }

    #[test]
    fn test_type_mismatch() {
        let err = translate("column types must match for UNION");
        assert_eq!(err.code, codes::TYPE_MISMATCH);
        assert!(hint_of(&err).contains("DESCRIBE"));
    }

    #[test]
    fn test_type_cannot_cast() {
        assert_eq!(translate("cannot be cast to Int64").code, codes::TYPE_MISMATCH);
    }

    #[test]
    fn test_unsupported_sql() {
        let err = translate("This feature is not implemented: LATERAL JOIN");
        assert_eq!(err.code, codes::UNSUPPORTED_SQL);
        assert!(err.message.contains("LATERAL JOIN"));
    }

    // ---- planning / execution / fallback ----------------------------------

    #[test]
    fn test_plan_error_with_column_extracts_column() {
        // Column extraction takes precedence over the generic Plan(...) rule.
        let err = translate("Plan(\"No field named 'x' in schema\")");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("'x'"));
    }

    #[test]
    fn test_plan_error_generic() {
        let err = translate("Plan(\"aggregate function not found\")");
        assert_eq!(err.code, codes::PLANNING_FAILED);
        assert!(err.message.contains("aggregate function not found"));
    }

    #[test]
    fn test_error_during_planning() {
        let err = translate("Error during planning: ambiguous reference 'id'");
        assert_eq!(err.code, codes::PLANNING_FAILED);
    }

    #[test]
    fn test_execution_error() {
        let err = translate("Execution error: division by zero");
        assert_eq!(err.code, codes::EXECUTION_FAILED);
    }

    #[test]
    fn test_unknown_fallback() {
        let err = translate("some totally unknown error");
        assert_eq!(err.code, codes::INTERNAL);
        assert!(err.message.contains("Internal query error"));
        assert!(err.hint.is_some());
    }

    #[test]
    fn test_prefix_stripping() {
        let err = translate("DataFusion error: Arrow error: No field named 'x'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("'x'"));
        for wrapper in ["DataFusion", "Arrow error"] {
            assert!(
                !err.message.contains(wrapper),
                "{wrapper:?} leaked into {:?}",
                err.message
            );
        }
    }

    // ---- Display ----------------------------------------------------------

    #[test]
    fn test_display_with_hint() {
        let rendered = translate("table 'foo' not found").to_string();
        assert!(rendered.starts_with("[LDB-1101]"));
        assert!(rendered.contains("(hint:"));
    }

    #[test]
    fn test_display_without_hint() {
        let rendered = translate("Execution error: oops").to_string();
        assert!(rendered.starts_with("[LDB-9001]"));
        assert!(!rendered.contains("hint"));
    }

    // ---- column suggestions -----------------------------------------------

    #[test]
    fn test_suggest_column_found() {
        assert_eq!(
            suggest_column("user_ie", &["user_id", "email"]).as_deref(),
            Some("Did you mean 'user_id'?")
        );
    }

    #[test]
    fn test_suggest_column_none() {
        assert_eq!(suggest_column("xyz", &["user_id", "email"]), None);
    }

    #[test]
    fn test_column_not_found_with_suggestion() {
        let cols = ["user_id", "email", "price"];
        let err =
            translate_datafusion_error_with_context("No field named 'user_ie'", Some(&cols[..]));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("user_ie"));
        let hint = hint_of(&err);
        assert!(hint.contains("user_id"), "hint should suggest 'user_id': {hint:?}");
    }

    #[test]
    fn test_column_not_found_no_close_match() {
        let cols = ["user_id", "email"];
        let err =
            translate_datafusion_error_with_context("No field named 'zzzzz'", Some(&cols[..]));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_without_context() {
        let err = translate("No field named 'foo'");
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_case_insensitive_hint() {
        let cols = ["tradeId", "symbol", "lastPrice"];
        let err =
            translate_datafusion_error_with_context("No field named 'tradeid'", Some(&cols[..]));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(hint.contains("tradeId"), "hint should mention actual name: {hint}");
        assert!(hint.contains("case"), "hint should mention case: {hint}");
    }

    #[test]
    fn test_column_not_found_ambiguous_case_hint() {
        let cols = ["price", "Price", "PRICE"];
        let err =
            translate_datafusion_error_with_context("No field named 'pRiCe'", Some(&cols[..]));
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(hint.contains("Multiple"), "hint should mention ambiguity: {hint}");
    }

    // ---- window / watermark / late data -----------------------------------

    #[test]
    fn test_watermark_required_error() {
        let err = translate("Watermark required for EMIT ON WINDOW CLOSE");
        assert_eq!(err.code, codes::WATERMARK_REQUIRED);
        assert!(hint_of(&err).contains("WATERMARK FOR"));
    }

    #[test]
    fn test_window_invalid_error() {
        let err = translate("Window type not supported for this operation");
        assert_eq!(err.code, codes::WINDOW_INVALID);
        assert!(hint_of(&err).contains("TUMBLE"));
    }

    #[test]
    fn test_late_data_rejected() {
        let err = translate("late data rejected by window policy");
        assert_eq!(err.code, codes::LATE_DATA_REJECTED);
        assert!(hint_of(&err).contains("lateness"));
    }

    #[test]
    fn test_late_event_dropped() {
        let err = translate("late event dropped after window close");
        assert_eq!(err.code, codes::LATE_DATA_REJECTED);
    }

    #[test]
    fn test_window_error_prefix() {
        let err = translate("Window error: CUMULATE requires step <= size");
        assert_eq!(err.code, codes::WINDOW_INVALID);
        assert!(hint_of(&err).contains("CUMULATE"));
    }

    // ---- joins ------------------------------------------------------------

    #[test]
    fn test_join_key_not_found_error() {
        let err = translate("Join key 'user_id' not found in right table");
        assert_eq!(err.code, codes::JOIN_KEY_MISSING);
        assert!(hint_of(&err).contains("ON clause"));
    }

    #[test]
    fn test_temporal_join_pk_error() {
        let err = translate("Temporal join requires a primary key on right table");
        assert_eq!(err.code, codes::TEMPORAL_JOIN_NO_PK);
        assert!(hint_of(&err).contains("PRIMARY KEY"));
    }

    #[test]
    fn test_join_type_unsupported_cross() {
        let err = translate("cross join not supported for streaming queries");
        assert_eq!(err.code, codes::JOIN_TYPE_UNSUPPORTED);
        assert!(hint_of(&err).contains("CROSS"));
    }

    #[test]
    fn test_join_type_unsupported_natural() {
        let err = translate("natural join not supported in streaming context");
        assert_eq!(err.code, codes::JOIN_TYPE_UNSUPPORTED);
    }

    #[test]
    fn test_streaming_sql_error_using_clause() {
        let err = translate("Streaming SQL error: using clause requires matching columns");
        assert_eq!(err.code, codes::JOIN_KEY_MISSING);
    }

    #[test]
    fn test_streaming_sql_error_time_bound() {
        let err = translate("Streaming SQL error: cannot extract time bound from ON clause");
        assert_eq!(err.code, codes::JOIN_TIME_BOUND_MISSING);
    }
}