pub mod discovery;
pub mod streaming;
pub mod writer;
#[cfg(test)]
pub mod tests;
use self::{discovery::FileDiscovery, streaming::StreamingProcessor, writer::ParquetWriter};
use indicatif::{ProgressBar, ProgressStyle};
use crate::config::MidasConfig;
use crate::error::{MidasError, Result};
use crate::header::parse_badc_header;
use crate::models::{DatasetType, ProcessingStats};
use crate::schema::SchemaManager;
use colored::*;
use futures::stream::{self, StreamExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::task;
#[derive(Debug)]
pub struct DatasetProcessor {
dataset_path: PathBuf,
output_path: PathBuf,
config: MidasConfig,
file_discovery: FileDiscovery,
streaming_processor: StreamingProcessor,
parquet_writer: ParquetWriter,
schema_manager: SchemaManager,
station_count: usize,
}
impl DatasetProcessor {
pub fn new(dataset_path: PathBuf, output_path: Option<PathBuf>) -> Result<Self> {
let output_path = output_path.unwrap_or_else(|| {
let dataset_name = dataset_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
dataset_path
.parent()
.unwrap_or_else(|| Path::new("."))
.parent()
.unwrap_or_else(|| Path::new("."))
.join("parquet")
.join(format!("{}.parquet", dataset_name))
});
if !dataset_path.exists() {
return Err(MidasError::DatasetNotFound { path: dataset_path });
}
let config = MidasConfig::default();
let schema_manager = SchemaManager::new();
Ok(Self {
dataset_path: dataset_path.clone(),
output_path: output_path.clone(),
config: config.clone(),
file_discovery: FileDiscovery::new(dataset_path),
streaming_processor: StreamingProcessor::new(config.clone(), schema_manager.clone()),
parquet_writer: ParquetWriter::new(output_path, config),
schema_manager,
station_count: 0, })
}
pub fn with_config(mut self, config: MidasConfig) -> Self {
self.config = config.clone();
self.streaming_processor =
StreamingProcessor::new(config.clone(), self.schema_manager.clone());
self.parquet_writer = ParquetWriter::new(self.output_path.clone(), config);
self
}
pub async fn process(&mut self) -> Result<ProcessingStats> {
let start_time = Instant::now();
println!(
"{}",
"Starting MIDAS dataset processing".bright_green().bold()
);
println!(
" {} {}",
"Dataset:".bright_cyan(),
self.dataset_path.display()
);
println!(
" {} {}",
"Output:".bright_cyan(),
self.output_path.display()
);
println!("\n{}", "Discovering CSV files...".bright_yellow());
let discovery_spinner = ProgressBar::new_spinner();
discovery_spinner.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.green} [{elapsed_precise}] {msg}")
.unwrap()
.tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ "),
);
discovery_spinner.set_message("Scanning directories for CSV files...");
discovery_spinner.enable_steady_tick(std::time::Duration::from_millis(100));
let csv_files = self.file_discovery.discover_csv_files().await?;
self.station_count = self.file_discovery.station_count();
discovery_spinner.finish_with_message("CSV file discovery complete");
println!(
" Found {} CSV files from {} stations",
csv_files.len(),
self.station_count
);
if csv_files.is_empty() {
return Ok(ProcessingStats {
files_processed: 0,
files_failed: 0,
total_rows: 0,
output_path: self.output_path.clone(),
processing_time_ms: start_time.elapsed().as_millis(),
});
}
let dataset_type = self.schema_manager.detect_dataset_type(&csv_files[0])?;
println!(" {} {:?}", "Dataset type:".bright_cyan(), dataset_type);
let sample_size = self.config.sample_size.min(csv_files.len());
let sample_files: Vec<_> = csv_files.iter().take(sample_size).cloned().collect();
println!(
" {} {} sample files for schema analysis",
"Analyzing".bright_cyan(),
sample_files.len()
);
let semaphore = Arc::new(Semaphore::new(4));
let sample_headers = stream::iter(sample_files.iter())
.map(|file_path| {
let sem = semaphore.clone();
let file_path = file_path.clone();
async move {
let _permit = sem.acquire().await.ok()?;
let file_path_for_task = file_path.clone();
let header_result =
task::spawn_blocking(move || parse_badc_header(&file_path_for_task))
.await
.ok()?
.ok()?;
Some((file_path, header_result))
}
})
.buffer_unordered(4)
.collect::<Vec<_>>()
.await;
let valid_samples: Vec<_> = sample_headers.into_iter().flatten().collect();
if valid_samples.is_empty() {
return Err(MidasError::ProcessingFailed {
path: self.dataset_path.clone(),
reason: "No valid headers found in sample files".to_string(),
});
}
let valid_files: Vec<_> = valid_samples.iter().map(|(path, _)| path.clone()).collect();
if !self.config.skip_schema_validation {
println!(
" {} {} valid files for schema analysis",
"Examining".bright_cyan(),
valid_files.len()
);
self.schema_manager
.initialize_schema(dataset_type.clone(), &valid_files)
.await?;
}
if self.config.discovery_only {
println!(
"\n{}",
"Discovery mode - schema analysis complete".bright_green()
);
self.schema_manager.report_discovery(&dataset_type)?;
return Ok(ProcessingStats {
files_processed: 0,
files_failed: 0,
total_rows: 0,
output_path: self.output_path.clone(),
processing_time_ms: start_time.elapsed().as_millis(),
});
}
if let Some(parent) = self.output_path.parent() {
fs::create_dir_all(parent).await?;
}
println!("\n{}", "Processing files...".bright_yellow());
self.streaming_processor
.update_schema_manager(self.schema_manager.clone());
let use_per_station = self.station_count > 3000 ||
(dataset_type == DatasetType::Rain && self.station_count > 1000);
let stats = if use_per_station {
let station_frames = self
.streaming_processor
.process_files_by_station(&csv_files, &dataset_type)
.await?;
let files_processed = csv_files.len();
let total_rows = if !station_frames.is_empty() {
self.parquet_writer
.write_per_station_parquet_and_merge(station_frames, &dataset_type)
.await?
} else {
0
};
ProcessingStats {
files_processed,
files_failed: 0,
total_rows,
output_path: self.output_path.clone(),
processing_time_ms: 0, }
} else {
let (batches, mut stats) = self
.streaming_processor
.process_files_streaming(&csv_files, &dataset_type, &self.output_path)
.await?;
if !batches.is_empty() {
let total_rows = self
.parquet_writer
.write_final_parquet(batches, self.station_count, &dataset_type)
.await?;
stats.total_rows = total_rows;
}
stats
};
let total_time = start_time.elapsed().as_millis();
println!("\n{}", "Processing Summary".bright_green().bold());
println!(
" {} {}ms",
"Time elapsed:".bright_cyan(),
total_time.to_string().bright_white()
);
println!(
" {} {}",
"Files processed:".bright_cyan(),
stats.files_processed.to_string().bright_white()
);
if stats.files_failed > 0 {
println!(
" {} {}",
"Files failed:".bright_red(),
stats.files_failed.to_string().bright_red().bold()
);
}
println!(
" {} {}",
"Total rows:".bright_cyan(),
stats.total_rows.to_string().bright_white().bold()
);
Ok(ProcessingStats {
processing_time_ms: total_time,
..stats
})
}
}