bbox_feature_server/datasource/
gpkg.rs1use crate::config::GpkgCollectionCfg;
4use crate::datasource::{
5 AutoscanCollectionDatasource, CollectionDatasource, CollectionSource, CollectionSourceCfg,
6 ConfiguredCollectionCfg, ItemsResult,
7};
8use crate::error::{self, Error, Result};
9use crate::filter_params::FilterParams;
10use crate::inventory::FeatureCollection;
11use async_trait::async_trait;
12use bbox_core::config::DsGpkgCfg;
13use bbox_core::ogcapi::*;
14use futures::TryStreamExt;
15use geozero::{geojson, wkb};
16use log::{debug, error, info, warn};
17use serde_json::json;
18use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteRow};
19use sqlx::{Column, Row, TypeInfo};
20
21#[derive(Clone, Debug)]
22pub struct SqliteDatasource {
23 pool: SqlitePool,
24}
25
26impl SqliteDatasource {
27 pub async fn from_config(cfg: &DsGpkgCfg) -> Result<Self> {
28 let path = cfg.abs_path().to_string_lossy().to_string();
29 info!("Opening `{path}`");
30 Self::new_pool(&path).await
31 }
32 pub async fn new_pool(gpkg: &str) -> Result<Self> {
33 let conn_options = SqliteConnectOptions::new().filename(gpkg).read_only(true);
34 let pool = SqlitePoolOptions::new()
35 .min_connections(0)
36 .max_connections(8)
37 .connect_with(conn_options)
38 .await?;
39 Ok(SqliteDatasource { pool })
40 }
41}
42
43pub type Datasource = SqliteDatasource;
44
45#[async_trait]
46impl CollectionDatasource for SqliteDatasource {
47 async fn setup_collection(
48 &mut self,
49 cfg: &ConfiguredCollectionCfg,
50 extent: Option<CoreExtent>,
51 ) -> Result<FeatureCollection> {
52 info!("Setup Gpkg Collection `{}`", &cfg.name);
53 let CollectionSourceCfg::Gpkg(ref srccfg) = cfg.source else {
54 panic!();
55 };
56
57 let id = &cfg.name;
58 if srccfg.table_name.is_none() && srccfg.sql.is_none() {
59 return Err(Error::DatasourceSetupError(format!(
60 "Datasource `{id}`: configuration `table_name` or `sql` missing"
61 )));
62 } else if srccfg.table_name.is_some() && srccfg.sql.is_some() {
63 warn!("Datasource`{id}`: configuration `table_name` ignored, using `sql` instead");
64 }
65 let (pk_column, geometry_column, sql) = if let Some(table_name) = &srccfg.table_name {
66 let pk_column = srccfg
67 .fid_field
68 .clone()
69 .or(detect_pk(self, table_name).await?);
70 let geometry_column = detect_geometry(self, table_name).await?;
71 let sql = check_query(self, format!("SELECT * FROM {table_name}")).await?;
72 (pk_column, geometry_column, sql)
73 } else {
74 let pk_column = srccfg.fid_field.clone();
75 let geometry_column =
77 srccfg
78 .geometry_field
79 .clone()
80 .ok_or(Error::DatasourceSetupError(format!(
81 "Datasource `{id}`: configuration `geometry_field` missing"
82 )))?;
83 let sql = check_query(self, srccfg.sql.clone().expect("config checked")).await?;
84 (pk_column, geometry_column, sql)
85 };
86 if pk_column.is_none() {
87 warn!("Datasource `{id}`: `fid_field` missing - single item queries will be ignored");
88 }
89 let source = GpkgCollectionSource {
90 ds: self.clone(),
91 sql,
92 geometry_column,
93 pk_column,
94 };
95
96 let collection = CoreCollection {
97 id: id.clone(),
98 title: cfg.title.clone(),
99 description: cfg.description.clone(),
100 extent,
101 item_type: None,
102 crs: vec![],
103 links: vec![ApiLink {
104 href: format!("/collections/{id}/items"),
105 rel: Some("items".to_string()),
106 type_: Some("application/geo+json".to_string()),
107 title: cfg.title.clone(),
108 hreflang: None,
109 length: None,
110 }],
111 };
112 let fc = FeatureCollection {
113 collection,
114 source: Box::new(source),
115 };
116 Ok(fc)
117 }
118}
119
120#[async_trait]
121impl AutoscanCollectionDatasource for SqliteDatasource {
122 async fn collections(&mut self) -> Result<Vec<FeatureCollection>> {
123 let mut collections = Vec::new();
124 let sql = r#"
125 SELECT contents.*
126 FROM gpkg_contents contents
127 JOIN gpkg_spatial_ref_sys refsys ON refsys.srs_id = contents.srs_id
128 --JOIN gpkg_geometry_columns geom_cols ON geom_cols.table_name = contents.table_name
129 WHERE data_type='features'
130 "#;
131 let mut rows = sqlx::query(sql).fetch(&self.pool);
132 while let Some(row) = rows.try_next().await? {
133 let table_name: &str = row.try_get("table_name")?;
134 let id = table_name.to_string();
135 let title: String = row.try_get("identifier")?;
136 let extent = CoreExtent {
137 spatial: Some(CoreExtentSpatial {
138 bbox: vec![vec![
139 row.try_get("min_x")?,
140 row.try_get("min_y")?,
141 row.try_get("max_x")?,
142 row.try_get("max_y")?,
143 ]],
144 crs: None,
145 }),
146 temporal: None,
147 };
148 let coll_cfg = ConfiguredCollectionCfg {
149 source: CollectionSourceCfg::Gpkg(GpkgCollectionCfg {
150 table_name: Some(table_name.to_string()),
151 ..Default::default()
152 }),
153 name: id.clone(),
154 title: Some(title),
155 description: row.try_get("description")?,
156 };
157 let fc = self.setup_collection(&coll_cfg, Some(extent)).await?;
158 collections.push(fc);
159 }
160 Ok(collections)
161 }
162}
163
164#[derive(Clone, Debug)]
165pub struct GpkgCollectionSource {
166 ds: SqliteDatasource,
167 sql: String,
168 geometry_column: String,
169 pk_column: Option<String>,
172}
173
174#[async_trait]
175impl CollectionSource for GpkgCollectionSource {
176 async fn items(&self, filter: &FilterParams) -> Result<ItemsResult> {
177 let mut sql = format!(
178 "
179 WITH query AS ({sql})
180 SELECT *, count(*) OVER() AS __total_cnt FROM query",
181 sql = &self.sql
182 );
183 if let Some(_bboxstr) = &filter.bbox {
184 warn!("Ignoring bbox filter (not supported for this datasource)");
185 }
186 let limit = filter.limit_or_default();
187 if limit > 0 {
188 sql.push_str(&format!(" LIMIT {limit}"));
189 }
190 if let Some(offset) = filter.offset {
191 sql.push_str(&format!(" OFFSET {offset}"));
192 }
193 let rows = sqlx::query(&sql).fetch_all(&self.ds.pool).await?;
194 let number_matched = if let Some(row) = rows.first() {
195 row.try_get::<u32, _>("__total_cnt")? as u64
196 } else {
197 0
198 };
199 let number_returned = rows.len() as u64;
200 let items = rows
201 .iter()
202 .map(|row| row_to_feature(row, self))
203 .collect::<Result<Vec<_>>>()?;
204 let result = ItemsResult {
205 features: items,
206 number_matched,
207 number_returned,
208 };
209 Ok(result)
210 }
211
212 async fn item(&self, collection_id: &str, feature_id: &str) -> Result<Option<CoreFeature>> {
213 let Some(pk) = &self.pk_column else {
214 warn!("Ignoring error getting item for {collection_id} without single primary key");
215 return Ok(None);
216 };
217 let sql = format!(
218 "
219 WITH query AS ({sql})
220 SELECT * FROM query WHERE {pk} = ?",
221 sql = &self.sql
222 );
223 if let Some(row) = sqlx::query(&sql)
224 .bind(feature_id)
225 .fetch_optional(&self.ds.pool)
226 .await?
227 {
228 let mut item = row_to_feature(&row, self)?;
229 item.links = vec![
230 ApiLink {
231 href: format!("/collections/{collection_id}/items/{feature_id}"),
232 rel: Some("self".to_string()),
233 type_: Some("application/geo+json".to_string()),
234 title: Some("this document".to_string()),
235 hreflang: None,
236 length: None,
237 },
238 ApiLink {
239 href: format!("/collections/{collection_id}"),
240 rel: Some("collection".to_string()),
241 type_: Some("application/geo+json".to_string()),
242 title: Some("the collection document".to_string()),
243 hreflang: None,
244 length: None,
245 },
246 ];
247 Ok(Some(item))
248 } else {
249 Ok(None)
250 }
251 }
252
253 async fn queryables(&self, _collection_id: &str) -> Result<Option<Queryables>> {
254 Ok(None)
255 }
256}
257
258fn row_to_feature(row: &SqliteRow, table_info: &GpkgCollectionSource) -> Result<CoreFeature> {
259 let mut id = None;
260 let mut properties = json!({});
261 for col in row.columns() {
262 #[allow(clippy::if_same_then_else)]
263 if col.name() == table_info.geometry_column {
264 } else if col.name() == "__total_cnt" {
266 } else if col.name() == table_info.pk_column.as_ref().unwrap_or(&"".to_string()) {
268 id = match col.type_info().name() {
270 "TEXT" => Some(row.try_get::<String, _>(col.ordinal())?),
271 "INTEGER" => Some(row.try_get::<i64, _>(col.ordinal())?.to_string()),
272 _ => None,
273 }
274 } else {
275 properties[col.name()] = match col.type_info().name() {
276 "TEXT" => json!(row.try_get::<&str, _>(col.ordinal())?),
277 "INTEGER" => json!(row.try_get::<i64, _>(col.ordinal())?),
278 "REAL" => json!(row.try_get::<f64, _>(col.ordinal())?),
279 "DATETIME" => json!(row.try_get::<&str, _>(col.ordinal())?),
280 ty => json!(format!("<{ty}>")),
281 }
282 }
283 }
284 let wkb: wkb::Decode<geojson::GeoJsonString> =
285 row.try_get(table_info.geometry_column.as_str())?;
286 let geom = wkb.geometry.ok_or(error::Error::GeometryFormatError)?;
287
288 let item = CoreFeature {
289 type_: "Feature".to_string(),
290 id,
291 geometry: serde_json::from_str(&geom.0).map_err(|_| error::Error::GeometryFormatError)?,
292 properties: Some(properties),
293 links: vec![],
294 };
295
296 Ok(item)
297}
298
299async fn detect_pk(ds: &SqliteDatasource, table: &str) -> Result<Option<String>> {
300 let sql = r#"
301 SELECT
302 (SELECT COUNT(*) FROM pragma_table_info(?) ti WHERE ti.pk > 0) as pksize,
303 (SELECT ti.name FROM pragma_table_info(?) ti WHERE ti.pk = 1) as pk
304 "#;
305 let row = sqlx::query(sql)
306 .bind(table)
307 .bind(table)
308 .fetch_one(&ds.pool)
309 .await?;
310 let pksize: u16 = row.try_get("pksize")?;
311 let pk_column: Option<String> = if pksize == 1 {
312 row.try_get("pk")?
313 } else {
314 None
315 };
316 Ok(pk_column)
317}
318
319async fn detect_geometry(ds: &SqliteDatasource, table: &str) -> Result<String> {
320 let sql = r#"
321 SELECT column_name, geometry_type_name
322 FROM gpkg_geometry_columns
323 WHERE table_name = ?
324 "#;
325 let row = sqlx::query(sql)
326 .bind(table)
327 .fetch_one(&ds.pool)
329 .await?;
330 let geometry_column: String = row.try_get("column_name")?;
331 let _geometry_type_name: String = row.try_get("geometry_type_name")?;
332 Ok(geometry_column)
333}
334
335async fn check_query(ds: &SqliteDatasource, sql: String) -> Result<String> {
336 debug!("Collection query: {sql}");
337 if let Err(e) = sqlx::query(&sql).fetch_one(&ds.pool).await {
339 error!("Error in collection query `{sql}`: {e}");
340 return Err(e.into());
341 }
342 Ok(sql)
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348
349 #[tokio::test]
350 async fn gpkg_content() {
351 let mut pool = SqliteDatasource::new_pool("../assets/ne_extracts.gpkg")
352 .await
353 .unwrap();
354 let collections = pool.collections().await.unwrap();
355 assert_eq!(collections.len(), 3);
356 assert_eq!(
357 collections
358 .iter()
359 .map(|col| col.collection.id.clone())
360 .collect::<Vec<_>>(),
361 vec![
362 "ne_10m_rivers_lake_centerlines",
363 "ne_10m_lakes",
364 "ne_10m_populated_places"
365 ]
366 );
367 }
368
369 #[tokio::test]
370 async fn gpkg_features() {
371 let filter = FilterParams::default();
372 let ds = SqliteDatasource::new_pool("../assets/ne_extracts.gpkg")
373 .await
374 .unwrap();
375 let source = GpkgCollectionSource {
376 ds,
377 sql: "SELECT * FROM ne_10m_lakes".to_string(),
378 geometry_column: "geom".to_string(),
379 pk_column: Some("fid".to_string()),
380 };
381 let items = source.items(&filter).await.unwrap();
382 assert_eq!(items.features.len(), filter.limit_or_default() as usize);
383 }
384}