use std::sync::{Arc, OnceLock};
use crate::{
basic::LogicalType, errors::ParquetError, geospatial::statistics::GeospatialStatistics,
schema::types::ColumnDescPtr,
};
/// Creates a [`GeoStatsAccumulator`] for `descr` if the column is geospatial.
///
/// Returns `None` unless the column's logical type is
/// [`LogicalType::Geometry`] or [`LogicalType::Geography`]. For those two
/// types the accumulator is produced by the process-wide factory; if no
/// factory has been installed yet, a [`DefaultGeoStatsAccumulatorFactory`] is
/// installed on the spot and used from then on.
pub fn try_new_geo_stats_accumulator(
    descr: &ColumnDescPtr,
) -> Option<Box<dyn GeoStatsAccumulator>> {
    match descr.logical_type_ref() {
        Some(LogicalType::Geometry { .. } | LogicalType::Geography { .. }) => {
            // First use wins: fall back to the default factory when nothing
            // has been registered through `init_geo_stats_accumulator_factory`.
            let factory = ACCUMULATOR_FACTORY
                .get_or_init(|| Arc::new(DefaultGeoStatsAccumulatorFactory::default()));
            Some(factory.new_accumulator(descr))
        }
        _ => None,
    }
}
/// Installs `factory` as the process-wide [`GeoStatsAccumulatorFactory`].
///
/// # Errors
///
/// Returns [`ParquetError::General`] if the global factory has already been
/// set. This includes the case where [`try_new_geo_stats_accumulator`] has
/// already installed the default factory on first use.
pub fn init_geo_stats_accumulator_factory(
    factory: Arc<dyn GeoStatsAccumulatorFactory>,
) -> Result<(), ParquetError> {
    // `OnceLock::set` hands the rejected value back on failure; it is not needed.
    ACCUMULATOR_FACTORY.set(factory).map_err(|_rejected| {
        ParquetError::General("Global GeoStatsAccumulatorFactory already set".to_string())
    })
}
/// Process-wide factory used by [`try_new_geo_stats_accumulator`].
///
/// It can be written at most once: either explicitly through
/// [`init_geo_stats_accumulator_factory`], or implicitly with a
/// [`DefaultGeoStatsAccumulatorFactory`] the first time an accumulator is
/// requested for a geospatial column.
static ACCUMULATOR_FACTORY: OnceLock<Arc<dyn GeoStatsAccumulatorFactory>> = OnceLock::new();
/// Factory for [`GeoStatsAccumulator`] instances.
///
/// An implementation can be registered globally with
/// [`init_geo_stats_accumulator_factory`]; it must be `Send + Sync` because it
/// is stored in a shared static.
pub trait GeoStatsAccumulatorFactory: Send + Sync {
/// Creates a new accumulator for the column described by `descr`.
///
/// [`try_new_geo_stats_accumulator`] only calls this for columns whose
/// logical type is Geometry or Geography.
fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator>;
}
/// Accumulates [`GeospatialStatistics`] from a stream of WKB-encoded values.
pub trait GeoStatsAccumulator: Send {
/// Reports whether this accumulator is currently able to produce statistics.
///
/// In the implementations in this file, [`VoidGeoStatsAccumulator`] always
/// returns `false`, while the WKB-bounding accumulator returns `false` once
/// an update has failed, until the next call to [`Self::finish`].
fn is_valid(&self) -> bool;
/// Feeds one value, given as raw bytes expected to be WKB, into the accumulator.
///
/// The method cannot report failure directly; implementations record it
/// internally and surface it through [`Self::is_valid`] / [`Self::finish`].
fn update_wkb(&mut self, wkb: &[u8]);
/// Returns the statistics accumulated so far, or `None` if none can be produced.
///
/// The implementations in this file leave the accumulator ready for reuse
/// after this call.
fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
}
/// The [`GeoStatsAccumulatorFactory`] used when no other factory has been registered.
///
/// With the `geospatial` feature enabled it creates a WKB-bounding accumulator
/// for Geometry columns and a [`VoidGeoStatsAccumulator`] for everything else;
/// without the feature it always creates a [`VoidGeoStatsAccumulator`].
#[derive(Debug, Default)]
pub struct DefaultGeoStatsAccumulatorFactory {}
impl GeoStatsAccumulatorFactory for DefaultGeoStatsAccumulatorFactory {
    /// Picks the accumulator implementation for the column described by `_descr`.
    ///
    /// * With the `geospatial` feature: Geometry columns get a
    ///   `ParquetGeoStatsAccumulator`; any other column gets a
    ///   [`VoidGeoStatsAccumulator`].
    /// * Without the feature: every column gets a [`VoidGeoStatsAccumulator`]
    ///   and `_descr` is not inspected.
    fn new_accumulator(&self, _descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator> {
        // Only compiled when WKB bounding support is available.
        #[cfg(feature = "geospatial")]
        {
            if matches!(
                _descr.logical_type_ref(),
                Some(crate::basic::LogicalType::Geometry { .. })
            ) {
                return Box::new(ParquetGeoStatsAccumulator::default());
            }
        }

        // Shared fallback: feature disabled, or the column is not Geometry.
        Box::new(VoidGeoStatsAccumulator::default())
    }
}
/// A [`GeoStatsAccumulator`] that never produces statistics.
///
/// It reports itself as invalid, ignores every update and returns `None` from
/// `finish`.
#[derive(Debug, Default)]
pub struct VoidGeoStatsAccumulator {}
impl GeoStatsAccumulator for VoidGeoStatsAccumulator {
    /// Always `false`: this accumulator can never produce statistics.
    fn is_valid(&self) -> bool {
        false
    }

    /// Discards the input without looking at it.
    fn update_wkb(&mut self, _wkb: &[u8]) {
        // Intentionally empty.
    }

    /// Always `None`: nothing was accumulated.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
        None
    }
}
/// A [`GeoStatsAccumulator`] that computes statistics by bounding WKB values.
///
/// Only available with the `geospatial` feature.
#[cfg(feature = "geospatial")]
#[derive(Debug)]
pub struct ParquetGeoStatsAccumulator {
// Tracks the x/y/z/m intervals and the geometry types seen since the last `finish`.
bounder: parquet_geospatial::bounding::GeometryBounder,
// Set when an update fails; cleared again by `finish`.
invalid: bool,
}
#[cfg(feature = "geospatial")]
impl Default for ParquetGeoStatsAccumulator {
    /// Creates a valid accumulator with an empty bounder.
    fn default() -> Self {
        use parquet_geospatial::bounding::GeometryBounder;

        let bounder = GeometryBounder::empty();
        Self {
            bounder,
            invalid: false,
        }
    }
}
#[cfg(feature = "geospatial")]
impl GeoStatsAccumulator for ParquetGeoStatsAccumulator {
    /// `true` as long as no update has failed since the last `finish`.
    fn is_valid(&self) -> bool {
        !self.invalid
    }

    /// Folds `wkb` into the running bounds.
    ///
    /// If the bounder rejects the bytes, the accumulator is marked invalid and
    /// stays that way until `finish` is called.
    fn update_wkb(&mut self, wkb: &[u8]) {
        // Sticky flag: once set, later successful updates do not clear it.
        self.invalid |= self.bounder.update_wkb(wkb).is_err();
    }

    /// Produces the statistics for everything seen since the previous call and
    /// resets the accumulator for reuse.
    ///
    /// Returns `None` if any update failed in that period. Otherwise returns
    /// statistics where:
    /// * the bounding box is present only if both the x and y intervals are
    ///   non-empty, with z and m ranges attached only when non-empty;
    /// * the geometry types are present only if at least one was recorded.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
        use crate::geospatial::bounding_box::BoundingBox;
        use parquet_geospatial::bounding::GeometryBounder;
        use parquet_geospatial::interval::IntervalTrait;

        if self.invalid {
            // Discard the poisoned state so the next batch starts clean.
            self.bounder = GeometryBounder::empty();
            self.invalid = false;
            return None;
        }

        let (x, y) = (self.bounder.x(), self.bounder.y());
        let bbox = (!x.is_empty() && !y.is_empty()).then(|| {
            let (z, m) = (self.bounder.z(), self.bounder.m());
            let mut bounds = BoundingBox::new(x.lo(), x.hi(), y.lo(), y.hi());
            if !z.is_empty() {
                bounds = bounds.with_zrange(z.lo(), z.hi());
            }
            if !m.is_empty() {
                bounds = bounds.with_mrange(m.lo(), m.hi());
            }
            bounds
        });

        // An empty list of geometry types is reported as "absent".
        let geometry_types = Some(self.bounder.geometry_types()).filter(|types| !types.is_empty());

        // Start the next batch from an empty bounder.
        self.bounder = GeometryBounder::empty();

        Some(Box::new(GeospatialStatistics::new(bbox, geometry_types)))
    }
}
#[cfg(test)]
mod test {
use super::*;
// The void accumulator reports itself invalid, accepts arbitrary bytes and
// never yields statistics.
#[test]
fn test_void_accumulator() {
let mut accumulator = VoidGeoStatsAccumulator {};
assert!(!accumulator.is_valid());
accumulator.update_wkb(&[0x01, 0x02, 0x03]);
assert!(accumulator.finish().is_none());
}
// Exercises the global entry points end to end: accumulator selection by
// logical type, and rejection of a late factory registration.
#[cfg(feature = "geospatial")]
#[test]
fn test_default_accumulator_geospatial_factory() {
use std::sync::Arc;
use parquet_geospatial::testing::wkb_point_xy;
use crate::{
basic::LogicalType,
geospatial::bounding_box::BoundingBox,
schema::types::{ColumnDescriptor, ColumnPath, Type},
};
// Geometry column: expect a working accumulator that bounds the two points.
let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::Geometry { crs: None }))
.build()
.unwrap();
let column_descr =
ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
let stats = accumulator.finish().unwrap();
assert_eq!(
stats.bounding_box().unwrap(),
&BoundingBox::new(1.0, 11.0, 2.0, 12.0)
);
// Geography column: an accumulator is returned, but it is expected to be
// invalid and to produce no statistics.
let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::Geography {
crs: None,
algorithm: None,
}))
.build()
.unwrap();
let column_descr =
ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();
assert!(!accumulator.is_valid());
assert!(accumulator.finish().is_none());
// Column without a logical type: no accumulator at all.
let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
.build()
.unwrap();
let column_descr =
ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
assert!(try_new_geo_stats_accumulator(&Arc::new(column_descr)).is_none());
// The global factory was already initialised by the calls above, so
// registering one now must be rejected.
assert!(
init_geo_stats_accumulator_factory(Arc::new(
DefaultGeoStatsAccumulatorFactory::default()
))
.is_err()
)
}
// Drives a single accumulator through several finish() cycles to check that
// each cycle starts from a clean state.
#[cfg(feature = "geospatial")]
#[test]
fn test_geometry_accumulator() {
use parquet_geospatial::testing::{wkb_point_xy, wkb_point_xyzm};
use crate::geospatial::bounding_box::BoundingBox;
let mut accumulator = ParquetGeoStatsAccumulator::default();
// Cycle 1: two XY points.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
let stats = accumulator.finish().unwrap();
assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
assert_eq!(
stats.bounding_box().unwrap(),
&BoundingBox::new(1.0, 11.0, 2.0, 12.0)
);
// Cycle 2: bounds must not include the points from cycle 1.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(21.0, 22.0));
accumulator.update_wkb(&wkb_point_xy(31.0, 32.0));
let stats = accumulator.finish().unwrap();
assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
assert_eq!(
stats.bounding_box().unwrap(),
&BoundingBox::new(21.0, 31.0, 22.0, 32.0)
);
// Cycle 3: one bad value invalidates the whole cycle, so finish() yields
// nothing.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
accumulator.update_wkb("these bytes are not WKB".as_bytes());
assert!(!accumulator.is_valid());
assert!(accumulator.finish().is_none());
// Cycle 4: finish() cleared the invalid flag and the accumulator works again.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
accumulator.update_wkb(&wkb_point_xy(51.0, 52.0));
let stats = accumulator.finish().unwrap();
assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
assert_eq!(
stats.bounding_box().unwrap(),
&BoundingBox::new(41.0, 51.0, 42.0, 52.0)
);
// Cycle 5: no input gives statistics with neither types nor a bounding box.
assert!(accumulator.is_valid());
let stats = accumulator.finish().unwrap();
assert!(stats.geospatial_types().is_none());
assert!(stats.bounding_box().is_none());
// Cycle 6: a point with NaN coordinates records its geometry type but
// contributes no bounding box.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xy(f64::NAN, f64::NAN));
let stats = accumulator.finish().unwrap();
assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
assert!(stats.bounding_box().is_none());
// Cycle 7: XYZM points are reported with type code 3001 and carry z and m
// ranges in the bounding box.
assert!(accumulator.is_valid());
accumulator.update_wkb(&wkb_point_xyzm(1.0, 2.0, 3.0, 4.0));
accumulator.update_wkb(&wkb_point_xyzm(5.0, 6.0, 7.0, 8.0));
let stats = accumulator.finish().unwrap();
assert_eq!(stats.geospatial_types().unwrap(), &vec![3001]);
assert_eq!(
stats.bounding_box().unwrap(),
&BoundingBox::new(1.0, 5.0, 2.0, 6.0)
.with_zrange(3.0, 7.0)
.with_mrange(4.0, 8.0)
);
}
}