use crate::error::AppError;
use duckdb::Connection;
use std::path::Path;
use tracing::info;
pub fn drop_dataset(conn: &Connection) -> Result<(), AppError> {
conn.execute_batch(
"DROP VIEW IF EXISTS data; \
DROP INDEX IF EXISTS spatial_idx; \
DROP TABLE IF EXISTS raw_data; \
DROP TABLE IF EXISTS spatial_data; \
DETACH DATABASE IF EXISTS source;",
)
.map_err(|e| AppError::Internal(anyhow::anyhow!("Dataset reset error: {}", e)))?;
Ok(())
}
/// Loads `path` into the staging table, leaving no staging leftovers on failure.
///
/// Any previous staging state is discarded first. If loading fails, the
/// partially built staging state is discarded again (best effort) and the
/// original loading error is returned.
///
/// # Errors
///
/// * `AppError::FileNotFound` if `path` does not exist.
/// * Whatever `discard_stage` or `stage_file_inner` report otherwise.
pub fn stage_file(conn: &Connection, path: &Path, table: Option<&str>) -> Result<(), AppError> {
    if !path.exists() {
        return Err(AppError::FileNotFound(path.display().to_string()));
    }

    discard_stage(conn)?;

    stage_file_inner(conn, path, table).map_err(|load_err| {
        // Cleanup is best effort: the load error is the one worth reporting.
        let _ = discard_stage(conn);
        load_err
    })
}
fn stage_file_inner(conn: &Connection, path: &Path, table: Option<&str>) -> Result<(), AppError> {
let extension = path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("")
.to_lowercase();
let path_str = path.to_string_lossy();
info!(ext = %extension, "staging file via DuckDB");
match extension.as_str() {
"csv" => {
conn.execute_batch(&format!(
"CREATE TABLE raw_data_stage AS SELECT ROW_NUMBER() OVER () AS rowid, * FROM read_csv_auto('{}')",
escape_sql_string(&path_str)
))
.map_err(|e| AppError::Internal(anyhow::anyhow!("CSV ingestion error: {}", e)))?;
}
"parquet" => {
conn.execute_batch(&format!(
"CREATE TABLE raw_data_stage AS SELECT ROW_NUMBER() OVER () AS rowid, * FROM read_parquet('{}')",
escape_sql_string(&path_str)
))
.map_err(|e| AppError::Internal(anyhow::anyhow!("Parquet ingestion error: {}", e)))?;
}
"geojson" | "json" => {
conn.execute_batch("INSTALL spatial; LOAD spatial;")
.map_err(|e| {
AppError::Internal(anyhow::anyhow!("Spatial extension error: {}", e))
})?;
conn.execute_batch(&format!(
"CREATE TABLE raw_data_stage AS SELECT ROW_NUMBER() OVER () AS rowid, * FROM ST_Read('{}')",
escape_sql_string(&path_str)
))
.map_err(|e| AppError::Internal(anyhow::anyhow!("GeoJSON ingestion error: {}", e)))?;
}
"duckdb" => {
let tbl = table.ok_or_else(|| {
AppError::BadRequest(
"For .duckdb files, specify --table <TABLE> to select the table".into(),
)
})?;
crate::db::validate_column_name(tbl)?;
conn.execute_batch(&format!(
"ATTACH '{}' AS source_stage (READ_ONLY);",
escape_sql_string(&path_str)
))
.map_err(|e| AppError::Internal(anyhow::anyhow!("DuckDB attach error: {}", e)))?;
conn.execute_batch(&format!(
"CREATE TABLE raw_data_stage AS SELECT ROW_NUMBER() OVER () AS rowid, * FROM source_stage.\"{}\"",
tbl
))
.map_err(|e| AppError::Internal(anyhow::anyhow!("DuckDB table read error: {}", e)))?;
conn.execute_batch("DETACH DATABASE source_stage;")
.map_err(|e| AppError::Internal(anyhow::anyhow!("DuckDB detach error: {}", e)))?;
}
_ => {
return Err(AppError::BadRequest(format!(
"Unsupported file type: {}",
extension
)));
}
}
let row_count: i64 = conn
.query_row("SELECT COUNT(*) FROM raw_data_stage", [], |row| row.get(0))
.map_err(|e| AppError::Internal(anyhow::anyhow!("COUNT error: {}", e)))?;
info!(rows = row_count, "file staged into DuckDB");
Ok(())
}
pub fn promote_stage(conn: &Connection) -> Result<(), AppError> {
drop_dataset(conn)?;
conn.execute_batch(
"ALTER TABLE raw_data_stage RENAME TO raw_data; \
CREATE VIEW data AS SELECT * FROM raw_data;",
)
.map_err(|e| AppError::Internal(anyhow::anyhow!("Promote staging error: {}", e)))?;
Ok(())
}
/// Throws away any staging state left on the connection.
///
/// Drops the `raw_data_stage` table and detaches the `source_stage` database.
/// Both statements use `IF EXISTS`, so calling this with nothing staged is
/// fine.
///
/// # Errors
///
/// Returns `AppError::Internal` if DuckDB rejects the batch.
pub fn discard_stage(conn: &Connection) -> Result<(), AppError> {
    const DISCARD_SQL: &str = concat!(
        "DROP TABLE IF EXISTS raw_data_stage; ",
        "DETACH DATABASE IF EXISTS source_stage;",
    );

    match conn.execute_batch(DISCARD_SQL) {
        Ok(()) => Ok(()),
        Err(e) => Err(AppError::Internal(anyhow::anyhow!(
            "Discard staging error: {}",
            e
        ))),
    }
}
/// Lower-case column names recognised as latitude during auto-detection.
///
/// Order matters: the candidates are tried front to back and the first one
/// that matches a (lower-cased) column name wins.
const LAT_CANDIDATES: &[&str] = &["latitude", "lat", "y", "ylat", "geo_lat"];
/// Lower-case column names recognised as longitude during auto-detection.
///
/// Order matters: the candidates are tried front to back and the first one
/// that matches a (lower-cased) column name wins.
const LON_CANDIDATES: &[&str] = &[
"longitude",
"lon",
"lng",
"x",
"xlon",
"xlong",
"geo_lon",
"geo_lng",
];
/// Resolves the latitude and longitude column names for a dataset.
///
/// For each coordinate, an explicit override takes precedence; otherwise the
/// column is auto-detected from the well-known candidate names.
///
/// Overrides are matched case-insensitively, and the name returned is always
/// the column's real name as it appears in `col_names`, not the spelling the
/// caller typed. This matches auto-detection, which also returns the real
/// column name.
///
/// # Arguments
///
/// * `col_names` - The dataset's column names.
/// * `lat_override` - Optional explicit latitude column.
/// * `lon_override` - Optional explicit longitude column.
///
/// # Returns
///
/// `(latitude_column, longitude_column)`.
///
/// # Errors
///
/// * `AppError::ColumnNotFound` if an override names no existing column.
/// * Whatever `detect_column` reports when auto-detection finds no match.
pub fn detect_lat_lon(
    col_names: &[String],
    lat_override: Option<&str>,
    lon_override: Option<&str>,
) -> Result<(String, String), AppError> {
    let col_lower: Vec<String> = col_names.iter().map(|c| c.to_lowercase()).collect();

    let lat_col = match lat_override {
        Some(requested) => resolve_override(col_names, &col_lower, requested)?,
        None => detect_column(col_names, &col_lower, LAT_CANDIDATES, "latitude")?,
    };
    let lon_col = match lon_override {
        Some(requested) => resolve_override(col_names, &col_lower, requested)?,
        None => detect_column(col_names, &col_lower, LON_CANDIDATES, "longitude")?,
    };

    Ok((lat_col, lon_col))
}

/// Maps a user-supplied column override onto the real column name.
///
/// An exact match is preferred; failing that, the first case-insensitive
/// match is used. `col_lower` must be `col_names` lower-cased, in the same
/// order.
fn resolve_override(
    col_names: &[String],
    col_lower: &[String],
    requested: &str,
) -> Result<String, AppError> {
    if let Some(exact) = col_names.iter().find(|name| name.as_str() == requested) {
        return Ok(exact.clone());
    }

    let wanted = requested.to_lowercase();
    col_names
        .iter()
        .zip(col_lower)
        .find(|(_, lower)| **lower == wanted)
        .map(|(name, _)| name.clone())
        .ok_or_else(|| {
            AppError::ColumnNotFound(format!(
                "'{}'. Available: {}",
                requested,
                col_names.join(", ")
            ))
        })
}
/// Finds the first candidate name that matches a column and returns that
/// column's original name.
///
/// Candidates are tried in the order given, so earlier candidates take
/// priority. Each one is compared against `col_lower`, and the matching index
/// is used to look up the original spelling in `columns`.
///
/// # Errors
///
/// Returns `AppError::BadRequest` naming `label` and listing the available
/// columns when no candidate matches.
fn detect_column(
    columns: &[String],
    col_lower: &[String],
    candidates: &[&str],
    label: &str,
) -> Result<String, AppError> {
    let matched = candidates
        .iter()
        .find_map(|wanted| col_lower.iter().position(|name| name == wanted));

    match matched {
        Some(idx) => Ok(columns[idx].clone()),
        None => Err(AppError::BadRequest(format!(
            "Could not auto-detect {} column. Available columns: {}. Use --lat/--lon to specify.",
            label,
            columns.join(", ")
        ))),
    }
}
/// Wraps `name` in double quotes for use as a SQL identifier, doubling any
/// embedded double quotes so the name cannot terminate the identifier early.
fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Rebuilds `raw_data` with a point geometry column and an R-tree index on it.
///
/// Steps:
/// 1. Create `spatial_data` from `raw_data` plus a `geom` column built with
///    `ST_Point(lon, lat)`. Rows with a NULL latitude or longitude are left
///    out.
/// 2. Drop `raw_data` and rename `spatial_data` into its place.
/// 3. Create the `spatial_idx` R-tree index on `geom`.
/// 4. Recreate the `data` view so it exposes every column except `geom`.
///
/// Column names are quoted with [`quote_identifier`], so names containing
/// double quotes (for example from arbitrary file headers) cannot break or
/// alter the generated SQL.
///
/// # Arguments
///
/// * `conn` - DuckDB connection that holds `raw_data`.
/// * `lat_col` - Name of the latitude column.
/// * `lon_col` - Name of the longitude column.
///
/// # Errors
///
/// Propagates errors from `crate::db::ensure_spatial` and returns
/// `AppError::Internal` for any failing DuckDB statement.
pub fn add_spatial_index(
    conn: &duckdb::Connection,
    lat_col: &str,
    lon_col: &str,
) -> Result<(), AppError> {
    info!(lat = %lat_col, lon = %lon_col, "building spatial index");
    // NOTE(review): presumably makes the spatial extension (ST_Point, RTREE)
    // available on this connection — confirm against crate::db.
    crate::db::ensure_spatial(conn)?;

    let lat = quote_identifier(lat_col);
    let lon = quote_identifier(lon_col);
    conn.execute_batch(&format!(
        "CREATE TABLE spatial_data AS SELECT *, ST_Point({lon}, {lat}) AS geom FROM raw_data WHERE {lat} IS NOT NULL AND {lon} IS NOT NULL",
        lat = lat, lon = lon,
    ))
    .map_err(|e| AppError::Internal(anyhow::anyhow!("Spatial table creation error: {}", e)))?;
    conn.execute_batch("DROP TABLE raw_data")
        .map_err(|e| AppError::Internal(anyhow::anyhow!("Drop raw_data error: {}", e)))?;
    conn.execute_batch("ALTER TABLE spatial_data RENAME TO raw_data")
        .map_err(|e| AppError::Internal(anyhow::anyhow!("Rename table error: {}", e)))?;
    conn.execute_batch("CREATE INDEX spatial_idx ON raw_data USING RTREE(geom)")
        .map_err(|e| AppError::Internal(anyhow::anyhow!("R-tree index error: {}", e)))?;
    conn.execute_batch("DROP VIEW IF EXISTS data")
        .map_err(|e| AppError::Internal(anyhow::anyhow!("Drop view error: {}", e)))?;

    // Collect every column except the geometry so the view keeps the
    // original shape of the data.
    let mut cols = Vec::new();
    let mut stmt = conn
        .prepare("DESCRIBE raw_data")
        .map_err(|e| AppError::Internal(anyhow::anyhow!("DESCRIBE error: {}", e)))?;
    let mut rows = stmt
        .query([])
        .map_err(|e| AppError::Internal(anyhow::anyhow!("DESCRIBE query error: {}", e)))?;
    while let Some(row) = rows
        .next()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("DESCRIBE row error: {}", e)))?
    {
        let name: String = row
            .get(0)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("Column name error: {}", e)))?;
        if name != "geom" {
            cols.push(quote_identifier(&name));
        }
    }
    // Release the DESCRIBE cursor and statement before issuing more DDL.
    drop(rows);
    drop(stmt);

    conn.execute_batch(&format!(
        "CREATE VIEW data AS SELECT {} FROM raw_data",
        cols.join(", ")
    ))
    .map_err(|e| AppError::Internal(anyhow::anyhow!("Recreate view error: {}", e)))?;
    info!("spatial index created");
    Ok(())
}
/// Escapes `s` for use inside a single-quoted SQL string literal by doubling
/// every single quote.
fn escape_sql_string(s: &str) -> String {
    // Splitting on the quote and re-joining with two quotes doubles each one.
    let pieces: Vec<&str> = s.split('\'').collect();
    pieces.join("''")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned column list from string literals.
    fn cols(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    /// Well-known names are found regardless of case, and the returned names
    /// keep the columns' original spelling.
    #[test]
    fn detects_common_names_case_insensitively() {
        let mixed_case = cols(&["id", "Latitude", "Longitude"]);
        let detected = detect_lat_lon(&mixed_case, None, None).unwrap();
        assert_eq!(detected, ("Latitude".to_string(), "Longitude".to_string()));

        let short_names = cols(&["x", "y", "name"]);
        let detected = detect_lat_lon(&short_names, None, None).unwrap();
        assert_eq!(detected, ("y".to_string(), "x".to_string()));
    }

    /// Candidate order, not column order, decides which column wins.
    #[test]
    fn honors_priority_order() {
        let columns = cols(&["lat", "latitude", "lon", "longitude"]);
        let detected = detect_lat_lon(&columns, None, None).unwrap();
        assert_eq!(detected, ("latitude".to_string(), "longitude".to_string()));
    }

    /// An override naming a column that does not exist is rejected.
    #[test]
    fn overrides_must_exist() {
        let columns = cols(&["a", "b"]);
        assert!(detect_lat_lon(&columns, Some("a"), Some("b")).is_ok());
        assert!(detect_lat_lon(&columns, Some("missing"), Some("b")).is_err());
    }

    /// Without overrides or recognisable names, detection fails.
    #[test]
    fn detection_fails_without_candidates() {
        let columns = cols(&["foo", "bar"]);
        assert!(detect_lat_lon(&columns, None, None).is_err());
    }

    /// Single quotes are doubled for use in SQL string literals.
    #[test]
    fn escape_doubles_single_quotes() {
        assert_eq!(escape_sql_string("a'b"), "a''b");
    }
}