1pub mod suggest;
18
19use suggest::{closest_match, resolve_column_name};
20
/// Stable, user-facing error codes attached to every translated error.
///
/// Codes are grouped by numeric range:
/// - `LDB-1xxx`: SQL syntax, planning, and name/type resolution.
/// - `LDB-2xxx`: windows, watermarks, and late data.
/// - `LDB-3xxx`: joins.
/// - `LDB-9xxx`: execution failures and the internal fallback.
pub mod codes {
    // ----- 1xxx: SQL syntax, planning, resolution -------------------------

    /// SQL syntax or a feature that is not supported.
    pub const UNSUPPORTED_SQL: &str = "LDB-1001";
    /// Query planning failed for a reason without a more specific code.
    pub const PLANNING_FAILED: &str = "LDB-1002";
    /// A referenced column could not be found.
    pub const COLUMN_NOT_FOUND: &str = "LDB-1100";
    /// A referenced table or source could not be found.
    pub const TABLE_NOT_FOUND: &str = "LDB-1101";
    /// Incompatible or non-castable types.
    pub const TYPE_MISMATCH: &str = "LDB-1200";

    // ----- 2xxx: windows, watermarks, late data ---------------------------

    /// The operation needs a watermark that was not declared.
    pub const WATERMARK_REQUIRED: &str = "LDB-2001";
    /// The window specification is invalid or unsupported.
    pub const WINDOW_INVALID: &str = "LDB-2002";
    /// The window size is invalid (e.g. zero or negative).
    pub const WINDOW_SIZE_INVALID: &str = "LDB-2003";
    /// Late data was rejected or dropped.
    pub const LATE_DATA_REJECTED: &str = "LDB-2004";

    // ----- 3xxx: joins ----------------------------------------------------

    /// A join key column is missing on one side of the join.
    pub const JOIN_KEY_MISSING: &str = "LDB-3001";
    /// A stream-stream join lacks the required time bound.
    pub const JOIN_TIME_BOUND_MISSING: &str = "LDB-3002";
    /// The right side of a temporal join has no primary key.
    pub const TEMPORAL_JOIN_NO_PK: &str = "LDB-3003";
    /// The join type is not supported for streaming queries.
    pub const JOIN_TYPE_UNSUPPORTED: &str = "LDB-3004";

    // ----- 9xxx: execution and internal -----------------------------------

    /// Fallback for errors that match no other rule.
    pub const INTERNAL: &str = "LDB-9000";
    /// The query failed while executing.
    pub const EXECUTION_FAILED: &str = "LDB-9001";
}
59
/// A raw engine error rewritten into a user-facing form.
///
/// `Display` renders it as `[CODE] message`, followed by ` (hint: ...)` when a
/// hint is present. It also implements [`std::error::Error`], so it can be
/// propagated with `?` into `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranslatedError {
    /// Stable error code, e.g. `"LDB-1100"`.
    pub code: &'static str,
    /// Human-readable description of the problem.
    pub message: String,
    /// Optional actionable suggestion for fixing the query.
    pub hint: Option<String>,
}

impl std::fmt::Display for TranslatedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}", self.code, self.message)?;
        if let Some(hint) = &self.hint {
            write!(f, " (hint: {hint})")?;
        }
        Ok(())
    }
}

// `Debug` and `Display` are already provided; the default `source()` of
// `None` is correct because the original engine error is not retained.
impl std::error::Error for TranslatedError {}
80
81#[must_use]
86pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
87 closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
88}
89
/// Removes known wrapper prefixes (`DataFusion error: `, `Arrow error: `,
/// `Schema error: `, `External error: `) from the front of `msg`.
///
/// Prefixes are peeled repeatedly and in any order until none remain. This
/// fully unwraps nested forms such as
/// `External error: DataFusion error: ...` or repeated wrappers like
/// `Schema error: Schema error: ...`. A single ordered pass would leave the
/// inner prefix in place. Returns a subslice of `msg`; nothing is allocated.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];
    let mut s = msg;
    // Each iteration strictly shortens `s` (all prefixes are non-empty), so
    // the loop always terminates.
    while let Some(rest) = PREFIXES
        .iter()
        .find_map(move |prefix| s.strip_prefix(*prefix))
    {
        s = rest;
    }
    s
}
110
111fn extract_missing_column(msg: &str) -> Option<&str> {
113 let needle = "No field named ";
114 let idx = msg.find(needle)?;
115 let after = &msg[idx + needle.len()..];
116 extract_quoted(after)
117}
118
/// Extracts the table name from a message such as `table 'orders' not found`.
///
/// The search for `table '` is ASCII case-insensitive, and every occurrence
/// is tried. An occurrence counts only when the text right after the closing
/// quote says the table is missing (`not found`, `was not found`,
/// `does not exist`, `doesn't exist`). Messages that merely mention a quoted
/// table, such as `Table 'x' already exists` or a temporal-join primary-key
/// error naming its table, are therefore not reported as missing tables.
/// Empty names (`table ''`) are rejected. The returned slice keeps the
/// original casing from `msg`.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const NEEDLE: &str = "table '";
    const MISSING_MARKERS: &[&str] = &[
        "not found",
        "was not found",
        "does not exist",
        "doesn't exist",
    ];

    // ASCII lowercasing preserves byte offsets, so indices found in `lower`
    // are valid char boundaries in `msg` as well.
    let lower = msg.to_ascii_lowercase();
    for (idx, _) in lower.match_indices(NEEDLE) {
        let start = idx + NEEDLE.len();
        if let Some(len) = msg[start..].find('\'') {
            let name = &msg[start..start + len];
            // Skip the one-byte closing quote before inspecting what follows.
            let after = lower[start + len + 1..].trim_start();
            let says_missing = MISSING_MARKERS
                .iter()
                .any(|marker| after.starts_with(*marker));
            if !name.is_empty() && says_missing {
                return Some(name);
            }
        }
    }
    None
}
128
/// Parses an identifier at the very start of `s`.
///
/// - `'name'…` or `"name"…`: returns the text between the opening quote and
///   the next matching quote, or `None` if the closing quote is missing.
/// - Otherwise: takes the leading run of characters up to the first
///   whitespace, `,` or `)`, then trims trailing sentence punctuation
///   (`.`, `;`, `:`). For example, `foo. Valid fields are a, b.` yields
///   `foo` rather than `foo.`. Interior dots such as the qualifier in
///   `t.foo` are kept. Returns `None` if nothing remains.
fn extract_quoted(s: &str) -> Option<&str> {
    if let Some(rest) = s.strip_prefix('\'') {
        return rest.find('\'').map(|end| &rest[..end]);
    }
    if let Some(rest) = s.strip_prefix('"') {
        return rest.find('"').map(|end| &rest[..end]);
    }

    let end = s
        .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
        .unwrap_or(s.len());
    // Unquoted names are often followed directly by sentence punctuation.
    let word = s[..end].trim_end_matches(|c: char| matches!(c, '.' | ';' | ':'));
    if word.is_empty() {
        None
    } else {
        Some(word)
    }
}
149
150#[must_use]
159pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
160 translate_datafusion_error_with_context(msg, None)
161}
162
163#[must_use]
166pub fn translate_datafusion_error_with_context(
167 msg: &str,
168 available_columns: Option<&[&str]>,
169) -> TranslatedError {
170 let clean = sanitize(msg);
171
172 if let Some(col) = extract_missing_column(clean) {
174 let hint = available_columns.and_then(|cols| match resolve_column_name(col, cols) {
175 Ok(actual) if actual != col => Some(format!(
176 "Column is named '{actual}' (case differs). \
177 Use \"{actual}\" or match the exact casing."
178 )),
179 Err(suggest::ColumnResolveError::Ambiguous { matches, .. }) => Some(format!(
180 "Multiple columns match case-insensitively: {}. \
181 Use double quotes for exact match.",
182 matches.join(", ")
183 )),
184 _ => suggest_column(col, cols),
185 });
186 return TranslatedError {
187 code: codes::COLUMN_NOT_FOUND,
188 message: format!("Column '{col}' not found in query"),
189 hint,
190 };
191 }
192
193 if let Some(table) = extract_missing_table(clean) {
195 return TranslatedError {
196 code: codes::TABLE_NOT_FOUND,
197 message: format!("Table or source '{table}' not found"),
198 hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
199 };
200 }
201
202 if clean.contains("mismatch")
204 || clean.contains("must match")
205 || clean.contains("cannot be cast")
206 {
207 return TranslatedError {
208 code: codes::TYPE_MISMATCH,
209 message: format!("Type mismatch: {clean}"),
210 hint: Some("Check column types with DESCRIBE <table>".to_string()),
211 };
212 }
213
214 if let Some(translated) = check_window_errors(clean) {
216 return translated;
217 }
218
219 if let Some(translated) = check_join_errors(clean) {
221 return translated;
222 }
223
224 if clean.contains("Unsupported")
226 || clean.contains("NotImplemented")
227 || clean.contains("This feature is not implemented")
228 {
229 return TranslatedError {
230 code: codes::UNSUPPORTED_SQL,
231 message: format!("Unsupported SQL syntax: {clean}"),
232 hint: None,
233 };
234 }
235
236 if clean.starts_with("Plan(\"") {
238 let detail = clean
239 .strip_prefix("Plan(\"")
240 .and_then(|s| s.strip_suffix("\")"))
241 .unwrap_or(clean);
242 return TranslatedError {
243 code: codes::PLANNING_FAILED,
244 message: format!("Query planning failed: {detail}"),
245 hint: None,
246 };
247 }
248
249 if clean.contains("Error during planning") {
251 return TranslatedError {
252 code: codes::PLANNING_FAILED,
253 message: format!("Query planning failed: {clean}"),
254 hint: None,
255 };
256 }
257
258 if clean.contains("Execution error") {
260 return TranslatedError {
261 code: codes::EXECUTION_FAILED,
262 message: format!("Query execution failed: {clean}"),
263 hint: None,
264 };
265 }
266
267 TranslatedError {
269 code: codes::INTERNAL,
270 message: format!("Internal query error: {clean}"),
271 hint: Some("If this persists, file a bug report".to_string()),
272 }
273}
274
275fn check_window_errors(clean: &str) -> Option<TranslatedError> {
277 let lower = clean.to_ascii_lowercase();
278
279 if lower.starts_with("window error:") {
281 return Some(TranslatedError {
282 code: codes::WINDOW_INVALID,
283 message: format!("Invalid window specification: {clean}"),
284 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
285 });
286 }
287
288 if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
289 return Some(TranslatedError {
290 code: codes::WATERMARK_REQUIRED,
291 message: format!("Watermark required: {clean}"),
292 hint: Some(
293 "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
294 to the CREATE SOURCE statement"
295 .to_string(),
296 ),
297 });
298 }
299
300 if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
301 return Some(TranslatedError {
302 code: codes::WINDOW_INVALID,
303 message: format!("Invalid window specification: {clean}"),
304 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
305 });
306 }
307
308 if lower.contains("window")
309 && lower.contains("size")
310 && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
311 {
312 return Some(TranslatedError {
313 code: codes::WINDOW_SIZE_INVALID,
314 message: format!("Invalid window size: {clean}"),
315 hint: Some("Window size must be a positive interval".to_string()),
316 });
317 }
318
319 if lower.contains("late")
321 && (lower.contains("data") || lower.contains("event"))
322 && (lower.contains("rejected") || lower.contains("dropped"))
323 {
324 return Some(TranslatedError {
325 code: codes::LATE_DATA_REJECTED,
326 message: format!("Late data rejected: {clean}"),
327 hint: Some(
328 "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
329 or route late data to a side output"
330 .to_string(),
331 ),
332 });
333 }
334
335 None
336}
337
338fn check_join_errors(clean: &str) -> Option<TranslatedError> {
340 let lower = clean.to_ascii_lowercase();
341
342 if lower.starts_with("streaming sql error:") {
344 if lower.contains("using clause requires") {
345 return Some(TranslatedError {
346 code: codes::JOIN_KEY_MISSING,
347 message: format!("Join key error: {clean}"),
348 hint: Some(
349 "Ensure the USING clause references columns that exist \
350 in both sides of the join"
351 .to_string(),
352 ),
353 });
354 }
355 if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
356 return Some(TranslatedError {
357 code: codes::JOIN_TIME_BOUND_MISSING,
358 message: format!("Join time bound required: {clean}"),
359 hint: Some(
360 "Stream-stream joins require a time bound in the ON clause, e.g.: \
361 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
362 .to_string(),
363 ),
364 });
365 }
366 }
367
368 if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
369 return Some(TranslatedError {
370 code: codes::JOIN_KEY_MISSING,
371 message: format!("Join key error: {clean}"),
372 hint: Some(
373 "Ensure the ON clause references columns that exist \
374 in both sides of the join"
375 .to_string(),
376 ),
377 });
378 }
379
380 if lower.contains("join")
381 && (lower.contains("time bound") || lower.contains("interval"))
382 && lower.contains("required")
383 {
384 return Some(TranslatedError {
385 code: codes::JOIN_TIME_BOUND_MISSING,
386 message: format!("Join time bound required: {clean}"),
387 hint: Some(
388 "Stream-stream joins require a time bound in the ON clause, e.g.: \
389 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
390 .to_string(),
391 ),
392 });
393 }
394
395 if lower.contains("temporal") && lower.contains("primary key") {
396 return Some(TranslatedError {
397 code: codes::TEMPORAL_JOIN_NO_PK,
398 message: format!("Temporal join error: {clean}"),
399 hint: Some(
400 "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
401 ),
402 });
403 }
404
405 if (lower.contains("not supported for streaming")
407 || lower.contains("natural join not supported")
408 || lower.contains("cross join not supported")
409 || lower.contains("unsupported join"))
410 && lower.contains("join")
411 {
412 return Some(TranslatedError {
413 code: codes::JOIN_TYPE_UNSUPPORTED,
414 message: format!("Unsupported join type: {clean}"),
415 hint: Some(
416 "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
417 with time bounds. CROSS and NATURAL joins are not supported."
418 .to_string(),
419 ),
420 });
421 }
422
423 None
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Translates `msg` without column context.
    fn translate(msg: &str) -> TranslatedError {
        translate_datafusion_error(msg)
    }

    /// Translates `msg` with `cols` as the available column names.
    fn translate_with(msg: &str, cols: &[&str]) -> TranslatedError {
        translate_datafusion_error_with_context(msg, Some(cols))
    }

    /// Translates `msg`, asserts it received `expected` as its code, and
    /// returns the translation for further checks.
    fn expect_code(msg: &str, expected: &str) -> TranslatedError {
        let err = translate(msg);
        assert_eq!(err.code, expected, "unexpected code for {msg:?}");
        err
    }

    /// Returns the hint, failing the test when none was produced.
    fn hint_of(err: &TranslatedError) -> &str {
        err.hint.as_deref().expect("expected a hint")
    }

    // ----- Column / table / type resolution --------------------------------

    #[test]
    fn test_column_not_found_single_quotes() {
        let err = expect_code("No field named 'foo'", codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("foo"));
        assert!(err.message.contains("not found"));
        assert!(!err.message.contains("DataFusion"));
    }

    #[test]
    fn test_column_not_found_double_quotes() {
        let err = expect_code("No field named \"bar\"", codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("bar"));
    }

    #[test]
    fn test_column_not_found_with_prefix() {
        let err = expect_code("Schema error: No field named 'baz'", codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("baz"));
    }

    #[test]
    fn test_table_not_found() {
        let err = expect_code("table 'orders' not found", codes::TABLE_NOT_FOUND);
        assert!(err.message.contains("orders"));
        assert!(hint_of(&err).contains("SHOW TABLES"));
    }

    #[test]
    fn test_type_mismatch() {
        let err = expect_code("column types must match for UNION", codes::TYPE_MISMATCH);
        assert!(hint_of(&err).contains("DESCRIBE"));
    }

    #[test]
    fn test_type_cannot_cast() {
        expect_code("cannot be cast to Int64", codes::TYPE_MISMATCH);
    }

    // ----- Unsupported / planning / execution / fallback -------------------

    #[test]
    fn test_unsupported_sql() {
        let err = expect_code(
            "This feature is not implemented: LATERAL JOIN",
            codes::UNSUPPORTED_SQL,
        );
        assert!(err.message.contains("LATERAL JOIN"));
    }

    #[test]
    fn test_plan_error_with_column_extracts_column() {
        // Column extraction takes priority over the generic Plan(...) rule.
        let err = expect_code(
            "Plan(\"No field named 'x' in schema\")",
            codes::COLUMN_NOT_FOUND,
        );
        assert!(err.message.contains("'x'"));
    }

    #[test]
    fn test_plan_error_generic() {
        let err = expect_code(
            "Plan(\"aggregate function not found\")",
            codes::PLANNING_FAILED,
        );
        assert!(err.message.contains("aggregate function not found"));
    }

    #[test]
    fn test_error_during_planning() {
        expect_code(
            "Error during planning: ambiguous reference 'id'",
            codes::PLANNING_FAILED,
        );
    }

    #[test]
    fn test_execution_error() {
        expect_code("Execution error: division by zero", codes::EXECUTION_FAILED);
    }

    #[test]
    fn test_unknown_fallback() {
        let err = expect_code("some totally unknown error", codes::INTERNAL);
        assert!(err.message.contains("Internal query error"));
        assert!(err.hint.is_some());
    }

    #[test]
    fn test_prefix_stripping() {
        let err = expect_code(
            "DataFusion error: Arrow error: No field named 'x'",
            codes::COLUMN_NOT_FOUND,
        );
        assert!(err.message.contains("'x'"));
        assert!(!err.message.contains("DataFusion"));
        assert!(!err.message.contains("Arrow error"));
    }

    // ----- Display -----------------------------------------------------------

    #[test]
    fn test_display_with_hint() {
        let rendered = translate("table 'foo' not found").to_string();
        assert!(rendered.starts_with("[LDB-1101]"));
        assert!(rendered.contains("(hint:"));
    }

    #[test]
    fn test_display_without_hint() {
        let rendered = translate("Execution error: oops").to_string();
        assert!(rendered.starts_with("[LDB-9001]"));
        assert!(!rendered.contains("hint"));
    }

    // ----- Column suggestions -----------------------------------------------

    #[test]
    fn test_suggest_column_found() {
        assert_eq!(
            suggest_column("user_ie", &["user_id", "email"]),
            Some("Did you mean 'user_id'?".to_string())
        );
    }

    #[test]
    fn test_suggest_column_none() {
        assert_eq!(suggest_column("xyz", &["user_id", "email"]), None);
    }

    #[test]
    fn test_column_not_found_with_suggestion() {
        let err = translate_with("No field named 'user_ie'", &["user_id", "email", "price"]);
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.message.contains("user_ie"));
        let hint = hint_of(&err);
        assert!(
            hint.contains("user_id"),
            "hint should suggest 'user_id': {hint:?}"
        );
    }

    #[test]
    fn test_column_not_found_no_close_match() {
        let err = translate_with("No field named 'zzzzz'", &["user_id", "email"]);
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_without_context() {
        let err = expect_code("No field named 'foo'", codes::COLUMN_NOT_FOUND);
        assert!(err.hint.is_none());
    }

    #[test]
    fn test_column_not_found_case_insensitive_hint() {
        let err = translate_with("No field named 'tradeid'", &["tradeId", "symbol", "lastPrice"]);
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(hint.contains("tradeId"), "hint should mention actual name: {hint}");
        assert!(hint.contains("case"), "hint should mention case: {hint}");
    }

    #[test]
    fn test_column_not_found_ambiguous_case_hint() {
        let err = translate_with("No field named 'pRiCe'", &["price", "Price", "PRICE"]);
        assert_eq!(err.code, codes::COLUMN_NOT_FOUND);
        let hint = hint_of(&err);
        assert!(hint.contains("Multiple"), "hint should mention ambiguity: {hint}");
    }

    // ----- Windows / watermarks / late data ---------------------------------

    #[test]
    fn test_watermark_required_error() {
        let err = expect_code(
            "Watermark required for EMIT ON WINDOW CLOSE",
            codes::WATERMARK_REQUIRED,
        );
        assert!(hint_of(&err).contains("WATERMARK FOR"));
    }

    #[test]
    fn test_window_invalid_error() {
        let err = expect_code(
            "Window type not supported for this operation",
            codes::WINDOW_INVALID,
        );
        assert!(hint_of(&err).contains("TUMBLE"));
    }

    #[test]
    fn test_late_data_rejected() {
        let err = expect_code("late data rejected by window policy", codes::LATE_DATA_REJECTED);
        assert!(hint_of(&err).contains("lateness"));
    }

    #[test]
    fn test_late_event_dropped() {
        expect_code("late event dropped after window close", codes::LATE_DATA_REJECTED);
    }

    #[test]
    fn test_window_error_prefix() {
        let err = expect_code(
            "Window error: CUMULATE requires step <= size",
            codes::WINDOW_INVALID,
        );
        assert!(hint_of(&err).contains("CUMULATE"));
    }

    // ----- Joins ---------------------------------------------------------------

    #[test]
    fn test_join_key_not_found_error() {
        let err = expect_code(
            "Join key 'user_id' not found in right table",
            codes::JOIN_KEY_MISSING,
        );
        assert!(hint_of(&err).contains("ON clause"));
    }

    #[test]
    fn test_temporal_join_pk_error() {
        let err = expect_code(
            "Temporal join requires a primary key on right table",
            codes::TEMPORAL_JOIN_NO_PK,
        );
        assert!(hint_of(&err).contains("PRIMARY KEY"));
    }

    #[test]
    fn test_join_type_unsupported_cross() {
        let err = expect_code(
            "cross join not supported for streaming queries",
            codes::JOIN_TYPE_UNSUPPORTED,
        );
        assert!(hint_of(&err).contains("CROSS"));
    }

    #[test]
    fn test_join_type_unsupported_natural() {
        expect_code(
            "natural join not supported in streaming context",
            codes::JOIN_TYPE_UNSUPPORTED,
        );
    }

    #[test]
    fn test_streaming_sql_error_using_clause() {
        expect_code(
            "Streaming SQL error: using clause requires matching columns",
            codes::JOIN_KEY_MISSING,
        );
    }

    #[test]
    fn test_streaming_sql_error_time_bound() {
        expect_code(
            "Streaming SQL error: cannot extract time bound from ON clause",
            codes::JOIN_TIME_BOUND_MISSING,
        );
    }
}
707}