bbox_feature_server/datasource/
gpkg.rs

1//! GeoPackage feature source.
2
3use crate::config::GpkgCollectionCfg;
4use crate::datasource::{
5    AutoscanCollectionDatasource, CollectionDatasource, CollectionSource, CollectionSourceCfg,
6    ConfiguredCollectionCfg, ItemsResult,
7};
8use crate::error::{self, Error, Result};
9use crate::filter_params::FilterParams;
10use crate::inventory::FeatureCollection;
11use async_trait::async_trait;
12use bbox_core::config::DsGpkgCfg;
13use bbox_core::ogcapi::*;
14use futures::TryStreamExt;
15use geozero::{geojson, wkb};
16use log::{debug, error, info, warn};
17use serde_json::json;
18use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteRow};
19use sqlx::{Column, Row, TypeInfo};
20
21#[derive(Clone, Debug)]
22pub struct SqliteDatasource {
23    pool: SqlitePool,
24}
25
26impl SqliteDatasource {
27    pub async fn from_config(cfg: &DsGpkgCfg) -> Result<Self> {
28        let path = cfg.abs_path().to_string_lossy().to_string();
29        info!("Opening `{path}`");
30        Self::new_pool(&path).await
31    }
32    pub async fn new_pool(gpkg: &str) -> Result<Self> {
33        let conn_options = SqliteConnectOptions::new().filename(gpkg).read_only(true);
34        let pool = SqlitePoolOptions::new()
35            .min_connections(0)
36            .max_connections(8)
37            .connect_with(conn_options)
38            .await?;
39        Ok(SqliteDatasource { pool })
40    }
41}
42
43pub type Datasource = SqliteDatasource;
44
45#[async_trait]
46impl CollectionDatasource for SqliteDatasource {
47    async fn setup_collection(
48        &mut self,
49        cfg: &ConfiguredCollectionCfg,
50        extent: Option<CoreExtent>,
51    ) -> Result<FeatureCollection> {
52        info!("Setup Gpkg Collection `{}`", &cfg.name);
53        let CollectionSourceCfg::Gpkg(ref srccfg) = cfg.source else {
54            panic!();
55        };
56
57        let id = &cfg.name;
58        if srccfg.table_name.is_none() && srccfg.sql.is_none() {
59            return Err(Error::DatasourceSetupError(format!(
60                "Datasource `{id}`: configuration `table_name` or `sql` missing"
61            )));
62        } else if srccfg.table_name.is_some() && srccfg.sql.is_some() {
63            warn!("Datasource`{id}`: configuration `table_name` ignored, using `sql` instead");
64        }
65        let (pk_column, geometry_column, sql) = if let Some(table_name) = &srccfg.table_name {
66            let pk_column = srccfg
67                .fid_field
68                .clone()
69                .or(detect_pk(self, table_name).await?);
70            let geometry_column = detect_geometry(self, table_name).await?;
71            let sql = check_query(self, format!("SELECT * FROM {table_name}")).await?;
72            (pk_column, geometry_column, sql)
73        } else {
74            let pk_column = srccfg.fid_field.clone();
75            // TODO: We should also allow user queries without geometry
76            let geometry_column =
77                srccfg
78                    .geometry_field
79                    .clone()
80                    .ok_or(Error::DatasourceSetupError(format!(
81                        "Datasource `{id}`: configuration `geometry_field` missing"
82                    )))?;
83            let sql = check_query(self, srccfg.sql.clone().expect("config checked")).await?;
84            (pk_column, geometry_column, sql)
85        };
86        if pk_column.is_none() {
87            warn!("Datasource `{id}`: `fid_field` missing - single item queries will be ignored");
88        }
89        let source = GpkgCollectionSource {
90            ds: self.clone(),
91            sql,
92            geometry_column,
93            pk_column,
94        };
95
96        let collection = CoreCollection {
97            id: id.clone(),
98            title: cfg.title.clone(),
99            description: cfg.description.clone(),
100            extent,
101            item_type: None,
102            crs: vec![],
103            links: vec![ApiLink {
104                href: format!("/collections/{id}/items"),
105                rel: Some("items".to_string()),
106                type_: Some("application/geo+json".to_string()),
107                title: cfg.title.clone(),
108                hreflang: None,
109                length: None,
110            }],
111        };
112        let fc = FeatureCollection {
113            collection,
114            source: Box::new(source),
115        };
116        Ok(fc)
117    }
118}
119
120#[async_trait]
121impl AutoscanCollectionDatasource for SqliteDatasource {
122    async fn collections(&mut self) -> Result<Vec<FeatureCollection>> {
123        let mut collections = Vec::new();
124        let sql = r#"
125            SELECT contents.*
126            FROM gpkg_contents contents
127              JOIN gpkg_spatial_ref_sys refsys ON refsys.srs_id = contents.srs_id
128              --JOIN gpkg_geometry_columns geom_cols ON geom_cols.table_name = contents.table_name
129            WHERE data_type='features'
130        "#;
131        let mut rows = sqlx::query(sql).fetch(&self.pool);
132        while let Some(row) = rows.try_next().await? {
133            let table_name: &str = row.try_get("table_name")?;
134            let id = table_name.to_string();
135            let title: String = row.try_get("identifier")?;
136            let extent = CoreExtent {
137                spatial: Some(CoreExtentSpatial {
138                    bbox: vec![vec![
139                        row.try_get("min_x")?,
140                        row.try_get("min_y")?,
141                        row.try_get("max_x")?,
142                        row.try_get("max_y")?,
143                    ]],
144                    crs: None,
145                }),
146                temporal: None,
147            };
148            let coll_cfg = ConfiguredCollectionCfg {
149                source: CollectionSourceCfg::Gpkg(GpkgCollectionCfg {
150                    table_name: Some(table_name.to_string()),
151                    ..Default::default()
152                }),
153                name: id.clone(),
154                title: Some(title),
155                description: row.try_get("description")?,
156            };
157            let fc = self.setup_collection(&coll_cfg, Some(extent)).await?;
158            collections.push(fc);
159        }
160        Ok(collections)
161    }
162}
163
164#[derive(Clone, Debug)]
165pub struct GpkgCollectionSource {
166    ds: SqliteDatasource,
167    sql: String,
168    geometry_column: String,
169    // geometry_type_name: String,
170    /// Primary key column, None if multi column key.
171    pk_column: Option<String>,
172}
173
174#[async_trait]
175impl CollectionSource for GpkgCollectionSource {
176    async fn items(&self, filter: &FilterParams) -> Result<ItemsResult> {
177        let mut sql = format!(
178            "
179            WITH query AS ({sql})
180            SELECT *, count(*) OVER() AS __total_cnt FROM query",
181            sql = &self.sql
182        );
183        if let Some(_bboxstr) = &filter.bbox {
184            warn!("Ignoring bbox filter (not supported for this datasource)");
185        }
186        let limit = filter.limit_or_default();
187        if limit > 0 {
188            sql.push_str(&format!(" LIMIT {limit}"));
189        }
190        if let Some(offset) = filter.offset {
191            sql.push_str(&format!(" OFFSET {offset}"));
192        }
193        let rows = sqlx::query(&sql).fetch_all(&self.ds.pool).await?;
194        let number_matched = if let Some(row) = rows.first() {
195            row.try_get::<u32, _>("__total_cnt")? as u64
196        } else {
197            0
198        };
199        let number_returned = rows.len() as u64;
200        let items = rows
201            .iter()
202            .map(|row| row_to_feature(row, self))
203            .collect::<Result<Vec<_>>>()?;
204        let result = ItemsResult {
205            features: items,
206            number_matched,
207            number_returned,
208        };
209        Ok(result)
210    }
211
212    async fn item(&self, collection_id: &str, feature_id: &str) -> Result<Option<CoreFeature>> {
213        let Some(pk) = &self.pk_column else {
214            warn!("Ignoring error getting item for {collection_id} without single primary key");
215            return Ok(None);
216        };
217        let sql = format!(
218            "
219            WITH query AS ({sql})
220            SELECT * FROM query WHERE {pk} = ?",
221            sql = &self.sql
222        );
223        if let Some(row) = sqlx::query(&sql)
224            .bind(feature_id)
225            .fetch_optional(&self.ds.pool)
226            .await?
227        {
228            let mut item = row_to_feature(&row, self)?;
229            item.links = vec![
230                ApiLink {
231                    href: format!("/collections/{collection_id}/items/{feature_id}"),
232                    rel: Some("self".to_string()),
233                    type_: Some("application/geo+json".to_string()),
234                    title: Some("this document".to_string()),
235                    hreflang: None,
236                    length: None,
237                },
238                ApiLink {
239                    href: format!("/collections/{collection_id}"),
240                    rel: Some("collection".to_string()),
241                    type_: Some("application/geo+json".to_string()),
242                    title: Some("the collection document".to_string()),
243                    hreflang: None,
244                    length: None,
245                },
246            ];
247            Ok(Some(item))
248        } else {
249            Ok(None)
250        }
251    }
252
253    async fn queryables(&self, _collection_id: &str) -> Result<Option<Queryables>> {
254        Ok(None)
255    }
256}
257
258fn row_to_feature(row: &SqliteRow, table_info: &GpkgCollectionSource) -> Result<CoreFeature> {
259    let mut id = None;
260    let mut properties = json!({});
261    for col in row.columns() {
262        #[allow(clippy::if_same_then_else)]
263        if col.name() == table_info.geometry_column {
264            // Skip geometry
265        } else if col.name() == "__total_cnt" {
266            // Skip count
267        } else if col.name() == table_info.pk_column.as_ref().unwrap_or(&"".to_string()) {
268            // Get id as String
269            id = match col.type_info().name() {
270                "TEXT" => Some(row.try_get::<String, _>(col.ordinal())?),
271                "INTEGER" => Some(row.try_get::<i64, _>(col.ordinal())?.to_string()),
272                _ => None,
273            }
274        } else {
275            properties[col.name()] = match col.type_info().name() {
276                "TEXT" => json!(row.try_get::<&str, _>(col.ordinal())?),
277                "INTEGER" => json!(row.try_get::<i64, _>(col.ordinal())?),
278                "REAL" => json!(row.try_get::<f64, _>(col.ordinal())?),
279                "DATETIME" => json!(row.try_get::<&str, _>(col.ordinal())?),
280                ty => json!(format!("<{ty}>")),
281            }
282        }
283    }
284    let wkb: wkb::Decode<geojson::GeoJsonString> =
285        row.try_get(table_info.geometry_column.as_str())?;
286    let geom = wkb.geometry.ok_or(error::Error::GeometryFormatError)?;
287
288    let item = CoreFeature {
289        type_: "Feature".to_string(),
290        id,
291        geometry: serde_json::from_str(&geom.0).map_err(|_| error::Error::GeometryFormatError)?,
292        properties: Some(properties),
293        links: vec![],
294    };
295
296    Ok(item)
297}
298
299async fn detect_pk(ds: &SqliteDatasource, table: &str) -> Result<Option<String>> {
300    let sql = r#"
301        SELECT
302          (SELECT COUNT(*) FROM pragma_table_info(?) ti WHERE ti.pk > 0) as pksize,
303          (SELECT ti.name FROM pragma_table_info(?) ti WHERE ti.pk = 1) as pk
304    "#;
305    let row = sqlx::query(sql)
306        .bind(table)
307        .bind(table)
308        .fetch_one(&ds.pool)
309        .await?;
310    let pksize: u16 = row.try_get("pksize")?;
311    let pk_column: Option<String> = if pksize == 1 {
312        row.try_get("pk")?
313    } else {
314        None
315    };
316    Ok(pk_column)
317}
318
319async fn detect_geometry(ds: &SqliteDatasource, table: &str) -> Result<String> {
320    let sql = r#"
321        SELECT column_name, geometry_type_name
322        FROM gpkg_geometry_columns
323        WHERE table_name = ?
324    "#;
325    let row = sqlx::query(sql)
326        .bind(table)
327        // We take the first result only
328        .fetch_one(&ds.pool)
329        .await?;
330    let geometry_column: String = row.try_get("column_name")?;
331    let _geometry_type_name: String = row.try_get("geometry_type_name")?;
332    Ok(geometry_column)
333}
334
335async fn check_query(ds: &SqliteDatasource, sql: String) -> Result<String> {
336    debug!("Collection query: {sql}");
337    // TODO: prepare only
338    if let Err(e) = sqlx::query(&sql).fetch_one(&ds.pool).await {
339        error!("Error in collection query `{sql}`: {e}");
340        return Err(e.into());
341    }
342    Ok(sql)
343}
344
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample GeoPackage, relative to the directory the tests are run from.
    const NE_EXTRACTS: &str = "../assets/ne_extracts.gpkg";

    #[tokio::test]
    async fn gpkg_content() {
        let mut ds = SqliteDatasource::new_pool(NE_EXTRACTS).await.unwrap();
        let collections = ds.collections().await.unwrap();
        assert_eq!(collections.len(), 3);
        let ids: Vec<_> = collections
            .iter()
            .map(|fc| fc.collection.id.clone())
            .collect();
        assert_eq!(
            ids,
            vec![
                "ne_10m_rivers_lake_centerlines",
                "ne_10m_lakes",
                "ne_10m_populated_places"
            ]
        );
    }

    #[tokio::test]
    async fn gpkg_features() {
        let filter = FilterParams::default();
        let ds = SqliteDatasource::new_pool(NE_EXTRACTS).await.unwrap();
        let source = GpkgCollectionSource {
            ds,
            sql: "SELECT * FROM ne_10m_lakes".to_string(),
            geometry_column: "geom".to_string(),
            pk_column: Some("fid".to_string()),
        };
        let expected = filter.limit_or_default() as usize;
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), expected);
    }
}