Skip to main content

ogcapi_processes/
geojson_loader.rs

1use std::{collections::HashMap, io::Cursor};
2
3use anyhow::Result;
4use geo::Geometry;
5use geojson::FeatureCollection;
6
7use schemars::{JsonSchema, schema_for};
8use serde::Deserialize;
9use sqlx::types::Json;
10use url::Url;
11
12use ogcapi_drivers::{CollectionTransactions, postgres::Db};
13use ogcapi_types::{
14    common::{Collection, Crs, Exception, Extent, SpatialExtent},
15    processes::{
16        Execute, Format, InlineOrRefData, Input, InputValueNoObject, Output, Process,
17        TransmissionMode,
18    },
19};
20use wkb::Endianness;
21
22use crate::{ProcessResponseBody, Processor};
23
/// GeoJSON loader [`Processor`].
///
/// Process that loads the features of a GeoJSON `FeatureCollection` file into a collection
/// of the Postgres backend.
#[derive(Clone, Debug, Default)]
pub struct GeoJsonLoader;
29
30/// Inputs for the `geojson-loader` process
31#[derive(Deserialize, Debug, JsonSchema)]
32pub struct GeoJsonLoaderInputs {
33    /// Input file
34    pub input: String,
35
36    /// Set the collection name
37    pub collection: String,
38
39    /// Source srs, if omitted tries to derive from the input layer
40    pub s_srs: Option<u32>,
41
42    /// Postgres database url
43    pub database_url: String,
44}
45
46impl GeoJsonLoaderInputs {
47    pub fn execute_input(&self) -> HashMap<String, Input> {
48        let mut input = HashMap::from_iter([
49            (
50                "input".to_string(),
51                Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
52                    InputValueNoObject::String(self.input.to_owned()),
53                )),
54            ),
55            (
56                "collection".to_string(),
57                Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
58                    InputValueNoObject::String(self.collection.to_owned()),
59                )),
60            ),
61            (
62                "database_url".to_string(),
63                Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
64                    InputValueNoObject::String(self.database_url.to_owned()),
65                )),
66            ),
67        ]);
68
69        if let Some(s_srs) = &self.s_srs {
70            input.insert(
71                "s_srs".to_owned(),
72                Input::InlineOrRefData(InlineOrRefData::InputValueNoObject(
73                    InputValueNoObject::Integer(*s_srs as i64),
74                )),
75            );
76        }
77
78        input
79    }
80}
81
82/// Outputs for the `gdal-loader` process
83#[derive(Clone, Debug, JsonSchema)]
84pub struct GeoJsonLoaderOutputs {
85    pub collection_id: String,
86}
87
88impl GeoJsonLoaderOutputs {
89    pub fn execute_output() -> HashMap<String, Output> {
90        HashMap::from([(
91            "greeting".to_string(),
92            Output {
93                format: Some(Format {
94                    media_type: Some("text/plain".to_string()),
95                    encoding: Some("utf8".to_string()),
96                    schema: None,
97                }),
98                transmission_mode: TransmissionMode::Value,
99            },
100        )])
101    }
102}
103
104impl TryFrom<ProcessResponseBody> for GeoJsonLoaderOutputs {
105    type Error = Exception;
106
107    fn try_from(value: ProcessResponseBody) -> Result<Self, Self::Error> {
108        if let ProcessResponseBody::Requested(buf) = value {
109            Ok(GeoJsonLoaderOutputs {
110                collection_id: String::from_utf8(buf).unwrap(),
111            })
112        } else {
113            Err(Exception::new("500"))
114        }
115    }
116}
117
118#[async_trait::async_trait]
119impl Processor for GeoJsonLoader {
120    fn id(&self) -> &'static str {
121        "geojson-loader"
122    }
123
124    fn version(&self) -> &'static str {
125        "0.1.0"
126    }
127
128    fn process(&self) -> Result<Process> {
129        Process::try_new(
130            self.id(),
131            self.version(),
132            &schema_for!(GeoJsonLoaderInputs).schema,
133            &schema_for!(GeoJsonLoaderOutputs).schema,
134        )
135        .map_err(Into::into)
136    }
137
138    async fn execute(&self, execute: Execute) -> Result<ProcessResponseBody> {
139        let value = serde_json::to_value(execute.inputs)?;
140        let inputs: GeoJsonLoaderInputs = serde_json::from_value(value)?;
141
142        // Setup driver
143        let db = Db::setup(&Url::parse(&inputs.database_url).unwrap()).await?;
144
145        // Extract data
146        let geojson_str = std::fs::read_to_string(&inputs.input)?;
147        let geojson = geojson_str.parse::<FeatureCollection>()?;
148
149        // Create collection
150        let collection = Collection {
151            id: inputs.collection.to_owned(),
152            item_type: Some("Feature".to_string()),
153            extent: geojson
154                .bbox
155                .map(|bbox| Extent {
156                    spatial: Some(SpatialExtent {
157                        bbox: vec![
158                            bbox.as_slice()
159                                .try_into()
160                                .unwrap_or_else(|_| [-180.0, -90.0, 180.0, 90.0].into()),
161                        ],
162                        crs: Crs::default(),
163                    }),
164                    ..Default::default()
165                })
166                .or_else(|| Some(Extent::default())),
167            crs: vec![Crs::default(), Crs::from_epsg(3857), Crs::from_epsg(2056)],
168            storage_crs: Some(Crs::default()),
169            // #[cfg(feature = "stac")]
170            // assets: crate::asset::load_asset_from_path(&args.input).await?,
171            ..Default::default()
172        };
173
174        db.delete_collection(&collection.id).await?;
175        db.create_collection(&collection).await?;
176
177        // Load features
178        let chunk_size = 1000;
179        let chunks: Vec<_> = geojson
180            .features
181            .chunks(chunk_size)
182            .enumerate()
183            .map(|(i, chunk)| {
184                let mut ids = Vec::with_capacity(chunk_size);
185                let mut properties = Vec::with_capacity(chunk_size);
186                let mut geoms = Vec::with_capacity(chunk_size);
187
188                for (ii, feature) in chunk.iter().enumerate() {
189                    // id
190                    let id = if let Some(id) = &feature.id {
191                        match id {
192                            geojson::feature::Id::String(s) => s.to_owned(),
193                            geojson::feature::Id::Number(n) => n.to_string(),
194                        }
195                    } else {
196                        ((i * chunk_size) + ii).to_string()
197                    };
198                    ids.push(id);
199
200                    // properties
201                    let props = feature.properties.to_owned().map(Json);
202                    properties.push(props);
203
204                    // geometry
205                    let geom =
206                        Geometry::try_from(feature.geometry.to_owned().unwrap().value).unwrap();
207
208                    let mut wkb = Cursor::new(Vec::new());
209                    wkb::writer::write_geometry(&mut wkb, &geom, Endianness::LittleEndian).unwrap();
210                    geoms.push(wkb.into_inner());
211                }
212
213                (ids, properties, geoms)
214            })
215            .collect();
216
217        for (ids, properties, geoms) in chunks {
218            sqlx::query(&format!(
219                r#"
220            INSERT INTO items."{}" (id, properties, geom)
221            SELECT * FROM UNNEST($1::text[], $2::jsonb[], $3::bytea[])
222            "#,
223                collection.id
224            ))
225            .bind(ids)
226            .bind(properties)
227            .bind(geoms)
228            .execute(&db.pool)
229            .await?;
230        }
231        Ok(ProcessResponseBody::Requested(
232            inputs.collection.as_bytes().to_owned(),
233        ))
234    }
235}
236
#[cfg(test)]
mod tests {
    use ogcapi_types::processes::Execute;

    use crate::{
        Processor,
        geojson_loader::{GeoJsonLoader, GeoJsonLoaderInputs, GeoJsonLoaderOutputs},
    };

    /// End-to-end run of the `geojson-loader` process.
    ///
    /// Expects the `../data/ne_10m_railroads_north_america.geojson` fixture and a Postgres
    /// database at `localhost:5433` (see the literals below).
    #[tokio::test]
    async fn test_loader() {
        let processor = GeoJsonLoader;
        assert_eq!(processor.id(), "geojson-loader");

        let description = processor.process().unwrap();
        println!(
            "Process:\n{}",
            serde_json::to_string_pretty(&description).unwrap()
        );

        let params = GeoJsonLoaderInputs {
            input: "../data/ne_10m_railroads_north_america.geojson".to_owned(),
            collection: "streets-geojson".to_string(),
            s_srs: None,
            database_url: "postgresql://postgres:password@localhost:5433/ogcapi".to_string(),
        };

        let request = Execute {
            inputs: params.execute_input(),
            ..Default::default()
        };

        let response = processor.execute(request).await.unwrap();
        let outputs = GeoJsonLoaderOutputs::try_from(response).unwrap();
        assert_eq!(outputs.collection_id, "streets-geojson");
    }
}
272}