use object_store::{ObjectMeta, ObjectStore};
use parquet::{
arrow::{
arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions},
async_reader::{ParquetObjectReader, ParquetRecordBatchStream},
ParquetRecordBatchStreamBuilder,
},
file::{
metadata::{PageIndexPolicy, RowGroupMetaData},
statistics::Statistics,
},
};
use std::sync::Arc;
use tokio::runtime::Handle;
use crate::collection::{Bbox, OvertureMapsCollectionError, RowFilter};
/// A unit of read work: a set of row groups from one Parquet object, together
/// with the already-loaded metadata needed to open a record-batch stream over
/// them (see `RowGroupTask::build_stream`).
pub struct RowGroupTask {
/// Object-store metadata of the Parquet object; its `location` is the path
/// the reader is opened on. Held in an `Arc` so it can be shared between tasks.
pub obj_meta: Arc<ObjectMeta>,
/// Indices of the row groups within the object that this task reads.
pub row_groups: Vec<usize>,
/// Previously loaded reader metadata for the object, handed to the stream
/// builder when the stream is built.
pub parquet_metadata: ArrowReaderMetadata,
}
impl RowGroupTask {
    /// Consumes the task and opens a record-batch stream over its row groups.
    ///
    /// * `row_filter` - optional filter; when present its predicates are built
    ///   against the file metadata of this task's Parquet object, otherwise an
    ///   empty predicate list is used.
    /// * `obj_store` - object store the Parquet object is read from.
    /// * `io_handle` - Tokio runtime handle the object reader is configured with.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the row-filter predicates, and
    /// returns `OvertureMapsCollectionError::ArrowReaderError` when the stream
    /// builder fails.
    pub fn build_stream(
        self,
        row_filter: Option<&RowFilter>,
        obj_store: Arc<dyn ObjectStore>,
        io_handle: Handle,
    ) -> Result<ParquetRecordBatchStream<ParquetObjectReader>, OvertureMapsCollectionError> {
        let Self {
            obj_meta,
            row_groups,
            parquet_metadata,
        } = self;

        // Predicates are built first: they only need to borrow the metadata,
        // which is moved into the stream builder further down.
        let predicates = match row_filter {
            Some(filter) => filter.build(parquet_metadata.metadata().file_metadata())?,
            None => Vec::new(),
        };

        let object_reader =
            ParquetObjectReader::new(obj_store, obj_meta.location.clone()).with_runtime(io_handle);
        let parquet_filter = parquet::arrow::arrow_reader::RowFilter::new(predicates);

        ParquetRecordBatchStreamBuilder::new_with_metadata(object_reader, parquet_metadata)
            .with_row_groups(row_groups)
            .with_row_filter(parquet_filter)
            .build()
            .map_err(|source| OvertureMapsCollectionError::ArrowReaderError { source })
    }
}
/// Loads the Parquet metadata of one object and splits its row groups into
/// [`RowGroupTask`]s.
///
/// * `meta` - object-store metadata of the Parquet object to plan.
/// * `store` - object store the metadata is read from.
/// * `io_handle` - optional Tokio runtime handle; when given, the metadata
///   reader is configured with it.
/// * `bbox_prune` - optional bounding box; when given, only row groups kept by
///   `prune_row_groups_by_bbox` are scheduled, otherwise all row groups are.
/// * `row_group_chunk_size` - maximum number of row groups per task. `None`
///   and the invalid value `Some(0)` both fall back to the default of 4.
///
/// Every returned task shares one `Arc<ObjectMeta>` and carries its own clone
/// of the loaded reader metadata. The result is empty when no row group
/// survives pruning.
///
/// # Errors
///
/// Returns `OvertureMapsCollectionError::ArrowReaderError` when the metadata
/// cannot be loaded (the page index is requested with
/// `PageIndexPolicy::Required`).
pub async fn process_meta_obj_into_tasks(
    meta: ObjectMeta,
    store: Arc<dyn ObjectStore>,
    io_handle: Option<Handle>,
    bbox_prune: Option<Bbox>,
    row_group_chunk_size: Option<usize>,
) -> Result<Vec<RowGroupTask>, OvertureMapsCollectionError> {
    const DEFAULT_ROW_GROUP_CHUNK_SIZE: usize = 4;

    // `slice::chunks` panics on a chunk size of zero, so reject it up front
    // instead of letting a bad configuration value abort the task.
    let chunk_size = match row_group_chunk_size {
        Some(0) => {
            log::warn!(
                "row_group_chunk_size of 0 is invalid, falling back to {}",
                DEFAULT_ROW_GROUP_CHUNK_SIZE
            );
            DEFAULT_ROW_GROUP_CHUNK_SIZE
        }
        Some(size) => size,
        None => DEFAULT_ROW_GROUP_CHUNK_SIZE,
    };

    let opts = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
    let reader = ParquetObjectReader::new(store, meta.location.clone());
    let mut reader = match io_handle {
        Some(handle) => reader.with_runtime(handle),
        None => reader,
    };

    let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, opts)
        .await
        .map_err(|source| OvertureMapsCollectionError::ArrowReaderError { source })?;
    let parquet_metadata = arrow_metadata.metadata();

    let row_group_indices: Vec<usize> = match bbox_prune.as_ref() {
        Some(bbox) => {
            let indices = prune_row_groups_by_bbox(parquet_metadata.row_groups(), bbox);
            log::debug!(
                "Pruned to {}/{} row groups",
                indices.len(),
                parquet_metadata.num_row_groups()
            );
            indices
        }
        None => (0..parquet_metadata.num_row_groups()).collect(),
    };

    let meta_arc = Arc::new(meta);
    Ok(row_group_indices
        .chunks(chunk_size)
        .map(|indices| RowGroupTask {
            obj_meta: Arc::clone(&meta_arc),
            row_groups: indices.to_vec(),
            parquet_metadata: arrow_metadata.clone(),
        })
        .collect())
}
fn prune_row_groups_by_bbox(
row_groups: &[RowGroupMetaData],
bbox: &crate::collection::Bbox,
) -> Vec<usize> {
row_groups
.iter()
.enumerate()
.filter(|(_, rg)| {
let mut min_xmin: Option<f32> = None;
let mut min_ymin: Option<f32> = None;
let mut max_xmax: Option<f32> = None;
let mut max_ymax: Option<f32> = None;
for cc_meta in rg.columns() {
let column_path = cc_meta.column_path();
let name_parts = column_path.parts();
if name_parts.len() != 2 {
continue;
}
if name_parts[0] != "bbox" {
continue;
}
let element = &name_parts[1];
if element == "xmin" {
min_xmin = cc_meta.statistics().and_then(|ss| match ss {
Statistics::Float(value) => value.min_opt().copied(),
Statistics::Double(value) => value.min_opt().copied().map(|v| v as f32),
_ => None,
});
} else if element == "xmax" {
max_xmax = cc_meta.statistics().and_then(|ss| match ss {
Statistics::Float(value) => value.max_opt().copied(),
Statistics::Double(value) => value.max_opt().copied().map(|v| v as f32),
_ => None,
});
} else if element == "ymin" {
min_ymin = cc_meta.statistics().and_then(|ss| match ss {
Statistics::Float(value) => value.min_opt().copied(),
Statistics::Double(value) => value.min_opt().copied().map(|v| v as f32),
_ => None,
});
} else if element == "ymax" {
max_ymax = cc_meta.statistics().and_then(|ss| match ss {
Statistics::Float(value) => value.max_opt().copied(),
Statistics::Double(value) => value.max_opt().copied().map(|v| v as f32),
_ => None,
});
}
}
let condition_1 = max_xmax.map(|xmax| xmax >= bbox.xmin).unwrap_or(true);
let condition_2 = min_xmin.map(|xmin| bbox.xmax >= xmin).unwrap_or(true);
let condition_3 = max_ymax.map(|ymax| ymax >= bbox.ymin).unwrap_or(true);
let condition_4 = min_ymin.map(|ymin| bbox.ymax >= ymin).unwrap_or(true);
condition_1 && condition_2 && condition_3 && condition_4
})
.map(|(idx, _)| idx)
.collect()
}