use std::{collections::HashMap, io::Cursor};
use anyhow::Result;
use geo::Geometry;
use geojson::FeatureCollection;
use schemars::{JsonSchema, schema_for};
use serde::Deserialize;
use sqlx::types::Json;
use url::Url;
use ogcapi_drivers::{CollectionTransactions, postgres::Db};
use ogcapi_types::{
common::{Collection, Crs, Exception, Extent, SpatialExtent},
processes::{
Execute, Format, InlineOrRefData, Input, InputValueNoObject, Output, Process,
TransmissionMode,
},
};
use wkb::Endianness;
use crate::{ProcessResponseBody, Processor};
#[derive(Clone)]
pub struct GeoJsonLoader;
#[derive(Deserialize, Debug, JsonSchema)]
pub struct GeoJsonLoaderInputs {
pub input: String,
pub collection: String,
pub s_srs: Option<u32>,
pub database_url: String,
}
impl GeoJsonLoaderInputs {
pub fn execute_input(&self) -> HashMap<String, Input> {
let mut input = HashMap::from_iter([
(
"input".to_string(),
Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
InputValueNoObject::String(self.input.to_owned()),
)),
),
(
"collection".to_string(),
Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
InputValueNoObject::String(self.collection.to_owned()),
)),
),
(
"database_url".to_string(),
Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
InputValueNoObject::String(self.database_url.to_owned()),
)),
),
]);
if let Some(s_srs) = &self.s_srs {
input.insert(
"s_srs".to_owned(),
Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
InputValueNoObject::Integer(*s_srs as i64),
)),
);
}
input
}
}
#[derive(Clone, Debug, JsonSchema)]
pub struct GeoJsonLoaderOutputs {
pub collection_id: String,
}
impl GeoJsonLoaderOutputs {
pub fn execute_output() -> HashMap<String, Output> {
HashMap::from([(
"greeting".to_string(),
Output {
format: Some(Format {
media_type: Some("text/plain".to_string()),
encoding: Some("utf8".to_string()),
schema: None,
}),
transmission_mode: TransmissionMode::Value,
},
)])
}
}
impl TryFrom<ProcessResponseBody> for GeoJsonLoaderOutputs {
type Error = Exception;
fn try_from(value: ProcessResponseBody) -> Result<Self, Self::Error> {
if let ProcessResponseBody::Requested(buf) = value {
Ok(GeoJsonLoaderOutputs {
collection_id: String::from_utf8(buf).unwrap(),
})
} else {
Err(Exception::new("500"))
}
}
}
#[async_trait::async_trait]
impl Processor for GeoJsonLoader {
fn id(&self) -> &'static str {
"geojson-loader"
}
fn version(&self) -> &'static str {
"0.1.0"
}
fn process(&self) -> Result<Process> {
Process::try_new(
self.id(),
self.version(),
&schema_for!(GeoJsonLoaderInputs).schema,
&schema_for!(GeoJsonLoaderOutputs).schema,
)
.map_err(Into::into)
}
async fn execute(&self, execute: Execute) -> Result<ProcessResponseBody> {
let value = serde_json::to_value(execute.inputs)?;
let inputs: GeoJsonLoaderInputs = serde_json::from_value(value)?;
let db = Db::setup(&Url::parse(&inputs.database_url).unwrap()).await?;
let geojson_str = std::fs::read_to_string(&inputs.input)?;
let geojson = geojson_str.parse::<FeatureCollection>()?;
let collection = Collection {
id: inputs.collection.to_owned(),
item_type: Some("Feature".to_string()),
extent: geojson
.bbox
.map(|bbox| Extent {
spatial: Some(SpatialExtent {
bbox: vec![
bbox.as_slice()
.try_into()
.unwrap_or_else(|_| [-180.0, -90.0, 180.0, 90.0].into()),
],
crs: Crs::default(),
}),
..Default::default()
})
.or_else(|| Some(Extent::default())),
crs: vec![Crs::default(), Crs::from_epsg(3857), Crs::from_epsg(2056)],
storage_crs: Some(Crs::default()),
..Default::default()
};
db.delete_collection(&collection.id).await?;
db.create_collection(&collection).await?;
let chunk_size = 1000;
let chunks: Vec<_> = geojson
.features
.chunks(chunk_size)
.enumerate()
.map(|(i, chunk)| {
let mut ids = Vec::with_capacity(chunk_size);
let mut properties = Vec::with_capacity(chunk_size);
let mut geoms = Vec::with_capacity(chunk_size);
for (ii, feature) in chunk.iter().enumerate() {
let id = if let Some(id) = &feature.id {
match id {
geojson::feature::Id::String(s) => s.to_owned(),
geojson::feature::Id::Number(n) => n.to_string(),
}
} else {
((i * chunk_size) + ii).to_string()
};
ids.push(id);
let props = feature.properties.to_owned().map(Json);
properties.push(props);
let geom =
Geometry::try_from(feature.geometry.to_owned().unwrap().value).unwrap();
let mut wkb = Cursor::new(Vec::new());
wkb::writer::write_geometry(&mut wkb, &geom, Endianness::LittleEndian).unwrap();
geoms.push(wkb.into_inner());
}
(ids, properties, geoms)
})
.collect();
for (ids, properties, geoms) in chunks {
sqlx::query(&format!(
r#"
INSERT INTO items."{}" (id, properties, geom)
SELECT * FROM UNNEST($1::text[], $2::jsonb[], $3::bytea[])
"#,
collection.id
))
.bind(ids)
.bind(properties)
.bind(geoms)
.execute(&db.pool)
.await?;
}
Ok(ProcessResponseBody::Requested(
inputs.collection.as_bytes().to_owned(),
))
}
}
#[cfg(test)]
mod tests {
use ogcapi_types::processes::Execute;
use crate::{
Processor,
geojson_loader::{GeoJsonLoader, GeoJsonLoaderInputs, GeoJsonLoaderOutputs},
};
#[tokio::test]
async fn test_loader() {
let loader = GeoJsonLoader;
assert_eq!(loader.id(), "geojson-loader");
println!(
"Process:\n{}",
serde_json::to_string_pretty(&loader.process().unwrap()).unwrap()
);
let input = GeoJsonLoaderInputs {
input: "../data/ne_10m_railroads_north_america.geojson".to_owned(),
collection: "streets-geojson".to_string(),
s_srs: None,
database_url: "postgresql://postgres:password@localhost:5433/ogcapi".to_string(),
};
let execute = Execute {
inputs: input.execute_input(),
..Default::default()
};
let output: GeoJsonLoaderOutputs =
loader.execute(execute).await.unwrap().try_into().unwrap();
assert_eq!(output.collection_id, "streets-geojson");
}
}