// dataprof_core/errors.rs

1use thiserror::Error;
2
/// Auto-retry configuration for error recovery.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly
/// (e.g. in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryConfig {
    /// Upper bound on the number of recovery strategies tried for one error.
    pub max_attempts: usize,
    /// Delimiter-detection switch.
    // NOTE(review): none of the three `enable_*` flags is read in this file;
    // presumably the retry callbacks honour them — confirm against callers.
    pub enable_delimiter_detection: bool,
    /// Encoding-detection switch.
    pub enable_encoding_detection: bool,
    /// Flexible-parsing switch.
    pub enable_flexible_parsing: bool,
}

impl Default for RetryConfig {
    /// Three attempts, with every `enable_*` flag set to `true`.
    fn default() -> Self {
        Self {
            max_attempts: 3,
            enable_delimiter_detection: true,
            enable_encoding_detection: true,
            enable_flexible_parsing: true,
        }
    }
}
22
/// Result of an auto-recovery attempt.
///
/// Derives `PartialEq`/`Eq` so recorded attempts can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryAttempt {
    /// 1-based position of this attempt within its recovery run.
    pub attempt_number: usize,
    /// Strategy that was tried.
    pub strategy: RecoveryStrategy,
    /// Whether the retry succeeded.
    pub success: bool,
    /// Rendered error of a failed retry; `None` when the retry succeeded.
    pub error_message: Option<String>,
}

/// Strategies for error recovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryStrategy {
    /// Retry using the given field delimiter.
    DelimiterDetection { delimiter: char },
    /// Retry after converting the input from one encoding to another.
    EncodingConversion { from: String, to: String },
    /// Retry with flexible parsing.
    FlexibleParsing,
    /// Retry with a smaller chunk size.
    ChunkSizeReduction { new_size: usize },
    /// Retry with memory optimization.
    MemoryOptimization,
}
41
42/// Enhanced error types with more descriptive messages for DataProfiler
43#[derive(Error, Debug, Clone)]
44pub enum DataProfilerError {
45    #[error("CSV parsing failed: {message}\nSuggestion: {suggestion}")]
46    CsvParsingError { message: String, suggestion: String },
47
48    #[error(
49        "File not found: {path}\nPlease check that the file exists and you have permission to read it"
50    )]
51    FileNotFound { path: String },
52
53    #[error("Unsupported file format: {format}\nSupported formats: CSV, JSON, JSONL")]
54    UnsupportedFormat { format: String },
55
56    #[error(
57        "Memory limit exceeded while processing large file\nTry using streaming mode or increase available memory"
58    )]
59    MemoryLimitExceeded,
60
61    #[error("Invalid configuration: {message}\n{suggestion}")]
62    InvalidConfiguration { message: String, suggestion: String },
63
64    #[error(
65        "Data quality issue detected: {issue}\nImpact: {impact}\nRecommendation: {recommendation}"
66    )]
67    DataQualityIssue {
68        issue: String,
69        impact: String,
70        recommendation: String,
71    },
72
73    #[error(
74        "Streaming processing failed: {message}\nTry setting a smaller chunk size on the profiler builder (e.g. `.chunk_size(ChunkSize::Fixed(1000))`)"
75    )]
76    StreamingError { message: String },
77
78    #[error("SIMD acceleration not available: {reason}\nFalling back to standard processing")]
79    SimdUnavailable { reason: String },
80
81    #[error("Sampling error: {message}\n{suggestion}")]
82    SamplingError { message: String, suggestion: String },
83
84    #[error("I/O error: {message}\nCheck file permissions and disk space")]
85    IoError { message: String },
86
87    #[error("JSON parsing failed: {message}\nVerify JSON format and encoding")]
88    JsonParsingError { message: String },
89
90    #[error("Column analysis failed for '{column}': {reason}\n{suggestion}")]
91    ColumnAnalysisError {
92        column: String,
93        reason: String,
94        suggestion: String,
95    },
96
97    #[error(
98        "Recoverable error (attempt {attempt}/{max_attempts}): {message}\n{recovery_suggestion}"
99    )]
100    RecoverableError {
101        message: String,
102        recovery_suggestion: String,
103        attempt: usize,
104        max_attempts: usize,
105        recovery_attempts: Vec<RecoveryAttempt>,
106    },
107
108    #[error(
109        "Auto-recovery failed after {attempts} attempts\nLast strategy tried: {last_strategy}\nRecovery log: {recovery_log}"
110    )]
111    RecoveryFailed {
112        attempts: usize,
113        last_strategy: String,
114        recovery_log: String,
115        original_error: String,
116    },
117
118    #[error("Parquet processing failed: {message}")]
119    ParquetError { message: String },
120
121    #[error("Arrow processing failed: {message}")]
122    ArrowError { message: String },
123
124    #[error("Unsupported data source: {message}")]
125    UnsupportedDataSource { message: String },
126
127    #[error("All engines failed: {message}")]
128    AllEnginesFailed { message: String },
129
130    #[error("Metrics calculation failed: {message}")]
131    MetricsCalculationError { message: String },
132
133    #[error("Configuration validation failed: {message}")]
134    ConfigValidationError { message: String },
135
136    #[error("Database connection failed: {message}\n{suggestion}")]
137    DatabaseConnectionError { message: String, suggestion: String },
138
139    #[error("Database query failed: {message}")]
140    DatabaseQueryError { message: String },
141
142    #[error("Database configuration error: {message}")]
143    DatabaseConfigError { message: String },
144
145    #[error("Database feature not enabled: {message}\nRecompile with the appropriate feature flag")]
146    DatabaseFeatureDisabled { message: String },
147
148    #[error("SQL validation failed: {message}")]
149    SqlValidationError { message: String },
150
151    #[error("Database SSL/TLS error: {message}")]
152    DatabaseSslError { message: String },
153
154    #[error(
155        "Database retry exhausted: operation '{operation}' failed after {attempts} attempts\nLast error: {last_error}"
156    )]
157    DatabaseRetryExhausted {
158        operation: String,
159        attempts: u32,
160        last_error: String,
161    },
162}
163
164impl DataProfilerError {
165    /// Create a database connection error
166    pub fn database_connection(message: &str) -> Self {
167        let m = message.to_lowercase();
168        let suggestion = if m.contains("refused") {
169            "Check that the database server is running and accepting connections."
170        } else if m.contains("timeout") {
171            "Increase the connection timeout or check network connectivity."
172        } else if m.contains("authentication") || m.contains("password") {
173            "Verify your credentials or use environment variables for authentication."
174        } else {
175            "Verify the connection string format and database server availability."
176        };
177        DataProfilerError::DatabaseConnectionError {
178            message: message.to_string(),
179            suggestion: suggestion.to_string(),
180        }
181    }
182
183    /// Create a database query error
184    pub fn database_query(message: &str) -> Self {
185        DataProfilerError::DatabaseQueryError {
186            message: message.to_string(),
187        }
188    }
189
190    /// Create a database config error
191    pub fn database_config(message: &str) -> Self {
192        DataProfilerError::DatabaseConfigError {
193            message: message.to_string(),
194        }
195    }
196
197    /// Create a feature-not-enabled error
198    pub fn database_feature_disabled(db_name: &str, feature: &str) -> Self {
199        DataProfilerError::DatabaseFeatureDisabled {
200            message: format!(
201                "{} support not compiled. Enable '{}' feature.",
202                db_name, feature
203            ),
204        }
205    }
206
207    /// Create a SQL validation error
208    pub fn sql_validation(message: &str) -> Self {
209        DataProfilerError::SqlValidationError {
210            message: message.to_string(),
211        }
212    }
213
214    /// Create a database SSL error
215    pub fn database_ssl(message: &str) -> Self {
216        DataProfilerError::DatabaseSslError {
217            message: message.to_string(),
218        }
219    }
220    /// Create a CSV parsing error with helpful suggestions
221    pub fn csv_parsing(original_error: &str, file_path: &str) -> Self {
222        let suggestion = if original_error.contains("field") && original_error.contains("record") {
223            format!(
224                "The CSV file '{}' has inconsistent column counts. This often happens with:\n  • Text fields containing commas without proper quoting\n  • Mixed line endings (Windows/Unix)\n  • Embedded newlines in data\n\n  DataProfiler will attempt to parse it with flexible mode automatically.",
225                file_path
226            )
227        } else if original_error.contains("UTF-8") {
228            "The file contains non-UTF-8 characters. Try converting it to UTF-8 encoding."
229                .to_string()
230        } else if original_error.contains("permission") {
231            "Check file permissions - you may not have read access to this file.".to_string()
232        } else {
233            "Try using a different CSV delimiter or check for data formatting issues.".to_string()
234        };
235
236        DataProfilerError::CsvParsingError {
237            message: original_error.to_string(),
238            suggestion,
239        }
240    }
241
242    /// Create a file not found error with path context
243    pub fn file_not_found<P: AsRef<str>>(path: P) -> Self {
244        DataProfilerError::FileNotFound {
245            path: path.as_ref().to_string(),
246        }
247    }
248
249    /// Create unsupported format error with format detection
250    pub fn unsupported_format(extension: &str) -> Self {
251        DataProfilerError::UnsupportedFormat {
252            format: extension.to_string(),
253        }
254    }
255
256    /// Create configuration error with specific suggestion
257    pub fn invalid_config(message: &str, suggestion: &str) -> Self {
258        DataProfilerError::InvalidConfiguration {
259            message: message.to_string(),
260            suggestion: suggestion.to_string(),
261        }
262    }
263
264    /// Create data quality issue with impact and recommendation
265    pub fn data_quality_issue(issue: &str, impact: &str, recommendation: &str) -> Self {
266        DataProfilerError::DataQualityIssue {
267            issue: issue.to_string(),
268            impact: impact.to_string(),
269            recommendation: recommendation.to_string(),
270        }
271    }
272
273    /// Create streaming error with context
274    pub fn streaming_error(message: &str) -> Self {
275        DataProfilerError::StreamingError {
276            message: message.to_string(),
277        }
278    }
279
280    /// Create SIMD error with fallback information
281    pub fn simd_unavailable(reason: &str) -> Self {
282        DataProfilerError::SimdUnavailable {
283            reason: reason.to_string(),
284        }
285    }
286
287    /// Create sampling error with suggestion
288    pub fn sampling_error(message: &str, suggestion: &str) -> Self {
289        DataProfilerError::SamplingError {
290            message: message.to_string(),
291            suggestion: suggestion.to_string(),
292        }
293    }
294
295    /// Create I/O error with context
296    pub fn io_error(original: &std::io::Error) -> Self {
297        DataProfilerError::IoError {
298            message: original.to_string(),
299        }
300    }
301
302    /// Create JSON parsing error
303    pub fn json_parsing_error(original: &str) -> Self {
304        DataProfilerError::JsonParsingError {
305            message: original.to_string(),
306        }
307    }
308
309    /// Create column analysis error with specific suggestion
310    pub fn column_analysis_error(column: &str, reason: &str, suggestion: &str) -> Self {
311        DataProfilerError::ColumnAnalysisError {
312            column: column.to_string(),
313            reason: reason.to_string(),
314            suggestion: suggestion.to_string(),
315        }
316    }
317
318    /// Create a recoverable error that can be auto-retried
319    pub fn recoverable_error(
320        message: &str,
321        recovery_suggestion: &str,
322        attempt: usize,
323        max_attempts: usize,
324    ) -> Self {
325        DataProfilerError::RecoverableError {
326            message: message.to_string(),
327            recovery_suggestion: recovery_suggestion.to_string(),
328            attempt,
329            max_attempts,
330            recovery_attempts: Vec::new(),
331        }
332    }
333
334    /// Create a recovery failed error with attempt log
335    pub fn recovery_failed(
336        attempts: usize,
337        last_strategy: &str,
338        recovery_log: &str,
339        original_error: &str,
340    ) -> Self {
341        DataProfilerError::RecoveryFailed {
342            attempts,
343            last_strategy: last_strategy.to_string(),
344            recovery_log: recovery_log.to_string(),
345            original_error: original_error.to_string(),
346        }
347    }
348
349    /// Add a recovery attempt to the error
350    pub fn add_recovery_attempt(&mut self, attempt: RecoveryAttempt) {
351        if let DataProfilerError::RecoverableError {
352            recovery_attempts, ..
353        } = self
354        {
355            recovery_attempts.push(attempt);
356        }
357    }
358
359    /// Check if error supports auto-recovery
360    pub fn supports_auto_recovery(&self) -> bool {
361        matches!(
362            self,
363            DataProfilerError::CsvParsingError { .. }
364                | DataProfilerError::JsonParsingError { .. }
365                | DataProfilerError::StreamingError { .. }
366                | DataProfilerError::MemoryLimitExceeded
367                | DataProfilerError::RecoverableError { .. }
368        )
369    }
370
371    /// Get suggested recovery strategies for this error
372    pub fn suggested_recovery_strategies(&self) -> Vec<RecoveryStrategy> {
373        match self {
374            DataProfilerError::CsvParsingError { .. } => vec![
375                RecoveryStrategy::DelimiterDetection { delimiter: ',' },
376                RecoveryStrategy::DelimiterDetection { delimiter: ';' },
377                RecoveryStrategy::DelimiterDetection { delimiter: '\t' },
378                RecoveryStrategy::DelimiterDetection { delimiter: '|' },
379                RecoveryStrategy::EncodingConversion {
380                    from: "latin1".to_string(),
381                    to: "utf8".to_string(),
382                },
383                RecoveryStrategy::FlexibleParsing,
384            ],
385            DataProfilerError::MemoryLimitExceeded => vec![
386                RecoveryStrategy::ChunkSizeReduction { new_size: 1000 },
387                RecoveryStrategy::MemoryOptimization,
388            ],
389            DataProfilerError::JsonParsingError { .. } => {
390                vec![RecoveryStrategy::EncodingConversion {
391                    from: "latin1".to_string(),
392                    to: "utf8".to_string(),
393                }]
394            }
395            DataProfilerError::StreamingError { .. } => vec![
396                RecoveryStrategy::ChunkSizeReduction { new_size: 500 },
397                RecoveryStrategy::MemoryOptimization,
398            ],
399            _ => vec![],
400        }
401    }
402
403    /// Check if this error is recoverable (can continue processing)
404    pub fn is_recoverable(&self) -> bool {
405        matches!(
406            self,
407            DataProfilerError::SimdUnavailable { .. }
408                | DataProfilerError::SamplingError { .. }
409                | DataProfilerError::DataQualityIssue { .. }
410                | DataProfilerError::RecoverableError { .. }
411        )
412    }
413
414    /// Get error category for logging/metrics
415    pub fn category(&self) -> &'static str {
416        match self {
417            DataProfilerError::CsvParsingError { .. } => "csv_parsing",
418            DataProfilerError::FileNotFound { .. } => "file_not_found",
419            DataProfilerError::UnsupportedFormat { .. } => "unsupported_format",
420            DataProfilerError::MemoryLimitExceeded => "memory_limit",
421            DataProfilerError::InvalidConfiguration { .. } => "configuration",
422            DataProfilerError::DataQualityIssue { .. } => "data_quality",
423            DataProfilerError::StreamingError { .. } => "streaming",
424            DataProfilerError::SimdUnavailable { .. } => "simd",
425            DataProfilerError::SamplingError { .. } => "sampling",
426            DataProfilerError::IoError { .. } => "io",
427            DataProfilerError::JsonParsingError { .. } => "json_parsing",
428            DataProfilerError::ColumnAnalysisError { .. } => "column_analysis",
429            DataProfilerError::RecoverableError { .. } => "recoverable",
430            DataProfilerError::RecoveryFailed { .. } => "recovery_failed",
431            DataProfilerError::ParquetError { .. } => "parquet",
432            DataProfilerError::ArrowError { .. } => "arrow",
433            DataProfilerError::UnsupportedDataSource { .. } => "unsupported_data_source",
434            DataProfilerError::AllEnginesFailed { .. } => "all_engines_failed",
435            DataProfilerError::MetricsCalculationError { .. } => "metrics_calculation",
436            DataProfilerError::ConfigValidationError { .. } => "config_validation",
437            DataProfilerError::DatabaseConnectionError { .. } => "database_connection",
438            DataProfilerError::DatabaseQueryError { .. } => "database_query",
439            DataProfilerError::DatabaseConfigError { .. } => "database_config",
440            DataProfilerError::DatabaseFeatureDisabled { .. } => "database_feature_disabled",
441            DataProfilerError::SqlValidationError { .. } => "sql_validation",
442            DataProfilerError::DatabaseSslError { .. } => "database_ssl",
443            DataProfilerError::DatabaseRetryExhausted { .. } => "database_retry_exhausted",
444        }
445    }
446}
447
448/// Convert from anyhow::Error to DataProfilerError with context
449impl From<anyhow::Error> for DataProfilerError {
450    fn from(err: anyhow::Error) -> Self {
451        let error_str = err.to_string();
452
453        // Try to categorize the error based on its message
454        if error_str.contains("No such file") || error_str.contains("not found") {
455            DataProfilerError::FileNotFound {
456                path: "unknown".to_string(),
457            }
458        } else if error_str.contains("CSV") {
459            DataProfilerError::CsvParsingError {
460                message: error_str,
461                suggestion: "Try using robust CSV parsing mode".to_string(),
462            }
463        } else if error_str.contains("JSON") {
464            DataProfilerError::JsonParsingError { message: error_str }
465        } else if error_str.contains("permission") {
466            DataProfilerError::IoError { message: error_str }
467        } else {
468            // Generic error
469            DataProfilerError::IoError { message: error_str }
470        }
471    }
472}
473
474/// Convert from std::io::Error to DataProfilerError
475impl From<std::io::Error> for DataProfilerError {
476    fn from(err: std::io::Error) -> Self {
477        match err.kind() {
478            std::io::ErrorKind::NotFound => DataProfilerError::FileNotFound {
479                path: "unknown".to_string(),
480            },
481            std::io::ErrorKind::PermissionDenied => DataProfilerError::IoError {
482                message: "Permission denied - check file access rights".to_string(),
483            },
484            std::io::ErrorKind::InvalidData => DataProfilerError::CsvParsingError {
485                message: "Invalid data format detected".to_string(),
486                suggestion: "Check file encoding and format".to_string(),
487            },
488            _ => DataProfilerError::IoError {
489                message: err.to_string(),
490            },
491        }
492    }
493}
494
495/// Convert from csv::Error to DataProfilerError with enhanced context
496impl From<csv::Error> for DataProfilerError {
497    fn from(err: csv::Error) -> Self {
498        DataProfilerError::csv_parsing(&err.to_string(), "unknown")
499    }
500}
501
/// Convert from arrow::error::ArrowError to DataProfilerError.
///
/// Only compiled when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
impl From<arrow::error::ArrowError> for DataProfilerError {
    fn from(arrow_err: arrow::error::ArrowError) -> Self {
        let message = arrow_err.to_string();
        Self::ArrowError { message }
    }
}
511
512/// Convert from serde_json::Error to DataProfilerError
513impl From<serde_json::Error> for DataProfilerError {
514    fn from(err: serde_json::Error) -> Self {
515        DataProfilerError::JsonParsingError {
516            message: err.to_string(),
517        }
518    }
519}
520
521/// Convert from glob::PatternError to DataProfilerError
522impl From<glob::PatternError> for DataProfilerError {
523    fn from(err: glob::PatternError) -> Self {
524        DataProfilerError::InvalidConfiguration {
525            message: format!("Invalid glob pattern: {}", err),
526            suggestion: "Check the glob pattern syntax".to_string(),
527        }
528    }
529}
530
531/// Convert from toml::de::Error to DataProfilerError
532impl From<toml::de::Error> for DataProfilerError {
533    fn from(err: toml::de::Error) -> Self {
534        DataProfilerError::InvalidConfiguration {
535            message: format!("Failed to parse TOML configuration: {}", err),
536            suggestion: "Check your configuration file syntax".to_string(),
537        }
538    }
539}
540
541/// Convert from toml::ser::Error to DataProfilerError
542impl From<toml::ser::Error> for DataProfilerError {
543    fn from(err: toml::ser::Error) -> Self {
544        DataProfilerError::InvalidConfiguration {
545            message: format!("Failed to serialize configuration: {}", err),
546            suggestion: "Check configuration values for serialization issues".to_string(),
547        }
548    }
549}
550
551/// Auto-recovery manager for handling error recovery strategies
552pub struct AutoRecoveryManager {
553    config: RetryConfig,
554    recovery_log: Vec<RecoveryAttempt>,
555}
556
557impl AutoRecoveryManager {
558    pub fn new(config: RetryConfig) -> Self {
559        Self {
560            config,
561            recovery_log: Vec::new(),
562        }
563    }
564
565    /// Attempt auto-recovery for a given error
566    pub fn attempt_recovery<F, T>(
567        &mut self,
568        error: &DataProfilerError,
569        retry_fn: F,
570    ) -> Result<T, DataProfilerError>
571    where
572        F: Fn(RecoveryStrategy) -> Result<T, DataProfilerError>,
573    {
574        if !error.supports_auto_recovery() {
575            return Err(error.clone());
576        }
577
578        let strategies = error.suggested_recovery_strategies();
579        let mut last_error: DataProfilerError = error.clone();
580
581        for (attempt, strategy) in strategies.iter().enumerate() {
582            if attempt >= self.config.max_attempts {
583                break;
584            }
585
586            // Log attempt start
587            log::info!(
588                "Auto-recovery attempt {}/{}: {:?}",
589                attempt + 1,
590                self.config.max_attempts,
591                strategy
592            );
593
594            match retry_fn(strategy.clone()) {
595                Ok(result) => {
596                    let recovery_attempt = RecoveryAttempt {
597                        attempt_number: attempt + 1,
598                        strategy: strategy.clone(),
599                        success: true,
600                        error_message: None,
601                    };
602                    self.recovery_log.push(recovery_attempt);
603
604                    log::info!("Auto-recovery successful with strategy: {:?}", strategy);
605                    return Ok(result);
606                }
607                Err(err) => {
608                    let recovery_attempt = RecoveryAttempt {
609                        attempt_number: attempt + 1,
610                        strategy: strategy.clone(),
611                        success: false,
612                        error_message: Some(err.to_string()),
613                    };
614                    self.recovery_log.push(recovery_attempt);
615                    last_error = err;
616
617                    log::warn!("Auto-recovery attempt failed: {}", last_error);
618                }
619            }
620        }
621
622        // All recovery attempts failed
623        let recovery_log_text = self
624            .recovery_log
625            .iter()
626            .map(|attempt| {
627                format!(
628                    "Attempt {}: {:?} - {}",
629                    attempt.attempt_number,
630                    attempt.strategy,
631                    if attempt.success { "Success" } else { "Failed" }
632                )
633            })
634            .collect::<Vec<_>>()
635            .join("; ");
636
637        let last_strategy = self
638            .recovery_log
639            .last()
640            .map(|attempt| format!("{:?}", attempt.strategy))
641            .unwrap_or_else(|| "None".to_string());
642
643        Err(DataProfilerError::recovery_failed(
644            self.recovery_log.len(),
645            &last_strategy,
646            &recovery_log_text,
647            &last_error.to_string(),
648        ))
649    }
650
651    /// Get the recovery log
652    pub fn get_recovery_log(&self) -> &[RecoveryAttempt] {
653        &self.recovery_log
654    }
655
656    /// Clear the recovery log
657    pub fn clear_log(&mut self) {
658        self.recovery_log.clear();
659    }
660}
661
662impl Default for AutoRecoveryManager {
663    fn default() -> Self {
664        Self::new(RetryConfig::default())
665    }
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// A CSV parsing error reports the `csv_parsing` category and is not
    /// flagged as recoverable.
    #[test]
    fn test_error_categorization() {
        let err = DataProfilerError::csv_parsing("field count mismatch", "test.csv");
        let category = err.category();
        assert_eq!(category, "csv_parsing");

        let recoverable = err.is_recoverable();
        assert!(!recoverable);
    }

    /// A missing-SIMD error is flagged as recoverable.
    #[test]
    fn test_recoverable_errors() {
        let recoverable =
            DataProfilerError::simd_unavailable("CPU doesn't support SIMD").is_recoverable();
        assert!(recoverable);
    }

    /// The rendered configuration error contains both the message and the
    /// suggestion.
    #[test]
    fn test_error_suggestions() {
        let message = "Invalid chunk size";
        let suggestion = "Use a value between 1000 and 100000";
        let rendered = DataProfilerError::invalid_config(message, suggestion).to_string();

        assert!(rendered.contains("Invalid chunk size"));
        assert!(rendered.contains("Use a value between"));
    }
}