pub mod json;
use crate::{Error, Item, ItemCollection, Result};
use arrow_array::{Array, RecordBatch, RecordBatchReader, builder::BinaryBuilder, cast::AsArray};
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, SchemaBuilder, SchemaRef, TimeUnit};
use geo_types::Geometry;
use geoarrow_array::{
GeoArrowArray,
array::{WkbArray, from_arrow_array},
builder::GeometryBuilder,
};
use geoarrow_schema::{GeoArrowType, GeometryType, Metadata};
use serde_json::{Value, json};
use std::{io::Cursor, sync::Arc};
/// STAC item fields that are encoded as UTC millisecond timestamps instead of
/// strings (the schema override happens in `Writer::infer_base_schema`).
pub const DATETIME_COLUMNS: [&str; 8] = [
    "datetime",
    "start_datetime",
    "end_datetime",
    "created",
    "updated",
    "expires",
    "published",
    "unpublished",
];
/// Low-cardinality string columns that are dictionary-encoded in `Writer::write`.
const DICTIONARY_COLUMNS: [&str; 3] = ["type", "stac_version", "collection"];
/// Encodes items into a record batch using the default [Options].
///
/// Returns the record batch together with its full output schema.
pub fn encode(items: Vec<Item>) -> Result<(RecordBatch, SchemaRef)> {
    let options = Options::default();
    encode_with_options(items, options)
}
/// Encodes items into a record batch with the provided [Options].
///
/// Returns the record batch together with its full output schema.
pub fn encode_with_options(items: Vec<Item>, options: Options) -> Result<(RecordBatch, SchemaRef)> {
    let (encoder, record_batch) = Encoder::new(items, options)?;
    let schema = encoder.into_schema();
    Ok((record_batch, schema))
}
/// Encodes [Item]s into [RecordBatch]es, re-using the schema inferred from
/// the first batch for every subsequent batch.
#[derive(Debug)]
pub struct Encoder {
    // Conversion options, forwarded to `Item::into_flat_item` on each batch.
    options: Options,
    // Schema inferred from the flat-item JSON, before geometry columns and
    // dictionary encoding are applied.
    base_schema: SchemaRef,
    // Full output schema of the encoded record batches.
    schema: SchemaRef,
}
/// Options for encoding items into record batches.
#[derive(Debug)]
pub struct Options {
    /// Whether to drop invalid attributes when flattening items
    /// (passed to `Item::into_flat_item`). Defaults to `true`.
    pub drop_invalid_attributes: bool,
}
/// Accumulates flat-item JSON values and their geometries before writing a
/// record batch.
#[derive(Debug)]
struct Writer {
    // Flat-item JSON objects, with their geometry members removed.
    values: Vec<Value>,
    // Builder for the native geoarrow `geometry` column.
    geometry_builder: GeometryBuilder,
    // Builder for the WKB-encoded `proj:geometry` column.
    proj_geometry_builder: BinaryBuilder,
}
impl Encoder {
    /// Builds an encoder from an initial batch of items.
    ///
    /// Returns the encoder together with the first record batch. The schema
    /// inferred here is re-used for every later call to [Encoder::encode].
    pub fn new(items: Vec<Item>, options: Options) -> Result<(Encoder, RecordBatch)> {
        let mut writer = Writer::new(items.len());
        for item in iter_items(items, options.drop_invalid_attributes) {
            writer.add(item?)?;
        }
        let base_schema = writer.infer_base_schema()?;
        let record_batch = writer.write(base_schema.clone())?;
        let schema = record_batch.schema().clone();
        let encoder = Encoder {
            options,
            base_schema,
            schema,
        };
        Ok((encoder, record_batch))
    }

    /// Encodes another batch of items against the schema inferred in
    /// [Encoder::new].
    ///
    /// Returns [Error::ArrowSchemaMismatch] if the new batch's schema differs.
    pub fn encode(&self, items: Vec<Item>) -> Result<RecordBatch> {
        let mut writer = Writer::new(items.len());
        for item in iter_items(items, self.options.drop_invalid_attributes) {
            writer.add(item?)?;
        }
        let record_batch = writer.write(self.base_schema.clone())?;
        if record_batch.schema() == self.schema {
            Ok(record_batch)
        } else {
            Err(Error::ArrowSchemaMismatch)
        }
    }

    /// Consumes this encoder, returning its full output schema.
    pub fn into_schema(self) -> SchemaRef {
        self.schema
    }
}
impl Writer {
fn new(capacity: usize) -> Writer {
Writer {
values: Vec::with_capacity(capacity),
geometry_builder: GeometryBuilder::new(GeometryType::new(Default::default())),
proj_geometry_builder: BinaryBuilder::new(),
}
}
fn add(&mut self, mut value: Value) -> Result<()> {
let object = value
.as_object_mut()
.expect("a flat item should serialize to an object");
if let Some(value) = object.remove("geometry") {
let geometry: geojson::Geometry = serde_json::from_value(value)?;
self.geometry_builder
.push_geometry(Some(&(Geometry::try_from(geometry).map_err(Box::new)?)))?;
}
if let Some(value) = object.remove("proj:geometry") {
let geometry: geojson::Geometry = serde_json::from_value(value)?;
let mut cursor = Cursor::new(Vec::new());
wkb::writer::write_geometry(
&mut cursor,
&Geometry::try_from(geometry).map_err(Box::new)?,
&Default::default(),
)?;
self.proj_geometry_builder.append_value(cursor.into_inner());
}
if let Some(bbox) = object.remove("bbox") {
let bbox = convert_bbox(bbox)?;
let _ = object.insert("bbox".to_string(), bbox);
}
self.values.push(value);
Ok(())
}
fn infer_base_schema(&self) -> Result<SchemaRef> {
let schema =
arrow_json::reader::infer_json_schema_from_iterator(self.values.iter().map(Ok))?;
let mut schema_builder = SchemaBuilder::new();
for field in schema.fields().iter() {
if DATETIME_COLUMNS.contains(&field.name().as_str()) {
schema_builder.push(Field::new(
field.name(),
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
field.is_nullable(),
));
} else {
schema_builder.push(field.clone());
}
}
Ok(Arc::new(schema_builder.finish()))
}
fn write(mut self, base_schema: SchemaRef) -> Result<RecordBatch> {
let mut decoder = ReaderBuilder::new(base_schema.clone()).build_decoder()?;
decoder.serialize(&self.values)?;
let record_batch = decoder.flush()?.ok_or(Error::NoItems)?;
let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
let mut schema_builder = SchemaBuilder::new();
let mut columns = Vec::with_capacity(record_batch.num_columns());
for (i, field) in record_batch.schema().fields().iter().enumerate() {
let should_dictionary_encode = DICTIONARY_COLUMNS.contains(&field.name().as_str())
&& field.data_type() == &DataType::Utf8;
if should_dictionary_encode {
let dict_array = arrow_cast::cast(record_batch.column(i), &dict_type)?;
columns.push(dict_array);
schema_builder.push(Field::new(
field.name(),
dict_type.clone(),
field.is_nullable(),
));
} else {
columns.push(record_batch.column(i).clone());
schema_builder.push(field.as_ref().clone());
}
}
let geometry_array = self.geometry_builder.finish();
columns.push(geometry_array.to_array_ref());
schema_builder.push(geometry_array.data_type().to_field("geometry", true));
let proj_geometry_array = self.proj_geometry_builder.finish();
if !proj_geometry_array.is_empty() {
let data_type = proj_geometry_array.data_type().clone();
columns.push(Arc::new(proj_geometry_array));
schema_builder.push(Field::new("proj:geometry", data_type, true));
}
let schema = Arc::new(schema_builder.finish());
let record_batch = RecordBatch::try_new(schema, columns)?;
Ok(record_batch)
}
}
impl Default for Options {
fn default() -> Self {
Options {
drop_invalid_attributes: true,
}
}
}
fn iter_items(
items: Vec<Item>,
drop_invalid_attributes: bool,
) -> impl Iterator<Item = Result<Value>> {
items.into_iter().map(move |item| {
item.into_flat_item(drop_invalid_attributes)
.and_then(|flat_item| serde_json::to_value(flat_item).map_err(Error::from))
})
}
pub fn items_from_record_batch(record_batch: RecordBatch) -> Result<Vec<Item>> {
json::record_batch_to_json_rows(record_batch)?
.into_iter()
.map(|item| serde_json::from_value(Value::Object(item)).map_err(Error::from))
.collect()
}
pub fn from_record_batch_reader<R: RecordBatchReader>(reader: R) -> Result<ItemCollection> {
let item_collection = json::from_record_batch_reader(reader)?
.into_iter()
.map(|item| serde_json::from_value(Value::Object(item)).map_err(Error::from))
.collect::<Result<Vec<_>>>()
.map(ItemCollection::from)?;
Ok(item_collection)
}
/// Replaces a WKB geometry column with a native geoarrow geometry column.
///
/// If the record batch has no column named `column_name`, it is returned
/// unchanged. Otherwise the column is removed, reinterpreted as
/// `i32`-offset WKB binary, decoded into a geoarrow geometry array, and
/// appended as the *last* column of the batch (not re-inserted at its
/// original index).
pub fn with_native_geometry(
    mut record_batch: RecordBatch,
    column_name: &str,
) -> Result<RecordBatch> {
    if let Some((index, _)) = record_batch.schema().column_with_name(column_name) {
        // Take the column out of the batch so the remaining columns match the
        // rebuilt schema below.
        let geometry_column = record_batch.remove_column(index);
        // NOTE(review): this assumes the column is a Binary (i32-offset)
        // array holding WKB bytes — `as_binary` panics otherwise.
        let wkb_array = WkbArray::new(
            geometry_column.as_binary::<i32>().clone(),
            Default::default(),
        );
        let geometry_array = geoarrow_array::cast::from_wkb(
            &wkb_array,
            GeoArrowType::Geometry(GeometryType::new(Metadata::default().into())),
        )?;
        let mut columns = record_batch.columns().to_vec();
        let mut schema_builder = SchemaBuilder::from(&*record_batch.schema());
        schema_builder.push(geometry_array.data_type().to_field(column_name, true));
        let schema = schema_builder.finish();
        columns.push(geometry_array.to_array_ref());
        record_batch = RecordBatch::try_new(schema.into(), columns)?;
    }
    Ok(record_batch)
}
/// Replaces a native geoarrow geometry column with a WKB geometry column.
///
/// If the record batch has no column named `column_name`, it is returned
/// unchanged. Otherwise the column is removed, interpreted as a geoarrow
/// array using its field metadata, converted to `i32`-offset WKB, and
/// appended as the *last* column of the batch (not re-inserted at its
/// original index).
pub fn with_wkb_geometry(mut record_batch: RecordBatch, column_name: &str) -> Result<RecordBatch> {
    if let Some((index, field)) = record_batch.schema().column_with_name(column_name) {
        // Take the column out of the batch so the remaining columns match the
        // rebuilt schema below.
        let geometry_column = record_batch.remove_column(index);
        let wkb_array = geoarrow_array::cast::to_wkb::<i32>(
            from_arrow_array(&geometry_column, field)?.as_ref(),
        )?;
        let mut columns = record_batch.columns().to_vec();
        let mut schema_builder = SchemaBuilder::from(&*record_batch.schema());
        schema_builder.push(wkb_array.data_type().to_field(column_name, true));
        let schema = schema_builder.finish();
        columns.push(wkb_array.to_array_ref());
        record_batch = RecordBatch::try_new(schema.into(), columns)?;
    }
    Ok(record_batch)
}
/// Tags a column's field with the `geoarrow.wkb` Arrow extension name.
///
/// If the record batch has no column named `column_name`, it is returned
/// unchanged. Only the schema is touched — the column data is left alone.
pub fn add_wkb_metadata(mut record_batch: RecordBatch, column_name: &str) -> Result<RecordBatch> {
    if let Some((index, field)) = record_batch.schema().column_with_name(column_name) {
        // Copy the field's metadata and add (or overwrite) the extension tag.
        let mut metadata = field.metadata().clone();
        let _ = metadata.insert(
            "ARROW:extension:name".to_string(),
            "geoarrow.wkb".to_string(),
        );
        let tagged_field = field.clone().with_metadata(metadata);
        // Rebuild the schema with the tagged field swapped in at `index`.
        let mut schema_builder = SchemaBuilder::from(&*record_batch.schema());
        *schema_builder.field_mut(index) = tagged_field.into();
        record_batch = record_batch.with_schema(schema_builder.finish().into())?;
    }
    Ok(record_batch)
}
fn convert_bbox(bbox: Value) -> Result<Value> {
let bbox = bbox
.as_array()
.expect("STAC items should always have a list as their bbox");
if bbox.len() == 4 {
Ok(json!({
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
"xmax": bbox[2].as_number().expect("all bbox values should be a number"),
"ymax": bbox[3].as_number().expect("all bbox values should be a number"),
}))
} else if bbox.len() == 6 {
Ok(json!({
"xmin": bbox[0].as_number().expect("all bbox values should be a number"),
"ymin": bbox[1].as_number().expect("all bbox values should be a number"),
"zmin": bbox[2].as_number().expect("all bbox values should be a number"),
"xmax": bbox[3].as_number().expect("all bbox values should be a number"),
"ymax": bbox[4].as_number().expect("all bbox values should be a number"),
"zmax": bbox[5].as_number().expect("all bbox values should be a number"),
}))
} else {
Err(Error::InvalidBbox(
bbox.iter().filter_map(|v| v.as_f64()).collect(),
"must have 4 or 6 values",
))
}
}
#[cfg(test)]
mod tests {
    // These tests read fixture files from the repository (examples/ and
    // data/), so they exercise the full read → encode → decode pipeline.
    use super::Encoder;
    use crate::{Item, ItemCollection};
    use arrow_array::RecordBatchIterator;
    // The encoded schema should include the item's `type` field.
    #[test]
    fn has_type() {
        let item: Item = crate::read("examples/simple-item.json").unwrap();
        let (_, schema) = super::encode(vec![item]).unwrap();
        let _ = schema.field_with_name("type").unwrap();
    }
    // An item should survive an encode → decode round trip.
    #[test]
    fn roundtrip() {
        let item: Item = crate::read("examples/simple-item.json").unwrap();
        let (record_batch, schema) = super::encode(vec![item]).unwrap();
        let _ = super::from_record_batch_reader(RecordBatchIterator::new(
            vec![record_batch].into_iter().map(Ok),
            schema,
        ))
        .unwrap();
    }
    // Round-tripping should also work when items don't share identical
    // attributes (one item is missing an asset the other has).
    #[test]
    fn roundtrip_with_missing_asset() {
        let item_collection: ItemCollection =
            crate::read("data/two-sentinel-2-items.json").unwrap();
        let (record_batch, schema) = super::encode(item_collection.items).unwrap();
        let _ = super::from_record_batch_reader(RecordBatchIterator::new(
            vec![record_batch].into_iter().map(Ok),
            schema,
        ))
        .unwrap();
    }
    // The native geometry column should be convertible to WKB.
    #[test]
    fn with_wkb_geometry() {
        let item: Item = crate::read("examples/simple-item.json").unwrap();
        let (record_batch, _) = super::encode(vec![item]).unwrap();
        let _ = super::with_wkb_geometry(record_batch, "geometry").unwrap();
    }
    // Items with a proj:geometry should produce a proj:geometry column.
    #[test]
    fn has_proj_geometry() {
        let item: Item =
            crate::read("examples/extensions-collection/proj-example/proj-example.json").unwrap();
        let (record_batch, _) = super::encode(vec![item]).unwrap();
        assert!(
            record_batch
                .schema()
                .column_with_name("proj:geometry")
                .is_some()
        );
    }
    // An Encoder built from one batch should be able to encode a second batch
    // with the same schema.
    #[test]
    fn two_batches() {
        let item: Item = crate::read("examples/simple-item.json").unwrap();
        let (encoder, _) = Encoder::new(vec![item.clone()], Default::default()).unwrap();
        let _ = encoder.encode(vec![item]).unwrap();
    }
    // The low-cardinality string columns should come out dictionary-encoded.
    #[test]
    fn dictionary_encoded_columns() {
        use arrow_schema::DataType;
        let item: Item = crate::read("examples/simple-item.json").unwrap();
        let (record_batch, _) = super::encode(vec![item]).unwrap();
        let schema = record_batch.schema();
        for field in schema.fields() {
            match field.name().as_str() {
                "type" | "stac_version" | "collection" => {
                    assert!(
                        matches!(field.data_type(), DataType::Dictionary(_, _)),
                        "'{}' should be dictionary-encoded",
                        field.name()
                    );
                }
                _ => {}
            }
        }
    }
}