1use thiserror::Error;
2
/// Tunable settings for automatic error recovery.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Upper bound on how many recovery strategies are tried for one error.
    pub max_attempts: usize,
    /// Flag for the delimiter-detection recovery strategies.
    pub enable_delimiter_detection: bool,
    /// Flag for the encoding-conversion recovery strategies.
    pub enable_encoding_detection: bool,
    /// Flag for the flexible-parsing recovery strategy.
    pub enable_flexible_parsing: bool,
}
11
12impl Default for RetryConfig {
13 fn default() -> Self {
14 Self {
15 max_attempts: 3,
16 enable_delimiter_detection: true,
17 enable_encoding_detection: true,
18 enable_flexible_parsing: true,
19 }
20 }
21}
22
23#[derive(Debug, Clone)]
25pub struct RecoveryAttempt {
26 pub attempt_number: usize,
27 pub strategy: RecoveryStrategy,
28 pub success: bool,
29 pub error_message: Option<String>,
30}
31
/// The kinds of corrective action a retry may be asked to apply.
#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    /// Retry using the given field delimiter.
    DelimiterDetection { delimiter: char },
    /// Retry after converting between the named encodings.
    EncodingConversion { from: String, to: String },
    /// Retry with flexible parsing.
    FlexibleParsing,
    /// Retry with the given chunk size.
    ChunkSizeReduction { new_size: usize },
    /// Retry with memory optimization.
    MemoryOptimization,
}
41
42#[derive(Error, Debug, Clone)]
44pub enum DataProfilerError {
45 #[error("CSV parsing failed: {message}\nSuggestion: {suggestion}")]
46 CsvParsingError { message: String, suggestion: String },
47
48 #[error(
49 "File not found: {path}\nPlease check that the file exists and you have permission to read it"
50 )]
51 FileNotFound { path: String },
52
53 #[error("Unsupported file format: {format}\nSupported formats: CSV, JSON, JSONL")]
54 UnsupportedFormat { format: String },
55
56 #[error(
57 "Memory limit exceeded while processing large file\nTry using streaming mode or increase available memory"
58 )]
59 MemoryLimitExceeded,
60
61 #[error("Invalid configuration: {message}\n{suggestion}")]
62 InvalidConfiguration { message: String, suggestion: String },
63
64 #[error(
65 "Data quality issue detected: {issue}\nImpact: {impact}\nRecommendation: {recommendation}"
66 )]
67 DataQualityIssue {
68 issue: String,
69 impact: String,
70 recommendation: String,
71 },
72
73 #[error(
74 "Streaming processing failed: {message}\nTry setting a smaller chunk size on the profiler builder (e.g. `.chunk_size(ChunkSize::Fixed(1000))`)"
75 )]
76 StreamingError { message: String },
77
78 #[error("SIMD acceleration not available: {reason}\nFalling back to standard processing")]
79 SimdUnavailable { reason: String },
80
81 #[error("Sampling error: {message}\n{suggestion}")]
82 SamplingError { message: String, suggestion: String },
83
84 #[error("I/O error: {message}\nCheck file permissions and disk space")]
85 IoError { message: String },
86
87 #[error("JSON parsing failed: {message}\nVerify JSON format and encoding")]
88 JsonParsingError { message: String },
89
90 #[error("Column analysis failed for '{column}': {reason}\n{suggestion}")]
91 ColumnAnalysisError {
92 column: String,
93 reason: String,
94 suggestion: String,
95 },
96
97 #[error(
98 "Recoverable error (attempt {attempt}/{max_attempts}): {message}\n{recovery_suggestion}"
99 )]
100 RecoverableError {
101 message: String,
102 recovery_suggestion: String,
103 attempt: usize,
104 max_attempts: usize,
105 recovery_attempts: Vec<RecoveryAttempt>,
106 },
107
108 #[error(
109 "Auto-recovery failed after {attempts} attempts\nLast strategy tried: {last_strategy}\nRecovery log: {recovery_log}"
110 )]
111 RecoveryFailed {
112 attempts: usize,
113 last_strategy: String,
114 recovery_log: String,
115 original_error: String,
116 },
117
118 #[error("Parquet processing failed: {message}")]
119 ParquetError { message: String },
120
121 #[error("Arrow processing failed: {message}")]
122 ArrowError { message: String },
123
124 #[error("Unsupported data source: {message}")]
125 UnsupportedDataSource { message: String },
126
127 #[error("All engines failed: {message}")]
128 AllEnginesFailed { message: String },
129
130 #[error("Metrics calculation failed: {message}")]
131 MetricsCalculationError { message: String },
132
133 #[error("Configuration validation failed: {message}")]
134 ConfigValidationError { message: String },
135
136 #[error("Database connection failed: {message}\n{suggestion}")]
137 DatabaseConnectionError { message: String, suggestion: String },
138
139 #[error("Database query failed: {message}")]
140 DatabaseQueryError { message: String },
141
142 #[error("Database configuration error: {message}")]
143 DatabaseConfigError { message: String },
144
145 #[error("Database feature not enabled: {message}\nRecompile with the appropriate feature flag")]
146 DatabaseFeatureDisabled { message: String },
147
148 #[error("SQL validation failed: {message}")]
149 SqlValidationError { message: String },
150
151 #[error("Database SSL/TLS error: {message}")]
152 DatabaseSslError { message: String },
153
154 #[error(
155 "Database retry exhausted: operation '{operation}' failed after {attempts} attempts\nLast error: {last_error}"
156 )]
157 DatabaseRetryExhausted {
158 operation: String,
159 attempts: u32,
160 last_error: String,
161 },
162}
163
164impl DataProfilerError {
165 pub fn database_connection(message: &str) -> Self {
167 let m = message.to_lowercase();
168 let suggestion = if m.contains("refused") {
169 "Check that the database server is running and accepting connections."
170 } else if m.contains("timeout") {
171 "Increase the connection timeout or check network connectivity."
172 } else if m.contains("authentication") || m.contains("password") {
173 "Verify your credentials or use environment variables for authentication."
174 } else {
175 "Verify the connection string format and database server availability."
176 };
177 DataProfilerError::DatabaseConnectionError {
178 message: message.to_string(),
179 suggestion: suggestion.to_string(),
180 }
181 }
182
183 pub fn database_query(message: &str) -> Self {
185 DataProfilerError::DatabaseQueryError {
186 message: message.to_string(),
187 }
188 }
189
190 pub fn database_config(message: &str) -> Self {
192 DataProfilerError::DatabaseConfigError {
193 message: message.to_string(),
194 }
195 }
196
197 pub fn database_feature_disabled(db_name: &str, feature: &str) -> Self {
199 DataProfilerError::DatabaseFeatureDisabled {
200 message: format!(
201 "{} support not compiled. Enable '{}' feature.",
202 db_name, feature
203 ),
204 }
205 }
206
207 pub fn sql_validation(message: &str) -> Self {
209 DataProfilerError::SqlValidationError {
210 message: message.to_string(),
211 }
212 }
213
214 pub fn database_ssl(message: &str) -> Self {
216 DataProfilerError::DatabaseSslError {
217 message: message.to_string(),
218 }
219 }
220 pub fn csv_parsing(original_error: &str, file_path: &str) -> Self {
222 let suggestion = if original_error.contains("field") && original_error.contains("record") {
223 format!(
224 "The CSV file '{}' has inconsistent column counts. This often happens with:\n • Text fields containing commas without proper quoting\n • Mixed line endings (Windows/Unix)\n • Embedded newlines in data\n\n DataProfiler will attempt to parse it with flexible mode automatically.",
225 file_path
226 )
227 } else if original_error.contains("UTF-8") {
228 "The file contains non-UTF-8 characters. Try converting it to UTF-8 encoding."
229 .to_string()
230 } else if original_error.contains("permission") {
231 "Check file permissions - you may not have read access to this file.".to_string()
232 } else {
233 "Try using a different CSV delimiter or check for data formatting issues.".to_string()
234 };
235
236 DataProfilerError::CsvParsingError {
237 message: original_error.to_string(),
238 suggestion,
239 }
240 }
241
242 pub fn file_not_found<P: AsRef<str>>(path: P) -> Self {
244 DataProfilerError::FileNotFound {
245 path: path.as_ref().to_string(),
246 }
247 }
248
249 pub fn unsupported_format(extension: &str) -> Self {
251 DataProfilerError::UnsupportedFormat {
252 format: extension.to_string(),
253 }
254 }
255
256 pub fn invalid_config(message: &str, suggestion: &str) -> Self {
258 DataProfilerError::InvalidConfiguration {
259 message: message.to_string(),
260 suggestion: suggestion.to_string(),
261 }
262 }
263
264 pub fn data_quality_issue(issue: &str, impact: &str, recommendation: &str) -> Self {
266 DataProfilerError::DataQualityIssue {
267 issue: issue.to_string(),
268 impact: impact.to_string(),
269 recommendation: recommendation.to_string(),
270 }
271 }
272
273 pub fn streaming_error(message: &str) -> Self {
275 DataProfilerError::StreamingError {
276 message: message.to_string(),
277 }
278 }
279
280 pub fn simd_unavailable(reason: &str) -> Self {
282 DataProfilerError::SimdUnavailable {
283 reason: reason.to_string(),
284 }
285 }
286
287 pub fn sampling_error(message: &str, suggestion: &str) -> Self {
289 DataProfilerError::SamplingError {
290 message: message.to_string(),
291 suggestion: suggestion.to_string(),
292 }
293 }
294
295 pub fn io_error(original: &std::io::Error) -> Self {
297 DataProfilerError::IoError {
298 message: original.to_string(),
299 }
300 }
301
302 pub fn json_parsing_error(original: &str) -> Self {
304 DataProfilerError::JsonParsingError {
305 message: original.to_string(),
306 }
307 }
308
309 pub fn column_analysis_error(column: &str, reason: &str, suggestion: &str) -> Self {
311 DataProfilerError::ColumnAnalysisError {
312 column: column.to_string(),
313 reason: reason.to_string(),
314 suggestion: suggestion.to_string(),
315 }
316 }
317
318 pub fn recoverable_error(
320 message: &str,
321 recovery_suggestion: &str,
322 attempt: usize,
323 max_attempts: usize,
324 ) -> Self {
325 DataProfilerError::RecoverableError {
326 message: message.to_string(),
327 recovery_suggestion: recovery_suggestion.to_string(),
328 attempt,
329 max_attempts,
330 recovery_attempts: Vec::new(),
331 }
332 }
333
334 pub fn recovery_failed(
336 attempts: usize,
337 last_strategy: &str,
338 recovery_log: &str,
339 original_error: &str,
340 ) -> Self {
341 DataProfilerError::RecoveryFailed {
342 attempts,
343 last_strategy: last_strategy.to_string(),
344 recovery_log: recovery_log.to_string(),
345 original_error: original_error.to_string(),
346 }
347 }
348
349 pub fn add_recovery_attempt(&mut self, attempt: RecoveryAttempt) {
351 if let DataProfilerError::RecoverableError {
352 recovery_attempts, ..
353 } = self
354 {
355 recovery_attempts.push(attempt);
356 }
357 }
358
359 pub fn supports_auto_recovery(&self) -> bool {
361 matches!(
362 self,
363 DataProfilerError::CsvParsingError { .. }
364 | DataProfilerError::JsonParsingError { .. }
365 | DataProfilerError::StreamingError { .. }
366 | DataProfilerError::MemoryLimitExceeded
367 | DataProfilerError::RecoverableError { .. }
368 )
369 }
370
371 pub fn suggested_recovery_strategies(&self) -> Vec<RecoveryStrategy> {
373 match self {
374 DataProfilerError::CsvParsingError { .. } => vec![
375 RecoveryStrategy::DelimiterDetection { delimiter: ',' },
376 RecoveryStrategy::DelimiterDetection { delimiter: ';' },
377 RecoveryStrategy::DelimiterDetection { delimiter: '\t' },
378 RecoveryStrategy::DelimiterDetection { delimiter: '|' },
379 RecoveryStrategy::EncodingConversion {
380 from: "latin1".to_string(),
381 to: "utf8".to_string(),
382 },
383 RecoveryStrategy::FlexibleParsing,
384 ],
385 DataProfilerError::MemoryLimitExceeded => vec![
386 RecoveryStrategy::ChunkSizeReduction { new_size: 1000 },
387 RecoveryStrategy::MemoryOptimization,
388 ],
389 DataProfilerError::JsonParsingError { .. } => {
390 vec![RecoveryStrategy::EncodingConversion {
391 from: "latin1".to_string(),
392 to: "utf8".to_string(),
393 }]
394 }
395 DataProfilerError::StreamingError { .. } => vec![
396 RecoveryStrategy::ChunkSizeReduction { new_size: 500 },
397 RecoveryStrategy::MemoryOptimization,
398 ],
399 _ => vec![],
400 }
401 }
402
403 pub fn is_recoverable(&self) -> bool {
405 matches!(
406 self,
407 DataProfilerError::SimdUnavailable { .. }
408 | DataProfilerError::SamplingError { .. }
409 | DataProfilerError::DataQualityIssue { .. }
410 | DataProfilerError::RecoverableError { .. }
411 )
412 }
413
414 pub fn category(&self) -> &'static str {
416 match self {
417 DataProfilerError::CsvParsingError { .. } => "csv_parsing",
418 DataProfilerError::FileNotFound { .. } => "file_not_found",
419 DataProfilerError::UnsupportedFormat { .. } => "unsupported_format",
420 DataProfilerError::MemoryLimitExceeded => "memory_limit",
421 DataProfilerError::InvalidConfiguration { .. } => "configuration",
422 DataProfilerError::DataQualityIssue { .. } => "data_quality",
423 DataProfilerError::StreamingError { .. } => "streaming",
424 DataProfilerError::SimdUnavailable { .. } => "simd",
425 DataProfilerError::SamplingError { .. } => "sampling",
426 DataProfilerError::IoError { .. } => "io",
427 DataProfilerError::JsonParsingError { .. } => "json_parsing",
428 DataProfilerError::ColumnAnalysisError { .. } => "column_analysis",
429 DataProfilerError::RecoverableError { .. } => "recoverable",
430 DataProfilerError::RecoveryFailed { .. } => "recovery_failed",
431 DataProfilerError::ParquetError { .. } => "parquet",
432 DataProfilerError::ArrowError { .. } => "arrow",
433 DataProfilerError::UnsupportedDataSource { .. } => "unsupported_data_source",
434 DataProfilerError::AllEnginesFailed { .. } => "all_engines_failed",
435 DataProfilerError::MetricsCalculationError { .. } => "metrics_calculation",
436 DataProfilerError::ConfigValidationError { .. } => "config_validation",
437 DataProfilerError::DatabaseConnectionError { .. } => "database_connection",
438 DataProfilerError::DatabaseQueryError { .. } => "database_query",
439 DataProfilerError::DatabaseConfigError { .. } => "database_config",
440 DataProfilerError::DatabaseFeatureDisabled { .. } => "database_feature_disabled",
441 DataProfilerError::SqlValidationError { .. } => "sql_validation",
442 DataProfilerError::DatabaseSslError { .. } => "database_ssl",
443 DataProfilerError::DatabaseRetryExhausted { .. } => "database_retry_exhausted",
444 }
445 }
446}
447
448impl From<anyhow::Error> for DataProfilerError {
450 fn from(err: anyhow::Error) -> Self {
451 let error_str = err.to_string();
452
453 if error_str.contains("No such file") || error_str.contains("not found") {
455 DataProfilerError::FileNotFound {
456 path: "unknown".to_string(),
457 }
458 } else if error_str.contains("CSV") {
459 DataProfilerError::CsvParsingError {
460 message: error_str,
461 suggestion: "Try using robust CSV parsing mode".to_string(),
462 }
463 } else if error_str.contains("JSON") {
464 DataProfilerError::JsonParsingError { message: error_str }
465 } else if error_str.contains("permission") {
466 DataProfilerError::IoError { message: error_str }
467 } else {
468 DataProfilerError::IoError { message: error_str }
470 }
471 }
472}
473
474impl From<std::io::Error> for DataProfilerError {
476 fn from(err: std::io::Error) -> Self {
477 match err.kind() {
478 std::io::ErrorKind::NotFound => DataProfilerError::FileNotFound {
479 path: "unknown".to_string(),
480 },
481 std::io::ErrorKind::PermissionDenied => DataProfilerError::IoError {
482 message: "Permission denied - check file access rights".to_string(),
483 },
484 std::io::ErrorKind::InvalidData => DataProfilerError::CsvParsingError {
485 message: "Invalid data format detected".to_string(),
486 suggestion: "Check file encoding and format".to_string(),
487 },
488 _ => DataProfilerError::IoError {
489 message: err.to_string(),
490 },
491 }
492 }
493}
494
495impl From<csv::Error> for DataProfilerError {
497 fn from(err: csv::Error) -> Self {
498 DataProfilerError::csv_parsing(&err.to_string(), "unknown")
499 }
500}
501
/// Conversion from Arrow errors; compiled only when the `arrow` feature is on.
#[cfg(feature = "arrow")]
impl From<arrow::error::ArrowError> for DataProfilerError {
    /// Stores the display text of `err` in an `ArrowError` variant.
    fn from(err: arrow::error::ArrowError) -> Self {
        let message = err.to_string();
        Self::ArrowError { message }
    }
}
511
512impl From<serde_json::Error> for DataProfilerError {
514 fn from(err: serde_json::Error) -> Self {
515 DataProfilerError::JsonParsingError {
516 message: err.to_string(),
517 }
518 }
519}
520
521impl From<glob::PatternError> for DataProfilerError {
523 fn from(err: glob::PatternError) -> Self {
524 DataProfilerError::InvalidConfiguration {
525 message: format!("Invalid glob pattern: {}", err),
526 suggestion: "Check the glob pattern syntax".to_string(),
527 }
528 }
529}
530
531impl From<toml::de::Error> for DataProfilerError {
533 fn from(err: toml::de::Error) -> Self {
534 DataProfilerError::InvalidConfiguration {
535 message: format!("Failed to parse TOML configuration: {}", err),
536 suggestion: "Check your configuration file syntax".to_string(),
537 }
538 }
539}
540
541impl From<toml::ser::Error> for DataProfilerError {
543 fn from(err: toml::ser::Error) -> Self {
544 DataProfilerError::InvalidConfiguration {
545 message: format!("Failed to serialize configuration: {}", err),
546 suggestion: "Check configuration values for serialization issues".to_string(),
547 }
548 }
549}
550
551pub struct AutoRecoveryManager {
553 config: RetryConfig,
554 recovery_log: Vec<RecoveryAttempt>,
555}
556
557impl AutoRecoveryManager {
558 pub fn new(config: RetryConfig) -> Self {
559 Self {
560 config,
561 recovery_log: Vec::new(),
562 }
563 }
564
565 pub fn attempt_recovery<F, T>(
567 &mut self,
568 error: &DataProfilerError,
569 retry_fn: F,
570 ) -> Result<T, DataProfilerError>
571 where
572 F: Fn(RecoveryStrategy) -> Result<T, DataProfilerError>,
573 {
574 if !error.supports_auto_recovery() {
575 return Err(error.clone());
576 }
577
578 let strategies = error.suggested_recovery_strategies();
579 let mut last_error: DataProfilerError = error.clone();
580
581 for (attempt, strategy) in strategies.iter().enumerate() {
582 if attempt >= self.config.max_attempts {
583 break;
584 }
585
586 log::info!(
588 "Auto-recovery attempt {}/{}: {:?}",
589 attempt + 1,
590 self.config.max_attempts,
591 strategy
592 );
593
594 match retry_fn(strategy.clone()) {
595 Ok(result) => {
596 let recovery_attempt = RecoveryAttempt {
597 attempt_number: attempt + 1,
598 strategy: strategy.clone(),
599 success: true,
600 error_message: None,
601 };
602 self.recovery_log.push(recovery_attempt);
603
604 log::info!("Auto-recovery successful with strategy: {:?}", strategy);
605 return Ok(result);
606 }
607 Err(err) => {
608 let recovery_attempt = RecoveryAttempt {
609 attempt_number: attempt + 1,
610 strategy: strategy.clone(),
611 success: false,
612 error_message: Some(err.to_string()),
613 };
614 self.recovery_log.push(recovery_attempt);
615 last_error = err;
616
617 log::warn!("Auto-recovery attempt failed: {}", last_error);
618 }
619 }
620 }
621
622 let recovery_log_text = self
624 .recovery_log
625 .iter()
626 .map(|attempt| {
627 format!(
628 "Attempt {}: {:?} - {}",
629 attempt.attempt_number,
630 attempt.strategy,
631 if attempt.success { "Success" } else { "Failed" }
632 )
633 })
634 .collect::<Vec<_>>()
635 .join("; ");
636
637 let last_strategy = self
638 .recovery_log
639 .last()
640 .map(|attempt| format!("{:?}", attempt.strategy))
641 .unwrap_or_else(|| "None".to_string());
642
643 Err(DataProfilerError::recovery_failed(
644 self.recovery_log.len(),
645 &last_strategy,
646 &recovery_log_text,
647 &last_error.to_string(),
648 ))
649 }
650
651 pub fn get_recovery_log(&self) -> &[RecoveryAttempt] {
653 &self.recovery_log
654 }
655
656 pub fn clear_log(&mut self) {
658 self.recovery_log.clear();
659 }
660}
661
662impl Default for AutoRecoveryManager {
663 fn default() -> Self {
664 Self::new(RetryConfig::default())
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// A CSV parsing error reports its category and is not classed as recoverable.
    #[test]
    fn test_error_categorization() {
        let err = DataProfilerError::csv_parsing("field count mismatch", "test.csv");

        assert_eq!(err.category(), "csv_parsing");
        assert!(!err.is_recoverable());
    }

    /// SIMD unavailability is classed as recoverable.
    #[test]
    fn test_recoverable_errors() {
        let err = DataProfilerError::simd_unavailable("CPU doesn't support SIMD");

        assert!(err.is_recoverable());
    }

    /// The rendered message contains both the problem and the suggestion.
    #[test]
    fn test_error_suggestions() {
        let rendered = DataProfilerError::invalid_config(
            "Invalid chunk size",
            "Use a value between 1000 and 100000",
        )
        .to_string();

        assert!(rendered.contains("Invalid chunk size"));
        assert!(rendered.contains("Use a value between"));
    }
}