use std::collections::HashMap;
use arrow_schema::Schema;
use datafusion_common::{DFSchema, Result};
use crate::{datatypes::SedonaType, matchers::ArgMatcher};
/// Schema helpers for inspecting columns in terms of [`SedonaType`]s.
///
/// Implemented in this file for Arrow's [`Schema`] and DataFusion's
/// [`DFSchema`].
pub trait SedonaSchema {
/// Returns an iterator yielding one `Result<SedonaType>` per field, in
/// field order.
fn sedona_types(&self) -> impl ExactSizeIterator<Item = Result<SedonaType>>;
/// Returns the positions of the columns whose type is matched as geometry
/// or geography.
///
/// # Errors
///
/// Returns the first error hit while resolving a column's [`SedonaType`].
fn geometry_column_indices(&self) -> Result<Vec<usize>>;
/// Returns the position of the column selected as the primary geometry
/// column, or `None` when there is no geometry or geography column.
///
/// # Errors
///
/// Returns an error when the geometry column positions cannot be computed.
fn primary_geometry_column_index(&self) -> Result<Option<usize>>;
}
impl SedonaSchema for DFSchema {
fn sedona_types(&self) -> impl ExactSizeIterator<Item = Result<SedonaType>> {
let arrow_schema = self.as_arrow();
<Schema as SedonaSchema>::sedona_types(arrow_schema)
}
fn geometry_column_indices(&self) -> Result<Vec<usize>> {
let arrow_schema = self.as_arrow();
<Schema as SedonaSchema>::geometry_column_indices(arrow_schema)
}
fn primary_geometry_column_index(&self) -> Result<Option<usize>> {
let arrow_schema = self.as_arrow();
<Schema as SedonaSchema>::primary_geometry_column_index(arrow_schema)
}
}
/// [`SedonaSchema`] for an Arrow schema.
impl SedonaSchema for Schema {
    /// Converts each field, in order, with `SedonaType::from_storage_field`.
    fn sedona_types(&self) -> impl ExactSizeIterator<Item = Result<SedonaType>> {
        let fields = self.fields();
        fields
            .iter()
            .map(|field| SedonaType::from_storage_field(field))
    }

    /// Collects the positions of all fields accepted by
    /// `ArgMatcher::is_geometry_or_geography()`.
    ///
    /// Stops at, and returns, the first type-resolution error encountered.
    fn geometry_column_indices(&self) -> Result<Vec<usize>> {
        let geo_matcher = ArgMatcher::is_geometry_or_geography();
        self.sedona_types()
            .enumerate()
            .filter_map(|(position, resolved)| match resolved {
                Ok(ty) if geo_matcher.match_type(&ty) => Some(Ok(position)),
                Ok(_) => None,
                // Keep the error so that `collect` short-circuits on it.
                Err(err) => Some(Err(err)),
            })
            .collect()
    }

    /// Chooses the primary geometry column among the geometry/geography
    /// columns based on their names.
    ///
    /// `primary_geometry_column_from_names` answers with a position inside
    /// the list of geometry columns; it is translated back into a position
    /// in this schema before being returned.
    fn primary_geometry_column_index(&self) -> Result<Option<usize>> {
        let geometry_positions = self.geometry_column_indices()?;
        let geometry_names = geometry_positions
            .iter()
            .map(|&position| self.field(position).name());
        let chosen = primary_geometry_column_from_names(geometry_names);
        Ok(chosen.map(|nth| geometry_positions[nth]))
    }
}
/// Picks the primary geometry column from an ordered sequence of column names.
///
/// Names are compared after lowercasing. The well-known names `geometry`,
/// `geography`, `geom` and `geog` are tried in that priority order; the
/// position of the first column carrying the highest-priority name present is
/// returned. When no column has a well-known name, position `0` is returned.
///
/// The result is a position within `column_names` (0-based, in iteration
/// order).
///
/// Returns `None` only when `column_names` yields no items.
///
/// Names that collide after lowercasing (including exact duplicates) are
/// handled: the first occurrence wins and positions of later columns are not
/// shifted.
pub fn primary_geometry_column_from_names(
    column_names: impl DoubleEndedIterator<Item = impl AsRef<str>>,
) -> Option<usize> {
    // Remember the first position of every distinct lowercased name. Storing
    // the forward position directly (instead of a reversed index that is later
    // converted back using the map size) keeps the result correct when several
    // columns share a name, since the map size then differs from the number
    // of columns.
    let mut first_position_by_name: HashMap<String, usize> = HashMap::new();
    for (position, name) in column_names.enumerate() {
        first_position_by_name
            .entry(name.as_ref().to_lowercase())
            .or_insert(position);
    }

    if first_position_by_name.is_empty() {
        return None;
    }

    // Highest-priority well-known name wins; otherwise fall back to the
    // first column.
    let primary = ["geometry", "geography", "geom", "geog"]
        .iter()
        .find_map(|special_name| first_position_by_name.get(*special_name).copied())
        .unwrap_or(0);
    Some(primary)
}
#[cfg(test)]
mod test {
    use arrow_schema::{DataType, Field};

    use crate::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};

    use super::*;

    /// The Arrow and DataFusion schemas must report the same per-field types.
    #[test]
    fn sedona_types() {
        let fields = vec![
            WKB_GEOGRAPHY.to_storage_field("geog", true).unwrap(),
            WKB_GEOMETRY.to_storage_field("geom", true).unwrap(),
            Field::new("one", DataType::Int32, true),
        ];
        let arrow_schema = Schema::new(fields);
        let df_schema = DFSchema::try_from(arrow_schema.clone()).unwrap();

        let expected = vec![
            WKB_GEOGRAPHY,
            WKB_GEOMETRY,
            SedonaType::Arrow(DataType::Int32),
        ];

        let from_arrow = arrow_schema
            .sedona_types()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(from_arrow, expected);

        let from_df = df_schema
            .sedona_types()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(from_df, expected);
    }

    /// Covers: no geometry columns, several specially named columns, and a
    /// single geometry column without a special name.
    #[test]
    fn geometry_columns() {
        // No geometry columns at all.
        let plain = Schema::new(vec![Field::new("one", DataType::Int32, true)]);
        let plain_df = DFSchema::try_from(plain.clone()).unwrap();
        assert!(plain.geometry_column_indices().unwrap().is_empty());
        assert!(plain.primary_geometry_column_index().unwrap().is_none());
        assert!(plain_df.geometry_column_indices().unwrap().is_empty());
        assert!(plain_df.primary_geometry_column_index().unwrap().is_none());

        // "geom" outranks "geog" even though it comes second.
        let two_geo = Schema::new(vec![
            WKB_GEOGRAPHY.to_storage_field("geog", true).unwrap(),
            WKB_GEOMETRY.to_storage_field("geom", true).unwrap(),
        ]);
        assert_eq!(two_geo.geometry_column_indices().unwrap(), vec![0, 1]);
        assert_eq!(two_geo.primary_geometry_column_index().unwrap(), Some(1));

        // Without a special name, the first geometry column is primary.
        let unnamed_field = WKB_GEOMETRY
            .to_storage_field("name_not_special_cased", true)
            .unwrap();
        let unnamed = Schema::new(vec![unnamed_field]);
        assert_eq!(unnamed.geometry_column_indices().unwrap(), vec![0]);
        assert_eq!(unnamed.primary_geometry_column_index().unwrap(), Some(0));
    }
}