1pub mod suggest;
18
19use suggest::closest_match;
20
/// Stable, user-facing error codes used by [`TranslatedError`].
///
/// Codes are grouped by their leading digit: `LDB-1xxx` SQL parsing/planning,
/// `LDB-2xxx` windows, watermarks and late data, `LDB-3xxx` joins, and
/// `LDB-9xxx` execution and internal failures.
pub mod codes {
    // ---- SQL parsing / planning (1xxx) ----

    /// SQL syntax or a feature that is not supported.
    pub const UNSUPPORTED_SQL: &str = "LDB-1001";
    /// Query planning failed for a reason without a more specific code.
    pub const PLANNING_FAILED: &str = "LDB-1002";
    /// A referenced column does not exist.
    pub const COLUMN_NOT_FOUND: &str = "LDB-1100";
    /// A referenced table or source does not exist.
    pub const TABLE_NOT_FOUND: &str = "LDB-1101";
    /// Incompatible types (e.g. a failed cast or mismatched UNION column types).
    pub const TYPE_MISMATCH: &str = "LDB-1200";

    // ---- Streaming time semantics (2xxx) ----

    /// The query needs a watermark that the source does not define.
    pub const WATERMARK_REQUIRED: &str = "LDB-2001";
    /// The window specification is invalid or unsupported.
    pub const WINDOW_INVALID: &str = "LDB-2002";
    /// The window size is not a valid (positive) interval.
    pub const WINDOW_SIZE_INVALID: &str = "LDB-2003";
    /// Late-arriving data was rejected or dropped.
    pub const LATE_DATA_REJECTED: &str = "LDB-2004";

    // ---- Joins (3xxx) ----

    /// A join key (ON / USING column) could not be resolved.
    pub const JOIN_KEY_MISSING: &str = "LDB-3001";
    /// A stream-stream join lacks the required time bound.
    pub const JOIN_TIME_BOUND_MISSING: &str = "LDB-3002";
    /// The right side of a temporal join has no primary key.
    pub const TEMPORAL_JOIN_NO_PK: &str = "LDB-3003";
    /// The join type is not supported for streaming queries.
    pub const JOIN_TYPE_UNSUPPORTED: &str = "LDB-3004";

    // ---- Execution / internal (9xxx) ----

    /// Unclassified internal error (fallback code).
    pub const INTERNAL: &str = "LDB-9000";
    /// The query failed while executing.
    pub const EXECUTION_FAILED: &str = "LDB-9001";
}
59
/// A raw engine error message rewritten into a user-facing form with a stable code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranslatedError {
    /// Stable error code, normally one of the constants in [`codes`] (e.g. `"LDB-1100"`).
    pub code: &'static str,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional remediation hint rendered after the message.
    pub hint: Option<String>,
}
70
71impl std::fmt::Display for TranslatedError {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(f, "[{}] {}", self.code, self.message)?;
74 if let Some(hint) = &self.hint {
75 write!(f, " (hint: {hint})")?;
76 }
77 Ok(())
78 }
79}
80
81#[must_use]
86pub fn suggest_column(input: &str, available: &[&str]) -> Option<String> {
87 closest_match(input, available, 2).map(|m| format!("Did you mean '{m}'?"))
88}
89
/// Remove engine wrapper prefixes such as `"DataFusion error: "` from a message.
///
/// Wrappers can nest in any order (e.g. `"External error: DataFusion error: ..."`)
/// and may repeat, so prefixes are peeled until none of them matches. The result
/// borrows from `msg`; no allocation takes place.
fn sanitize(msg: &str) -> &str {
    const PREFIXES: &[&str] = &[
        "DataFusion error: ",
        "Arrow error: ",
        "Schema error: ",
        "External error: ",
    ];
    let mut s = msg;
    // Every prefix is non-empty, so each successful strip shortens `s`, and the loop
    // terminates. `move` copies the current `&str` so the variable is not borrowed.
    while let Some(rest) = PREFIXES
        .iter()
        .find_map(move |prefix| s.strip_prefix(*prefix))
    {
        s = rest;
    }
    s
}
110
111fn extract_missing_column(msg: &str) -> Option<&str> {
113 let needle = "No field named ";
114 let idx = msg.find(needle)?;
115 let after = &msg[idx + needle.len()..];
116 extract_quoted(after)
117}
118
/// Find the table name in a "table '<name>' not found"-style message.
///
/// The search for `table '` is ASCII case-insensitive, but the returned name keeps
/// its original casing. A name is returned only when the text after the closing
/// quote says the table is missing ("not found", "does not exist", "doesn't exist").
/// Without that check, messages such as "table 'x' already exists" would be
/// misreported as missing tables.
fn extract_missing_table(msg: &str) -> Option<&str> {
    const NEEDLE: &str = "table '";
    const MISSING_MARKERS: &[&str] = &["not found", "does not exist", "doesn't exist"];

    // ASCII lowercasing preserves every byte offset, so indices found in `lower`
    // are valid char boundaries in `msg` as well.
    let lower = msg.to_ascii_lowercase();
    let start = lower.find(NEEDLE)? + NEEDLE.len();
    let name_len = msg[start..].find('\'')?;
    let name = &msg[start..start + name_len];

    // `'` is one byte, so `+ 1` lands just past the closing quote.
    let trailing = &lower[start + name_len + 1..];
    MISSING_MARKERS
        .iter()
        .any(|marker| trailing.contains(*marker))
        .then_some(name)
}
128
/// Extract a leading identifier from `s`.
///
/// - `'name'` or `"name"`: returns the text between the quotes, or `None` if the
///   closing quote is missing.
/// - Bare word: returns the text up to the first whitespace, `,` or `)`, with any
///   trailing `.` removed. This handles sentence punctuation such as
///   `"foo. Valid fields are ..."`, while interior dots in qualified names like
///   `t.foo` are kept.
///
/// Returns `None` when no non-empty identifier is found.
fn extract_quoted(s: &str) -> Option<&str> {
    if let Some(rest) = s.strip_prefix('\'') {
        rest.find('\'').map(|end| &rest[..end])
    } else if let Some(rest) = s.strip_prefix('"') {
        rest.find('"').map(|end| &rest[..end])
    } else {
        let end = s
            .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
            .unwrap_or(s.len());
        let word = s[..end].trim_end_matches('.');
        (!word.is_empty()).then_some(word)
    }
}
149
150#[must_use]
159pub fn translate_datafusion_error(msg: &str) -> TranslatedError {
160 translate_datafusion_error_with_context(msg, None)
161}
162
163#[must_use]
166pub fn translate_datafusion_error_with_context(
167 msg: &str,
168 available_columns: Option<&[&str]>,
169) -> TranslatedError {
170 let clean = sanitize(msg);
171
172 if let Some(col) = extract_missing_column(clean) {
174 let hint = available_columns.and_then(|cols| suggest_column(col, cols));
175 return TranslatedError {
176 code: codes::COLUMN_NOT_FOUND,
177 message: format!("Column '{col}' not found in query"),
178 hint,
179 };
180 }
181
182 if let Some(table) = extract_missing_table(clean) {
184 return TranslatedError {
185 code: codes::TABLE_NOT_FOUND,
186 message: format!("Table or source '{table}' not found"),
187 hint: Some("Use SHOW TABLES to see available sources and tables".to_string()),
188 };
189 }
190
191 if clean.contains("mismatch")
193 || clean.contains("must match")
194 || clean.contains("cannot be cast")
195 {
196 return TranslatedError {
197 code: codes::TYPE_MISMATCH,
198 message: format!("Type mismatch: {clean}"),
199 hint: Some("Check column types with DESCRIBE <table>".to_string()),
200 };
201 }
202
203 if let Some(translated) = check_window_errors(clean) {
205 return translated;
206 }
207
208 if let Some(translated) = check_join_errors(clean) {
210 return translated;
211 }
212
213 if clean.contains("Unsupported")
215 || clean.contains("NotImplemented")
216 || clean.contains("This feature is not implemented")
217 {
218 return TranslatedError {
219 code: codes::UNSUPPORTED_SQL,
220 message: format!("Unsupported SQL syntax: {clean}"),
221 hint: None,
222 };
223 }
224
225 if clean.starts_with("Plan(\"") {
227 let detail = clean
228 .strip_prefix("Plan(\"")
229 .and_then(|s| s.strip_suffix("\")"))
230 .unwrap_or(clean);
231 return TranslatedError {
232 code: codes::PLANNING_FAILED,
233 message: format!("Query planning failed: {detail}"),
234 hint: None,
235 };
236 }
237
238 if clean.contains("Error during planning") {
240 return TranslatedError {
241 code: codes::PLANNING_FAILED,
242 message: format!("Query planning failed: {clean}"),
243 hint: None,
244 };
245 }
246
247 if clean.contains("Execution error") {
249 return TranslatedError {
250 code: codes::EXECUTION_FAILED,
251 message: format!("Query execution failed: {clean}"),
252 hint: None,
253 };
254 }
255
256 TranslatedError {
258 code: codes::INTERNAL,
259 message: format!("Internal query error: {clean}"),
260 hint: Some("If this persists, file a bug report".to_string()),
261 }
262}
263
264fn check_window_errors(clean: &str) -> Option<TranslatedError> {
266 let lower = clean.to_ascii_lowercase();
267
268 if lower.starts_with("window error:") {
270 return Some(TranslatedError {
271 code: codes::WINDOW_INVALID,
272 message: format!("Invalid window specification: {clean}"),
273 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
274 });
275 }
276
277 if lower.contains("watermark") && (lower.contains("required") || lower.contains("missing")) {
278 return Some(TranslatedError {
279 code: codes::WATERMARK_REQUIRED,
280 message: format!("Watermark required: {clean}"),
281 hint: Some(
282 "Add WATERMARK FOR <column> AS <column> - INTERVAL '<n>' SECOND \
283 to the CREATE SOURCE statement"
284 .to_string(),
285 ),
286 });
287 }
288
289 if lower.contains("window") && (lower.contains("invalid") || lower.contains("not supported")) {
290 return Some(TranslatedError {
291 code: codes::WINDOW_INVALID,
292 message: format!("Invalid window specification: {clean}"),
293 hint: Some("Supported window types: TUMBLE, HOP, SESSION, CUMULATE".to_string()),
294 });
295 }
296
297 if lower.contains("window")
298 && lower.contains("size")
299 && (lower.contains("zero") || lower.contains("negative") || lower.contains("positive"))
300 {
301 return Some(TranslatedError {
302 code: codes::WINDOW_SIZE_INVALID,
303 message: format!("Invalid window size: {clean}"),
304 hint: Some("Window size must be a positive interval".to_string()),
305 });
306 }
307
308 if lower.contains("late")
310 && (lower.contains("data") || lower.contains("event"))
311 && (lower.contains("rejected") || lower.contains("dropped"))
312 {
313 return Some(TranslatedError {
314 code: codes::LATE_DATA_REJECTED,
315 message: format!("Late data rejected: {clean}"),
316 hint: Some(
317 "Increase the allowed lateness with ALLOWED LATENESS INTERVAL, \
318 or route late data to a side output"
319 .to_string(),
320 ),
321 });
322 }
323
324 None
325}
326
327fn check_join_errors(clean: &str) -> Option<TranslatedError> {
329 let lower = clean.to_ascii_lowercase();
330
331 if lower.starts_with("streaming sql error:") {
333 if lower.contains("using clause requires") {
334 return Some(TranslatedError {
335 code: codes::JOIN_KEY_MISSING,
336 message: format!("Join key error: {clean}"),
337 hint: Some(
338 "Ensure the USING clause references columns that exist \
339 in both sides of the join"
340 .to_string(),
341 ),
342 });
343 }
344 if lower.contains("cannot extract time bound") || lower.contains("tolerance") {
345 return Some(TranslatedError {
346 code: codes::JOIN_TIME_BOUND_MISSING,
347 message: format!("Join time bound required: {clean}"),
348 hint: Some(
349 "Stream-stream joins require a time bound in the ON clause, e.g.: \
350 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
351 .to_string(),
352 ),
353 });
354 }
355 }
356
357 if lower.contains("join") && lower.contains("key") && lower.contains("not found") {
358 return Some(TranslatedError {
359 code: codes::JOIN_KEY_MISSING,
360 message: format!("Join key error: {clean}"),
361 hint: Some(
362 "Ensure the ON clause references columns that exist \
363 in both sides of the join"
364 .to_string(),
365 ),
366 });
367 }
368
369 if lower.contains("join")
370 && (lower.contains("time bound") || lower.contains("interval"))
371 && lower.contains("required")
372 {
373 return Some(TranslatedError {
374 code: codes::JOIN_TIME_BOUND_MISSING,
375 message: format!("Join time bound required: {clean}"),
376 hint: Some(
377 "Stream-stream joins require a time bound in the ON clause, e.g.: \
378 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR"
379 .to_string(),
380 ),
381 });
382 }
383
384 if lower.contains("temporal") && lower.contains("primary key") {
385 return Some(TranslatedError {
386 code: codes::TEMPORAL_JOIN_NO_PK,
387 message: format!("Temporal join error: {clean}"),
388 hint: Some(
389 "The right-side table of a temporal join requires a PRIMARY KEY".to_string(),
390 ),
391 });
392 }
393
394 if (lower.contains("not supported for streaming")
396 || lower.contains("natural join not supported")
397 || lower.contains("cross join not supported")
398 || lower.contains("unsupported join"))
399 && lower.contains("join")
400 {
401 return Some(TranslatedError {
402 code: codes::JOIN_TYPE_UNSUPPORTED,
403 message: format!("Unsupported join type: {clean}"),
404 hint: Some(
405 "Streaming queries support INNER, LEFT, RIGHT, and FULL OUTER joins \
406 with time bounds. CROSS and NATURAL joins are not supported."
407 .to_string(),
408 ),
409 });
410 }
411
412 None
413}
414
#[cfg(test)]
mod tests {
    //! Unit tests for error translation: one or more cases per error code,
    //! plus `Display` formatting and column suggestions.

    use super::*;

    // ---- Column / table / type errors ----

    #[test]
    fn test_column_not_found_single_quotes() {
        let t = translate_datafusion_error("No field named 'foo'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("foo"));
        assert!(t.message.contains("not found"));
        assert!(!t.message.contains("DataFusion"));
    }

    #[test]
    fn test_column_not_found_double_quotes() {
        let t = translate_datafusion_error("No field named \"bar\"");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("bar"));
    }

    #[test]
    fn test_column_not_found_with_prefix() {
        let t = translate_datafusion_error("Schema error: No field named 'baz'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("baz"));
    }

    #[test]
    fn test_table_not_found() {
        let t = translate_datafusion_error("table 'orders' not found");
        assert_eq!(t.code, codes::TABLE_NOT_FOUND);
        assert!(t.message.contains("orders"));
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("SHOW TABLES"));
    }

    #[test]
    fn test_type_mismatch() {
        let t = translate_datafusion_error("column types must match for UNION");
        assert_eq!(t.code, codes::TYPE_MISMATCH);
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("DESCRIBE"));
    }

    #[test]
    fn test_type_cannot_cast() {
        let t = translate_datafusion_error("cannot be cast to Int64");
        assert_eq!(t.code, codes::TYPE_MISMATCH);
    }

    // ---- Unsupported / planning / execution / fallback ----

    #[test]
    fn test_unsupported_sql() {
        let t = translate_datafusion_error("This feature is not implemented: LATERAL JOIN");
        assert_eq!(t.code, codes::UNSUPPORTED_SQL);
        assert!(t.message.contains("LATERAL JOIN"));
    }

    #[test]
    fn test_plan_error_with_column_extracts_column() {
        let t = translate_datafusion_error("Plan(\"No field named 'x' in schema\")");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("'x'"));
    }

    #[test]
    fn test_plan_error_generic() {
        let t = translate_datafusion_error("Plan(\"aggregate function not found\")");
        assert_eq!(t.code, codes::PLANNING_FAILED);
        assert!(t.message.contains("aggregate function not found"));
    }

    #[test]
    fn test_plan_error_without_closing_suffix_keeps_raw_text() {
        let t = translate_datafusion_error("Plan(\"truncated");
        assert_eq!(t.code, codes::PLANNING_FAILED);
        assert!(t.message.contains("Plan(\"truncated"));
    }

    #[test]
    fn test_error_during_planning() {
        let t = translate_datafusion_error("Error during planning: ambiguous reference 'id'");
        assert_eq!(t.code, codes::PLANNING_FAILED);
    }

    #[test]
    fn test_execution_error() {
        let t = translate_datafusion_error("Execution error: division by zero");
        assert_eq!(t.code, codes::EXECUTION_FAILED);
    }

    #[test]
    fn test_unknown_fallback() {
        let t = translate_datafusion_error("some totally unknown error");
        assert_eq!(t.code, codes::INTERNAL);
        assert!(t.message.contains("Internal query error"));
        assert!(t.hint.is_some());
    }

    #[test]
    fn test_empty_message_falls_back_to_internal() {
        let t = translate_datafusion_error("");
        assert_eq!(t.code, codes::INTERNAL);
    }

    #[test]
    fn test_prefix_stripping() {
        let t = translate_datafusion_error("DataFusion error: Arrow error: No field named 'x'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("'x'"));
        assert!(!t.message.contains("DataFusion"));
        assert!(!t.message.contains("Arrow error"));
    }

    // ---- Display ----

    #[test]
    fn test_display_with_hint() {
        let t = translate_datafusion_error("table 'foo' not found");
        let display = t.to_string();
        assert!(display.starts_with("[LDB-1101]"));
        assert!(display.contains("(hint:"));
    }

    #[test]
    fn test_display_without_hint() {
        let t = translate_datafusion_error("Execution error: oops");
        let display = t.to_string();
        assert!(display.starts_with("[LDB-9001]"));
        assert!(!display.contains("hint"));
    }

    #[test]
    fn test_display_includes_suggestion() {
        let cols: &[&str] = &["user_id", "email"];
        let display =
            translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols))
                .to_string();
        assert!(display.starts_with("[LDB-1100]"));
        assert!(display.contains("(hint: Did you mean 'user_id'?)"));
    }

    // ---- Column suggestions ----

    #[test]
    fn test_suggest_column_found() {
        let result = suggest_column("user_ie", &["user_id", "email"]);
        assert_eq!(result, Some("Did you mean 'user_id'?".to_string()));
    }

    #[test]
    fn test_suggest_column_none() {
        let result = suggest_column("xyz", &["user_id", "email"]);
        assert_eq!(result, None);
    }

    #[test]
    fn test_column_not_found_with_suggestion() {
        let cols = &["user_id", "email", "price"];
        let t = translate_datafusion_error_with_context("No field named 'user_ie'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.message.contains("user_ie"));
        assert!(t.hint.is_some());
        assert!(
            t.hint.as_ref().unwrap().contains("user_id"),
            "hint should suggest 'user_id': {:?}",
            t.hint
        );
    }

    #[test]
    fn test_column_not_found_no_close_match() {
        let cols = &["user_id", "email"];
        let t = translate_datafusion_error_with_context("No field named 'zzzzz'", Some(cols));
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_none());
    }

    #[test]
    fn test_column_not_found_without_context() {
        let t = translate_datafusion_error("No field named 'foo'");
        assert_eq!(t.code, codes::COLUMN_NOT_FOUND);
        assert!(t.hint.is_none());
    }

    // ---- Windows / watermarks / late data ----

    #[test]
    fn test_watermark_required_error() {
        let t = translate_datafusion_error("Watermark required for EMIT ON WINDOW CLOSE");
        assert_eq!(t.code, codes::WATERMARK_REQUIRED);
        assert!(t.hint.is_some());
        assert!(t.hint.unwrap().contains("WATERMARK FOR"));
    }

    #[test]
    fn test_window_invalid_error() {
        let t = translate_datafusion_error("Window type not supported for this operation");
        assert_eq!(t.code, codes::WINDOW_INVALID);
        assert!(t.hint.unwrap().contains("TUMBLE"));
    }

    #[test]
    fn test_window_size_invalid() {
        let t = translate_datafusion_error("Window size must be positive");
        assert_eq!(t.code, codes::WINDOW_SIZE_INVALID);
        assert!(t.hint.unwrap().contains("positive"));
    }

    #[test]
    fn test_window_error_prefix() {
        let t = translate_datafusion_error("Window error: CUMULATE requires step <= size");
        assert_eq!(t.code, codes::WINDOW_INVALID);
        assert!(t.hint.unwrap().contains("CUMULATE"));
    }

    #[test]
    fn test_late_data_rejected() {
        let t = translate_datafusion_error("late data rejected by window policy");
        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
        assert!(t.hint.unwrap().contains("lateness"));
    }

    #[test]
    fn test_late_event_dropped() {
        let t = translate_datafusion_error("late event dropped after window close");
        assert_eq!(t.code, codes::LATE_DATA_REJECTED);
    }

    // ---- Joins ----

    #[test]
    fn test_join_key_not_found_error() {
        let t = translate_datafusion_error("Join key 'user_id' not found in right table");
        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
        assert!(t.hint.unwrap().contains("ON clause"));
    }

    #[test]
    fn test_temporal_join_pk_error() {
        let t = translate_datafusion_error("Temporal join requires a primary key on right table");
        assert_eq!(t.code, codes::TEMPORAL_JOIN_NO_PK);
        assert!(t.hint.unwrap().contains("PRIMARY KEY"));
    }

    #[test]
    fn test_join_time_bound_required_without_prefix() {
        let t = translate_datafusion_error("Join time bound required for stream-stream join");
        assert_eq!(t.code, codes::JOIN_TIME_BOUND_MISSING);
        assert!(t.hint.unwrap().contains("BETWEEN"));
    }

    #[test]
    fn test_join_type_unsupported_cross() {
        let t = translate_datafusion_error("cross join not supported for streaming queries");
        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
        assert!(t.hint.unwrap().contains("CROSS"));
    }

    #[test]
    fn test_join_type_unsupported_natural() {
        let t = translate_datafusion_error("natural join not supported in streaming context");
        assert_eq!(t.code, codes::JOIN_TYPE_UNSUPPORTED);
    }

    #[test]
    fn test_streaming_sql_error_using_clause() {
        let t = translate_datafusion_error(
            "Streaming SQL error: using clause requires matching columns",
        );
        assert_eq!(t.code, codes::JOIN_KEY_MISSING);
    }

    #[test]
    fn test_streaming_sql_error_time_bound() {
        let t = translate_datafusion_error(
            "Streaming SQL error: cannot extract time bound from ON clause",
        );
        assert_eq!(t.code, codes::JOIN_TIME_BOUND_MISSING);
    }
}