use crate::config::MidasConfig;
use crate::processor::DatasetProcessor;
use std::fs;
use tempfile::TempDir;
fn create_multi_station_dataset(temp_dir: &TempDir) -> (std::path::PathBuf, std::path::PathBuf) {
let dataset_path = temp_dir.path().join("multi-station-dataset");
let qcv_path = dataset_path.join("qcv-1");
let station1_path = qcv_path.join("county1").join("station1");
fs::create_dir_all(&station1_path).unwrap();
let csv_content_1 = r#"observation_station,G,Test Station 1
midas_station_id,G,00001
historic_county_name,G,county1
location,G,51.5,-0.1
height,G,10,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
2023-01-01,5.0,0
2023-01-02,3.2,0
end data"#;
fs::write(station1_path.join("rain_2023.csv"), csv_content_1).unwrap();
let station2_path = qcv_path.join("county1").join("station2");
fs::create_dir_all(&station2_path).unwrap();
let csv_content_2 = r#"observation_station,G,Test Station 2
midas_station_id,G,00002
historic_county_name,G,county1
location,G,52.0,-0.2
height,G,20,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
2023-01-01,2.5,0
2023-01-02,1.8,0
2023-01-03,4.1,0
end data"#;
fs::write(station2_path.join("rain_2023.csv"), csv_content_2).unwrap();
let station3_path = qcv_path.join("county2").join("station3");
fs::create_dir_all(&station3_path).unwrap();
let csv_content_3 = r#"observation_station,G,Test Station 3
midas_station_id,G,00003
historic_county_name,G,county2
location,G,53.0,-0.3
height,G,30,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
2023-01-01,1.0,0
2023-01-02,0.5,0
2023-01-03,2.3,0
2023-01-04,0.0,0
end data"#;
fs::write(station3_path.join("rain_2023.csv"), csv_content_3).unwrap();
let output_path = temp_dir
.path()
.join("output")
.join("multi-station-dataset.parquet");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
(dataset_path, output_path)
}
#[tokio::test]
async fn test_multi_station_processing() {
let temp_dir = TempDir::new().unwrap();
let (dataset_path, output_path) = create_multi_station_dataset(&temp_dir);
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone())).unwrap();
let stats = processor.process().await.unwrap();
assert_eq!(stats.files_processed, 3);
assert_eq!(stats.files_failed, 0);
assert!(stats.total_rows > 0);
assert!(stats.total_rows >= 9);
assert!(output_path.exists());
let metadata = fs::metadata(&output_path).unwrap();
assert!(metadata.len() > 0);
}
#[tokio::test]
async fn test_multi_station_with_concurrent_processing() {
let temp_dir = TempDir::new().unwrap();
let (dataset_path, output_path) = create_multi_station_dataset(&temp_dir);
let config = MidasConfig {
max_concurrent_files: 2,
sample_size: 3,
..Default::default()
};
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone()))
.unwrap()
.with_config(config);
let stats = processor.process().await.unwrap();
assert_eq!(stats.files_processed, 3);
assert_eq!(stats.files_failed, 0);
assert!(stats.total_rows > 0);
assert!(output_path.exists());
}
#[tokio::test]
async fn test_large_dataset_simulation() {
let temp_dir = TempDir::new().unwrap();
let dataset_path = temp_dir.path().join("large-dataset");
let qcv_path = dataset_path.join("qcv-1");
for county in 1..=3 {
for station in 1..=4 {
let station_path = qcv_path
.join(format!("county{}", county))
.join(format!("station{}", station));
fs::create_dir_all(&station_path).unwrap();
for year in 2022..=2023 {
let csv_content = format!(
r#"observation_station,G,Test Station {}-{}
midas_station_id,G,{:05}
historic_county_name,G,county{}
location,G,{}.{},-0.{}
height,G,{}0,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
{}-01-01,{}.0,0
{}-01-02,{}.5,0
{}-01-03,{}.2,0
end data"#,
county,
station,
county * 100 + station,
county,
50 + county,
station,
station,
county * 10 + station,
year,
station,
year,
station + 1,
year,
station + 2
);
fs::write(station_path.join(format!("rain_{}.csv", year)), csv_content).unwrap();
}
}
}
let output_path = temp_dir.path().join("output").join("large-dataset.parquet");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
let config = MidasConfig {
max_concurrent_files: 4,
sample_size: 5,
..Default::default()
};
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone()))
.unwrap()
.with_config(config);
let stats = processor.process().await.unwrap();
assert_eq!(stats.files_processed, 24);
assert_eq!(stats.files_failed, 0);
assert!(stats.total_rows > 0);
assert!(stats.total_rows >= 72);
assert!(output_path.exists());
}
#[tokio::test]
async fn test_mixed_file_sizes() {
let temp_dir = TempDir::new().unwrap();
let dataset_path = temp_dir.path().join("mixed-size-dataset");
let qcv_path = dataset_path.join("qcv-1");
let small_station_path = qcv_path.join("county1").join("small_station");
fs::create_dir_all(&small_station_path).unwrap();
let small_csv = r#"observation_station,G,Small Station
midas_station_id,G,00001
historic_county_name,G,county1
location,G,51.5,-0.1
height,G,10,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
2023-01-01,1.0,0
end data"#;
fs::write(small_station_path.join("rain_2023.csv"), small_csv).unwrap();
let large_station_path = qcv_path.join("county1").join("large_station");
fs::create_dir_all(&large_station_path).unwrap();
let mut large_csv = r#"observation_station,G,Large Station
midas_station_id,G,00002
historic_county_name,G,county1
location,G,51.6,-0.2
height,G,20,m
data
ob_end_ctime,prcp_amt,prcp_amt_q
"#
.to_string();
for day in 1..=31 {
large_csv.push_str(&format!("2023-01-{:02},{}.0,0\n", day, day % 10));
}
large_csv.push_str("end data");
fs::write(large_station_path.join("rain_2023.csv"), large_csv).unwrap();
let output_path = temp_dir
.path()
.join("output")
.join("mixed-size-dataset.parquet");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
let mut processor = DatasetProcessor::new(dataset_path, Some(output_path.clone())).unwrap();
let stats = processor.process().await.unwrap();
assert_eq!(stats.files_processed, 2);
assert_eq!(stats.files_failed, 0);
assert!(stats.total_rows > 0);
assert!(stats.total_rows >= 32);
assert!(output_path.exists());
}