1use std::{collections::HashMap, io::Cursor};
2
3use anyhow::Result;
4use geo::Geometry;
5use geojson::FeatureCollection;
6
7use schemars::{JsonSchema, schema_for};
8use serde::Deserialize;
9use sqlx::types::Json;
10use url::Url;
11
12use ogcapi_drivers::{CollectionTransactions, postgres::Db};
13use ogcapi_types::{
14 common::{Collection, Crs, Exception, Extent, SpatialExtent},
15 processes::{
16 Execute, Format, InlineOrRefData, Input, InputValueNoObject, Output, Process,
17 TransmissionMode,
18 },
19};
20use wkb::Endianness;
21
22use crate::{ProcessResponseBody, Processor};
23
24#[derive(Clone)]
28pub struct GeoJsonLoader;
29
30#[derive(Deserialize, Debug, JsonSchema)]
32pub struct GeoJsonLoaderInputs {
33 pub input: String,
35
36 pub collection: String,
38
39 pub s_srs: Option<u32>,
41
42 pub database_url: String,
44}
45
46impl GeoJsonLoaderInputs {
47 pub fn execute_input(&self) -> HashMap<String, Input> {
48 let mut input = HashMap::from_iter([
49 (
50 "input".to_string(),
51 Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
52 InputValueNoObject::String(self.input.to_owned()),
53 )),
54 ),
55 (
56 "collection".to_string(),
57 Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
58 InputValueNoObject::String(self.collection.to_owned()),
59 )),
60 ),
61 (
62 "database_url".to_string(),
63 Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
64 InputValueNoObject::String(self.database_url.to_owned()),
65 )),
66 ),
67 ]);
68
69 if let Some(s_srs) = &self.s_srs {
70 input.insert(
71 "s_srs".to_owned(),
72 Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
73 InputValueNoObject::Integer(*s_srs as i64),
74 )),
75 );
76 }
77
78 input
79 }
80}
81
82#[derive(Clone, Debug, JsonSchema)]
84pub struct GeoJsonLoaderOutputs {
85 pub collection_id: String,
86}
87
88impl GeoJsonLoaderOutputs {
89 pub fn execute_output() -> HashMap<String, Output> {
90 HashMap::from([(
91 "greeting".to_string(),
92 Output {
93 format: Some(Format {
94 media_type: Some("text/plain".to_string()),
95 encoding: Some("utf8".to_string()),
96 schema: None,
97 }),
98 transmission_mode: TransmissionMode::Value,
99 },
100 )])
101 }
102}
103
104impl TryFrom<ProcessResponseBody> for GeoJsonLoaderOutputs {
105 type Error = Exception;
106
107 fn try_from(value: ProcessResponseBody) -> Result<Self, Self::Error> {
108 if let ProcessResponseBody::Requested(buf) = value {
109 Ok(GeoJsonLoaderOutputs {
110 collection_id: String::from_utf8(buf).unwrap(),
111 })
112 } else {
113 Err(Exception::new("500"))
114 }
115 }
116}
117
118#[async_trait::async_trait]
119impl Processor for GeoJsonLoader {
120 fn id(&self) -> &'static str {
121 "geojson-loader"
122 }
123
124 fn version(&self) -> &'static str {
125 "0.1.0"
126 }
127
128 fn process(&self) -> Result<Process> {
129 Process::try_new(
130 self.id(),
131 self.version(),
132 &schema_for!(GeoJsonLoaderInputs).schema,
133 &schema_for!(GeoJsonLoaderOutputs).schema,
134 )
135 .map_err(Into::into)
136 }
137
138 async fn execute(&self, execute: Execute) -> Result<ProcessResponseBody> {
139 let value = serde_json::to_value(execute.inputs)?;
140 let inputs: GeoJsonLoaderInputs = serde_json::from_value(value)?;
141
142 let db = Db::setup(&Url::parse(&inputs.database_url).unwrap()).await?;
144
145 let geojson_str = std::fs::read_to_string(&inputs.input)?;
147 let geojson = geojson_str.parse::<FeatureCollection>()?;
148
149 let collection = Collection {
151 id: inputs.collection.to_owned(),
152 item_type: Some("Feature".to_string()),
153 extent: geojson
154 .bbox
155 .map(|bbox| Extent {
156 spatial: Some(SpatialExtent {
157 bbox: vec![
158 bbox.as_slice()
159 .try_into()
160 .unwrap_or_else(|_| [-180.0, -90.0, 180.0, 90.0].into()),
161 ],
162 crs: Crs::default(),
163 }),
164 ..Default::default()
165 })
166 .or_else(|| Some(Extent::default())),
167 crs: vec![Crs::default(), Crs::from_epsg(3857), Crs::from_epsg(2056)],
168 storage_crs: Some(Crs::default()),
169 ..Default::default()
172 };
173
174 db.delete_collection(&collection.id).await?;
175 db.create_collection(&collection).await?;
176
177 let chunk_size = 1000;
179 let chunks: Vec<_> = geojson
180 .features
181 .chunks(chunk_size)
182 .enumerate()
183 .map(|(i, chunk)| {
184 let mut ids = Vec::with_capacity(chunk_size);
185 let mut properties = Vec::with_capacity(chunk_size);
186 let mut geoms = Vec::with_capacity(chunk_size);
187
188 for (ii, feature) in chunk.iter().enumerate() {
189 let id = if let Some(id) = &feature.id {
191 match id {
192 geojson::feature::Id::String(s) => s.to_owned(),
193 geojson::feature::Id::Number(n) => n.to_string(),
194 }
195 } else {
196 ((i * chunk_size) + ii).to_string()
197 };
198 ids.push(id);
199
200 let props = feature.properties.to_owned().map(Json);
202 properties.push(props);
203
204 let geom =
206 Geometry::try_from(feature.geometry.to_owned().unwrap().value).unwrap();
207
208 let mut wkb = Cursor::new(Vec::new());
209 wkb::writer::write_geometry(&mut wkb, &geom, Endianness::LittleEndian).unwrap();
210 geoms.push(wkb.into_inner());
211 }
212
213 (ids, properties, geoms)
214 })
215 .collect();
216
217 for (ids, properties, geoms) in chunks {
218 sqlx::query(&format!(
219 r#"
220 INSERT INTO items."{}" (id, properties, geom)
221 SELECT * FROM UNNEST($1::text[], $2::jsonb[], $3::bytea[])
222 "#,
223 collection.id
224 ))
225 .bind(ids)
226 .bind(properties)
227 .bind(geoms)
228 .execute(&db.pool)
229 .await?;
230 }
231 Ok(ProcessResponseBody::Requested(
232 inputs.collection.as_bytes().to_owned(),
233 ))
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use ogcapi_types::processes::Execute;
240
241 use crate::{
242 Processor,
243 geojson_loader::{GeoJsonLoader, GeoJsonLoaderInputs, GeoJsonLoaderOutputs},
244 };
245
246 #[tokio::test]
247 async fn test_loader() {
248 let loader = GeoJsonLoader;
249 assert_eq!(loader.id(), "geojson-loader");
250
251 println!(
252 "Process:\n{}",
253 serde_json::to_string_pretty(&loader.process().unwrap()).unwrap()
254 );
255
256 let input = GeoJsonLoaderInputs {
257 input: "../data/ne_10m_railroads_north_america.geojson".to_owned(),
258 collection: "streets-geojson".to_string(),
259 s_srs: None,
260 database_url: "postgresql://postgres:password@localhost:5433/ogcapi".to_string(),
261 };
262
263 let execute = Execute {
264 inputs: input.execute_input(),
265 ..Default::default()
266 };
267
268 let output: GeoJsonLoaderOutputs =
269 loader.execute(execute).await.unwrap().try_into().unwrap();
270 assert_eq!(output.collection_id, "streets-geojson");
271 }
272}