use crate::processor::DatasetProcessor;
use std::fs;
use tempfile::TempDir;
#[derive(Debug, Clone)]
struct TestCsvFile {
filename: String,
columns: Vec<String>,
data_rows: Vec<Vec<String>>,
has_badc_header: bool,
}
impl TestCsvFile {
fn new(filename: &str, columns: Vec<&str>, data_rows: Vec<Vec<&str>>) -> Self {
Self {
filename: filename.to_string(),
columns: columns.iter().map(|s| s.to_string()).collect(),
data_rows: data_rows
.iter()
.map(|row| row.iter().map(|s| s.to_string()).collect())
.collect(),
has_badc_header: true,
}
}
fn to_csv_content(&self) -> String {
let mut content = String::new();
if self.has_badc_header {
content.push_str("observation_station,G,Test Station\n");
content.push_str("midas_station_id,G,00001\n");
content.push_str("historic_county_name,G,testshire\n");
content.push_str("location,G,51.5,-0.1\n");
content.push_str("height,G,10,m\n");
content.push_str("data\n");
}
content.push_str(&self.columns.join(","));
content.push('\n');
for row in &self.data_rows {
content.push_str(&row.join(","));
content.push('\n');
}
if self.has_badc_header {
content.push_str("end data\n");
}
content
}
}
fn create_varying_schema_test_data() -> Vec<TestCsvFile> {
vec![
TestCsvFile::new(
"rain_standard.csv",
vec![
"ob_end_ctime",
"id",
"prcp_amt",
"prcp_amt_q",
"prcp_dur",
"prcp_dur_q",
"prcp_amt_j",
"prcp_amt_j_q",
"midas_stmp",
"midas_stmp_etime",
"meto_stmp",
"meto_stmp_etime",
"quality_flag",
"version",
"src_id",
],
vec![
vec![
"2023-01-01",
"00001",
"5.0",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-01T00:00:00Z",
"2023-01-01T23:59:59Z",
"2023-01-01T00:00:00Z",
"2023-01-01T23:59:59Z",
"0",
"1",
"MIDAS",
],
vec![
"2023-01-02",
"00001",
"3.2",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-02T00:00:00Z",
"2023-01-02T23:59:59Z",
"2023-01-02T00:00:00Z",
"2023-01-02T23:59:59Z",
"0",
"1",
"MIDAS",
],
],
),
TestCsvFile::new(
"rain_missing_cols.csv",
vec![
"ob_end_ctime",
"id",
"prcp_amt",
"prcp_amt_q",
"prcp_dur",
"prcp_dur_q",
"prcp_amt_j",
"prcp_amt_j_q",
"midas_stmp",
"meto_stmp",
"quality_flag",
"version",
"src_id",
],
vec![
vec![
"2023-01-03",
"00001",
"2.1",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-03T00:00:00Z",
"2023-01-03T00:00:00Z",
"0",
"1",
"MIDAS",
],
vec![
"2023-01-04",
"00001",
"0.0",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-04T00:00:00Z",
"2023-01-04T00:00:00Z",
"0",
"1",
"MIDAS",
],
],
),
TestCsvFile::new(
"rain_extra_cols.csv",
vec![
"ob_end_ctime",
"id",
"prcp_amt",
"prcp_amt_q",
"prcp_dur",
"prcp_dur_q",
"prcp_amt_j",
"prcp_amt_j_q",
"midas_stmp",
"midas_stmp_etime",
"meto_stmp",
"meto_stmp_etime",
"quality_flag",
"version",
"src_id",
"extra_col",
],
vec![
vec![
"2023-01-05",
"00001",
"1.8",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-05T00:00:00Z",
"2023-01-05T23:59:59Z",
"2023-01-05T00:00:00Z",
"2023-01-05T23:59:59Z",
"0",
"1",
"MIDAS",
"extra_value",
],
vec![
"2023-01-06",
"00001",
"4.5",
"0",
"24",
"0",
"0.0",
"0",
"2023-01-06T00:00:00Z",
"2023-01-06T23:59:59Z",
"2023-01-06T00:00:00Z",
"2023-01-06T23:59:59Z",
"0",
"1",
"MIDAS",
"extra_value2",
],
],
),
]
}
fn create_schema_union_test_dataset(
temp_dir: &TempDir,
) -> (std::path::PathBuf, std::path::PathBuf) {
let dataset_path = temp_dir.path().join("test-schema-union");
let qcv_path = dataset_path.join("qcv-1");
let station_path = qcv_path.join("county1").join("station1");
fs::create_dir_all(&station_path).unwrap();
let test_files = create_varying_schema_test_data();
for test_file in test_files {
let csv_content = test_file.to_csv_content();
fs::write(station_path.join(&test_file.filename), csv_content).unwrap();
}
let output_path = temp_dir
.path()
.join("output")
.join("schema-union-test.parquet");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
(dataset_path, output_path)
}
#[tokio::test]
async fn test_schema_union_failure_reproduction() {
let temp_dir = TempDir::new().unwrap();
let (dataset_path, output_path) = create_schema_union_test_dataset(&temp_dir);
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone())).unwrap();
let result = processor.process().await;
println!("Current schema union result: {:?}", result);
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn test_diagonal_concat_with_varying_schemas() {
let temp_dir = TempDir::new().unwrap();
let (dataset_path, output_path) = create_schema_union_test_dataset(&temp_dir);
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone())).unwrap();
let result = processor.process().await;
match result {
Ok(stats) => {
assert_eq!(stats.files_processed, 3);
assert_eq!(stats.files_failed, 0);
assert!(output_path.exists());
}
Err(e) => {
println!("Schema union test failed: {}", e);
}
}
}
#[tokio::test]
async fn test_null_handling_for_missing_columns() {
let temp_dir = TempDir::new().unwrap();
let (dataset_path, output_path) = create_schema_union_test_dataset(&temp_dir);
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone())).unwrap();
let result = processor.process().await;
match result {
Ok(_stats) => {
println!("Null handling test - needs parquet validation implementation");
}
Err(e) => {
println!("Null handling test failed (expected until fix): {}", e);
}
}
}
#[tokio::test]
async fn test_error_handling_for_incompatible_schemas() {
let temp_dir = TempDir::new().unwrap();
let dataset_path = temp_dir.path().join("test-incompatible");
let qcv_path = dataset_path.join("qcv-1");
let station_path = qcv_path.join("county1").join("station1");
fs::create_dir_all(&station_path).unwrap();
let incompatible_content = r#"observation_station,G,Test Station
midas_station_id,G,00001
historic_county_name,G,testshire
location,G,51.5,-0.1
height,G,10,m
data
ob_end_ctime,completely_different_column
invalid_date,text_data
another_invalid_date,more_text
end data"#;
fs::write(station_path.join("incompatible.csv"), incompatible_content).unwrap();
let output_path = temp_dir
.path()
.join("output")
.join("incompatible-test.parquet");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path)).unwrap();
let result = processor.process().await;
match result {
Ok(_) => {
println!("Incompatible schema test passed - diagonal concat handled gracefully");
}
Err(e) => {
let error_msg = e.to_string();
println!("Incompatible schema error (expected): {}", error_msg);
assert!(
error_msg.contains("schema")
|| error_msg.contains("column")
|| error_msg.contains("parse")
|| error_msg.contains("configuration")
|| error_msg.contains("dataset")
);
}
}
}
fn create_files_with_column_counts(
temp_dir: &TempDir,
column_counts: Vec<usize>,
) -> std::path::PathBuf {
let dataset_path = temp_dir.path().join("test-column-counts");
let qcv_path = dataset_path.join("qcv-1");
let station_path = qcv_path.join("county1").join("station1");
fs::create_dir_all(&station_path).unwrap();
for (i, count) in column_counts.iter().enumerate() {
let columns: Vec<String> = (0..*count).map(|j| format!("col_{}", j)).collect();
let data_row: Vec<String> = (0..*count).map(|j| format!("data_{}", j)).collect();
let test_file = TestCsvFile::new(
&format!("file_{}.csv", i),
columns.iter().map(|s| s.as_str()).collect(),
vec![data_row.iter().map(|s| s.as_str()).collect()],
);
fs::write(
station_path.join(&test_file.filename),
test_file.to_csv_content(),
)
.unwrap();
}
dataset_path
}
#[tokio::test]
async fn test_performance_diagonal_vs_standard() {
let temp_dir = TempDir::new().unwrap();
let _identical_schema_path = create_files_with_column_counts(&temp_dir, vec![10, 10, 10]);
let _varying_schema_path = create_files_with_column_counts(&temp_dir, vec![10, 8, 12]);
println!("Performance test setup complete - timing implementation needed");
}