use super::OverlayOperation;
use crate::app::overlay::{GeometryColumnType, Grouping, OverlaySource};
use csv::{Reader, StringRecord};
use geo::Geometry;
use itertools::Itertools;
use kdam::{tqdm, BarBuilder, BarExt};
use rayon::prelude::*;
use routee_compass_core::util::{fs::read_utils, geo::PolygonalRTree};
use std::{
collections::HashMap,
fs::File,
path::Path,
sync::{Arc, Mutex},
};
pub fn run(
bambam_filepath: &str,
output_directory: &str,
overlay_source: &OverlaySource,
col_type: &GeometryColumnType,
_: &OverlayOperation,
verbose: bool,
) -> Result<(), String> {
let output_directory = Path::new(output_directory);
std::fs::create_dir_all(output_directory).map_err(|e| {
format!(
"failure creating output directory '{}': {e}",
output_directory.as_os_str().to_string_lossy()
)
})?;
let overlay_data = overlay_source.build()?;
log::info!("found {} rows in overlay dataset", overlay_data.len());
let overlay_lookup = overlay_data
.iter()
.map(|(geom, geoid)| (geoid.clone(), geom.clone()))
.collect::<HashMap<_, _>>();
let overlay: Arc<PolygonalRTree<f64, String>> = Arc::new(PolygonalRTree::new(overlay_data)?);
let mut file = std::fs::File::open(bambam_filepath)
.map_err(|e| format!("failure reading {bambam_filepath}: {e}"))?;
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.trim(csv::Trim::Fields)
.from_reader(file);
let (header_record, headers) = build_header_lookup(&mut reader)?;
let grouped_rows: Vec<(String, (StringRecord, Geometry))> =
spatial_lookup(reader, overlay.clone(), &headers, col_type, verbose)?;
let mut grouped_lookup: HashMap<String, (Geometry, Vec<StringRecord>)> = HashMap::new();
for (grouping, (row, geom)) in grouped_rows.into_iter() {
match grouped_lookup.get_mut(&grouping) {
Some((_, v)) => v.push(row),
None => {
let geometry = overlay_lookup.get(&grouping).ok_or_else(|| {
format!(
"internal error, lookup missing geometry entry for id '{}'",
grouping
)
})?;
let _ = grouped_lookup.insert(grouping.clone(), (geometry.clone(), vec![row]));
}
}
}
let len = grouped_lookup.len();
let write_iter = tqdm!(
grouped_lookup
.into_iter()
.sorted_by_cached_key(|(k, _)| k.clone()),
desc = "writing partitioned datasets",
total = len
);
for (id, (overlay, raw_rows)) in write_iter {
let id_sani = sanitize_filename::sanitize(&id);
let out_filename = format!("{id_sani}.csv");
let out_filepath = output_directory.join(out_filename);
let mut output_writer = csv::Writer::from_path(&out_filepath).map_err(|e| {
format!(
"failure opening output file '{}': {e}",
out_filepath.as_os_str().to_string_lossy()
)
})?;
output_writer
.write_record(&header_record)
.map_err(|e| format!("failure writing header to output: {e}"))?;
for row in raw_rows.into_iter() {
output_writer
.write_record(&row)
.map_err(|e| format!("failure writing row to output: {e}"))?;
}
}
Ok(())
}
fn spatial_lookup(
reader: csv::Reader<File>,
overlay: Arc<PolygonalRTree<f64, String>>,
headers: &HashMap<String, usize>,
col_type: &GeometryColumnType,
verbose: bool,
) -> Result<Vec<(String, (csv::StringRecord, Geometry))>, String> {
let iter = tqdm!(reader.into_records(), desc = "spatial lookup");
let mut result = vec![];
for (idx, row_result) in iter.enumerate() {
let row = row_result.map_err(|e| format!("cannot read row {idx}: {e}"))?;
let point = col_type
.get_point(&row, headers)
.map_err(|e| format!("failure reading geometry from row {idx}: {e}"))?;
let found = overlay
.intersection(&point)
.map_err(|e| format!("failure running spatial intersection for row {idx}: {e}"))?
.collect_vec();
match found[..] {
[single] => result.push((single.data.clone(), (row, point))),
[] if verbose => {
log::warn!("no spatial match found for row {idx}")
}
_ if verbose => {
log::warn!(
"more than one spatial match ({} > 1) found for row {idx}",
found.len()
)
}
_ => {}
}
}
eprintln!();
Ok(result)
}
pub fn build_header_lookup(
reader: &mut Reader<File>,
) -> Result<(StringRecord, HashMap<String, usize>), String> {
let headers = reader
.headers()
.map_err(|e| format!("failure retrieving headers: {e}"))?
.clone();
let lookup: HashMap<String, usize> = headers
.iter()
.enumerate()
.map(|(idx, col)| (String::from(col), idx))
.collect::<HashMap<_, _>>();
Ok((headers, lookup))
}