use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use serde_json::Value;
use crate::datasets::sodir::catalog::resolve;
use crate::datasets::sodir::client::ArcGISClient;
use crate::datasets::sodir::error::{Result, SodirError};
use crate::datasets::sodir::geojson_wkt::{
epoch_ms_to_iso_date, extract_properties, geometry_to_wkt, is_date_column, EPOCH_MS_THRESHOLD,
};
/// Number of records requested per query page (sent as `resultRecordCount`).
const PAGE_SIZE: usize = 1000;
/// Name of the CSV column that receives each feature's geometry converted to WKT.
const WKT_COLUMN: &str = "wkt_geometry";
/// Asks the service how many features the layer behind `stem` contains.
///
/// Issues a `returnCountOnly=true` query with a match-all `where` clause.
/// A response whose `count` field is absent or not an unsigned integer is
/// reported as `0`.
///
/// # Errors
///
/// Returns an error when `stem` cannot be resolved or the request fails.
pub async fn count(client: &ArcGISClient, stem: &str) -> Result<u64> {
    let (base, layer_id) = resolve(stem)?;
    let url = format!("{base}/{layer_id}/query?where=1%3D1&returnCountOnly=true&f=json");
    let response = client.fetch_json(&url).await?;
    let total = match response.get("count") {
        Some(raw) => raw.as_u64().unwrap_or(0),
        None => 0,
    };
    Ok(total)
}
/// Downloads every feature of the layer behind `stem` and writes them to `csv_path`.
///
/// Features are requested as GeoJSON in pages of [`PAGE_SIZE`] records. Each
/// feature becomes one row: its properties plus, when the geometry is present
/// and non-null, a [`WKT_COLUMN`] cell (an empty string if the geometry cannot
/// be converted). Property columns are emitted in sorted order with the WKT
/// column last; date columns holding epoch milliseconds are rewritten by
/// [`convert_date_columns`] before writing.
///
/// When the layer yields no features, a header-only CSV is written using the
/// field names from the layer metadata (best effort) followed by the WKT column.
///
/// Returns the number of data rows written.
///
/// # Errors
///
/// Returns an error when `stem` cannot be resolved, the parent directory
/// cannot be created, a page request fails, or the CSV cannot be written.
pub async fn fetch_to_csv(client: &ArcGISClient, stem: &str, csv_path: &Path) -> Result<usize> {
    let (base, layer_id) = resolve(stem)?;
    if let Some(parent) = csv_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    let mut rows: Vec<Vec<(String, Value)>> = Vec::new();
    let mut offset = 0usize;
    loop {
        let url = format!(
            "{base}/{layer_id}/query?where=1%3D1&outFields=*&returnGeometry=true\
             &resultOffset={offset}&resultRecordCount={PAGE_SIZE}&f=geojson"
        );
        let data = client.fetch_json(&url).await?;

        // Borrow the page instead of deep-cloning every feature.
        let Some(features) = data.get("features").and_then(Value::as_array) else {
            break;
        };
        if features.is_empty() {
            break;
        }

        let page_len = features.len();
        rows.reserve(page_len);
        for feature in features {
            let mut row = extract_properties(feature.get("properties").unwrap_or(&Value::Null));
            if let Some(geom) = feature.get("geometry").filter(|g| !g.is_null()) {
                let wkt = geometry_to_wkt(geom).unwrap_or_default();
                row.push((WKT_COLUMN.to_string(), Value::String(wkt)));
            }
            rows.push(row);
        }

        // A server may cap pages below PAGE_SIZE; a short page is then NOT the
        // last one. ArcGIS signals this with `exceededTransferLimit`.
        // NOTE(review): the flag is looked up at the top level and under
        // `properties` because its location in GeoJSON output is not confirmed
        // here — verify against the service.
        let server_has_more = data
            .get("exceededTransferLimit")
            .or_else(|| data.get("properties").and_then(|p| p.get("exceededTransferLimit")))
            .and_then(Value::as_bool)
            .unwrap_or(false);

        // Advance by what was actually received so no records are skipped
        // when the page is shorter than requested.
        offset += page_len;
        if page_len < PAGE_SIZE && !server_has_more {
            break;
        }
    }

    if rows.is_empty() {
        // No data: still emit a header so downstream readers see the schema.
        let mut columns = layer_field_names(client, base, layer_id).await;
        columns.push(WKT_COLUMN.to_string());
        write_csv(csv_path, &columns, &[])?;
        return Ok(0);
    }

    // Union of all property names (sorted), with the WKT column kept last.
    let mut prop_cols: BTreeSet<String> = BTreeSet::new();
    let mut has_wkt = false;
    for (name, _) in rows.iter().flatten() {
        if name == WKT_COLUMN {
            has_wkt = true;
        } else {
            prop_cols.insert(name.clone());
        }
    }
    let mut columns: Vec<String> = prop_cols.into_iter().collect();
    if has_wkt {
        columns.push(WKT_COLUMN.to_string());
    }

    let mut row_maps: Vec<HashMap<String, Value>> =
        rows.into_iter().map(|r| r.into_iter().collect()).collect();
    convert_date_columns(&columns, &mut row_maps);
    write_csv(csv_path, &columns, &row_maps)?;
    Ok(row_maps.len())
}
/// Rewrites epoch-millisecond values in date columns as ISO date strings, in place.
///
/// A column is touched only when `is_date_column` accepts its name and its
/// first non-null value is an integer of at least `EPOCH_MS_THRESHOLD`. In
/// such a column every integer cell is replaced by the result of
/// `epoch_ms_to_iso_date`, or by `Null` when that conversion yields nothing.
/// Non-integer cells are left untouched.
fn convert_date_columns(columns: &[String], rows: &mut [HashMap<String, Value>]) {
    for col in columns {
        if !is_date_column(col) {
            continue;
        }

        // Sniff the column type from the first cell that actually has a value.
        let holds_epoch_ms = rows
            .iter()
            .filter_map(|row| row.get(col))
            .find(|cell| !cell.is_null())
            .and_then(Value::as_i64)
            .is_some_and(|ms| ms >= EPOCH_MS_THRESHOLD);
        if !holds_epoch_ms {
            continue;
        }

        for cell in rows.iter_mut().filter_map(|row| row.get_mut(col)) {
            let Some(ms) = cell.as_i64() else {
                continue;
            };
            *cell = epoch_ms_to_iso_date(ms).map_or(Value::Null, Value::String);
        }
    }
}
/// Fetches the layer metadata and returns the `name` of each entry in its `fields` array.
///
/// This is best effort: a failed request, a missing or non-array `fields`
/// member, or entries without a string `name` simply contribute nothing, so
/// the result may be empty.
async fn layer_field_names(client: &ArcGISClient, base: &str, layer_id: u32) -> Vec<String> {
    let url = format!("{base}/{layer_id}?f=json");
    let Ok(metadata) = client.fetch_json(&url).await else {
        return Vec::new();
    };
    let Some(fields) = metadata.get("fields").and_then(Value::as_array) else {
        return Vec::new();
    };

    let mut names = Vec::with_capacity(fields.len());
    for field in fields {
        if let Some(name) = field.get("name").and_then(Value::as_str) {
            names.push(String::from(name));
        }
    }
    names
}
/// Renders one JSON cell as CSV text.
///
/// A missing cell and `Null` become the empty string, strings are emitted
/// verbatim (without JSON quoting), and every other value uses its
/// `to_string()` form.
fn cell_to_string(value: Option<&Value>) -> String {
    let Some(cell) = value else {
        return String::new();
    };
    match cell {
        Value::Null => String::new(),
        Value::String(text) => text.to_owned(),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(num) => num.to_string(),
        nested @ (Value::Array(_) | Value::Object(_)) => nested.to_string(),
    }
}
/// Writes `columns` as the header and `rows` as records to `csv_path`.
///
/// The data is first written to a sibling temporary file (the target path
/// with its extension replaced by `csv.tmp`) and then renamed over
/// `csv_path`, so readers never observe a half-written CSV. Cells missing
/// from a row are written as empty strings.
///
/// # Errors
///
/// Returns an error when the temporary file cannot be created or written, or
/// when the final rename fails. On any failure the temporary file is removed
/// (best effort) so no stale `*.csv.tmp` is left behind.
fn write_csv(csv_path: &Path, columns: &[String], rows: &[HashMap<String, Value>]) -> Result<()> {
    let tmp = csv_path.with_extension("csv.tmp");
    let outcome = match write_csv_records(&tmp, columns, rows) {
        // The writer has been dropped by now, so the file handle is closed
        // before the rename.
        Ok(()) => std::fs::rename(&tmp, csv_path).map_err(SodirError::from),
        Err(e) => Err(e),
    };
    if outcome.is_err() {
        // Cleanup failure is deliberately ignored: the original error is the
        // one worth reporting.
        let _ = std::fs::remove_file(&tmp);
    }
    outcome
}

/// Writes the header and all records to `path` and flushes the writer.
fn write_csv_records(
    path: &Path,
    columns: &[String],
    rows: &[HashMap<String, Value>],
) -> Result<()> {
    let mut wtr = csv::WriterBuilder::new()
        .quote_style(csv::QuoteStyle::Necessary)
        .from_path(path)
        .map_err(|e| SodirError::Csv(format!("open {}: {e}", path.display())))?;
    wtr.write_record(columns)
        .map_err(|e| SodirError::Csv(format!("write header: {e}")))?;
    for row in rows {
        // Stream the cells straight into the writer; no per-row Vec needed.
        wtr.write_record(columns.iter().map(|c| cell_to_string(row.get(c))))
            .map_err(|e| SodirError::Csv(format!("write row: {e}")))?;
    }
    wtr.flush()?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a row map from `(column, value)` pairs.
    fn row(cells: &[(&str, Value)]) -> HashMap<String, Value> {
        cells
            .iter()
            .map(|(name, value)| ((*name).to_owned(), value.clone()))
            .collect()
    }

    /// Missing/null cells render empty; scalars render without JSON quoting.
    #[test]
    fn cell_rendering() {
        let expectations = [
            (None, ""),
            (Some(Value::Null), ""),
            (Some(json!("hi")), "hi"),
            (Some(json!(42)), "42"),
            (Some(json!(true)), "true"),
        ];
        for (input, expected) in &expectations {
            assert_eq!(cell_to_string(input.as_ref()), *expected);
        }
    }

    /// Epoch-millisecond values in a date column become ISO dates; other
    /// columns are untouched.
    #[test]
    fn date_columns_converted_in_place() {
        let columns = ["wlbEntryDate", "wlbName"].map(String::from).to_vec();
        let mut table = vec![
            row(&[
                ("wlbEntryDate", json!(946_684_800_000_i64)),
                ("wlbName", json!("1/2-3")),
            ]),
            row(&[
                ("wlbEntryDate", json!(978_307_200_000_i64)),
                ("wlbName", json!("4/5-6")),
            ]),
        ];

        convert_date_columns(&columns, &mut table);

        assert_eq!(table[0]["wlbEntryDate"], json!("2000-01-01"));
        assert_eq!(table[1]["wlbEntryDate"], json!("2001-01-01"));
        assert_eq!(table[0]["wlbName"], json!("1/2-3"));
    }

    /// A small integer must survive conversion unchanged.
    #[test]
    fn small_integers_in_date_column_left_alone() {
        let columns = vec!["seqNoFrom".to_string()];
        let mut table = vec![row(&[("seqNoFrom", json!(3))])];

        convert_date_columns(&columns, &mut table);

        assert_eq!(table[0]["seqNoFrom"], json!(3));
    }
}