bbox_feature_server/datasource/
postgis.rs

1//! PostGIS feature source.
2
3use crate::config::PostgisCollectionCfg;
4use crate::datasource::{
5    AutoscanCollectionDatasource, CollectionDatasource, CollectionSource, CollectionSourceCfg,
6    ConfiguredCollectionCfg, ItemsResult,
7};
8use crate::error::{Error, Result};
9use crate::filter_params::{FilterParams, TemporalType};
10use crate::inventory::FeatureCollection;
11use async_trait::async_trait;
12use bbox_core::ogcapi::*;
13use bbox_core::pg_ds::PgDatasource;
14use chrono::DateTime;
15use futures::TryStreamExt;
16use log::{debug, error, info, warn};
17use sqlx::postgres::PgTypeInfo;
18use sqlx::{postgres::PgRow, Column, Postgres, QueryBuilder, Row};
19use std::collections::HashMap;
20
/// Datasource type for PostGIS feature collections (a Postgres connection pool wrapper).
pub type Datasource = PgDatasource;
22
23#[async_trait]
24impl CollectionDatasource for PgDatasource {
25    async fn setup_collection(
26        &mut self,
27        cfg: &ConfiguredCollectionCfg,
28        _extent: Option<CoreExtent>,
29    ) -> Result<FeatureCollection> {
30        info!("Setup Postgis Collection `{}`", &cfg.name);
31        let CollectionSourceCfg::Postgis(ref srccfg) = cfg.source else {
32            panic!();
33        };
34
35        let id = &cfg.name;
36        if srccfg.table_name.is_none() && srccfg.sql.is_none() {
37            return Err(Error::DatasourceSetupError(format!(
38                "Datasource `{id}`: configuration `table_name` or `sql` missing"
39            )));
40        } else if srccfg.table_name.is_some() && srccfg.sql.is_some() {
41            warn!("Datasource`{id}`: configuration `table_name` ignored, using `sql` instead");
42        }
43        let temporal_column = srccfg.temporal_field.clone();
44        let temporal_end_column = srccfg.temporal_end_field.clone();
45        let (pk_column, geometry_column, sql) = if let Some(table_name) = &srccfg.table_name {
46            let public = "public".to_string();
47            let table_schema = srccfg.table_schema.as_ref().unwrap_or(&public);
48            let pk_column = srccfg
49                .fid_field
50                .clone()
51                .or(detect_pk(self, table_schema, table_name).await?);
52            let geometry_column = detect_geometry(self, table_schema, table_name).await?;
53            let sql = check_query(
54                self,
55                format!(r#"SELECT * FROM "{table_schema}"."{table_name}""#),
56            )
57            .await?;
58            (pk_column, geometry_column, sql)
59        } else {
60            let pk_column = srccfg.fid_field.clone();
61            // TODO: We should also allow user queries without geometry
62            let geometry_column =
63                srccfg
64                    .geometry_field
65                    .clone()
66                    .ok_or(Error::DatasourceSetupError(format!(
67                        "Datasource `{id}`: configuration `geometry_field` missing"
68                    )))?;
69            let sql = check_query(self, srccfg.sql.clone().expect("config checked")).await?;
70            (pk_column, geometry_column, sql)
71        };
72        if pk_column.is_none() {
73            warn!("Datasource `{id}`: `fid_field` missing - single item queries will be ignored");
74        }
75        let mut queryable_fields = srccfg.queryable_fields.clone();
76        if let Some(ref t) = temporal_column {
77            queryable_fields.push(t.clone());
78        }
79        if let Some(ref t) = temporal_end_column {
80            queryable_fields.push(t.clone());
81        }
82        let queryables_types = get_column_info(self, &sql, Some(&queryable_fields)).await?;
83        let mut other_columns = HashMap::new();
84        for (k, v) in &queryables_types {
85            let queryable_type = match v.to_string().as_str() {
86                "TEXT" | "VARCHAR" | "CHAR" => QueryableType::String,
87                "INT4" | "INT8" => QueryableType::Integer,
88                "FLOAT4" | "FLOAT8" => QueryableType::Number,
89                "TIMESTAMP" | "TIMESTAMPTZ" => QueryableType::Datetime,
90                "BOOL" => QueryableType::Bool,
91                _ => {
92                    return Err(Error::DatasourceSetupError(format!(
93                        "{k} has a postgres type {v} which is not currently handled and can't be used a queryable"
94                    )))
95                }
96            };
97            other_columns.insert(k.clone(), queryable_type);
98        }
99
100        let source = PgCollectionSource {
101            ds: self.clone(),
102            sql,
103            geometry_column,
104            pk_column,
105            temporal_column,
106            temporal_end_column,
107            other_columns,
108        };
109
110        let bbox = source
111            .query_bbox()
112            .await
113            .unwrap_or(vec![-180.0, -90.0, 180.0, 90.0]);
114
115        let mut collection = CoreCollection {
116            id: id.clone(),
117            title: Some(id.clone()),
118            description: cfg.description.clone(),
119            extent: Some(CoreExtent {
120                spatial: Some(CoreExtentSpatial {
121                    bbox: vec![bbox],
122                    crs: None,
123                }),
124                temporal: None,
125            }),
126            item_type: None,
127            crs: vec![],
128            links: vec![ApiLink {
129                href: format!("/collections/{id}/items"),
130                rel: Some("items".to_string()),
131                type_: Some("application/geo+json".to_string()),
132                title: Some(id.clone()),
133                hreflang: None,
134                length: None,
135            }],
136        };
137
138        if !queryable_fields.is_empty() {
139            collection.links.push(ApiLink {
140                href: format!("/collections/{id}/queryables"),
141                rel: Some("http://www.opengis.net/def/rel/ogc/1.0/queryables".to_string()),
142                type_: Some("application/schema+json".to_string()),
143                title: Some(id.clone()),
144                hreflang: None,
145                length: None,
146            })
147        }
148
149        let fc = FeatureCollection {
150            collection,
151            source: Box::new(source),
152        };
153        Ok(fc)
154    }
155}
156
157#[async_trait]
158impl AutoscanCollectionDatasource for PgDatasource {
159    async fn collections(&mut self) -> Result<Vec<FeatureCollection>> {
160        let mut collections = Vec::new();
161        let sql = r#"
162            SELECT contents.*
163            FROM geometry_columns contents
164              JOIN spatial_ref_sys refsys ON refsys.srid = contents.srid
165            WHERE f_table_schema = 'public'
166        "#;
167        let mut rows = sqlx::query(sql).fetch(&self.pool);
168        while let Some(row) = rows.try_next().await? {
169            let table_schema: String = row.try_get("f_table_schema")?;
170            let table_name: String = row.try_get("f_table_name")?;
171            let coll_cfg = ConfiguredCollectionCfg {
172                source: CollectionSourceCfg::Postgis(PostgisCollectionCfg {
173                    table_schema: Some(table_schema),
174                    table_name: Some(table_name.clone()),
175                    ..Default::default()
176                }),
177                name: table_name.clone(),
178                title: Some(table_name),
179                description: None,
180            };
181            let fc = self.setup_collection(&coll_cfg, None).await?;
182            collections.push(fc);
183        }
184        Ok(collections)
185    }
186}
187
/// Collection source backed by a PostGIS table or user-defined SQL query.
#[derive(Clone, Debug)]
pub struct PgCollectionSource {
    /// Datasource holding the connection pool used for all queries.
    ds: PgDatasource,
    /// Base SELECT statement; wrapped as a CTE in item/extent queries.
    sql: String,
    /// Name of the geometry column returned by `sql`.
    geometry_column: String,
    /// Primary key column, None if multi column key.
    pk_column: Option<String>,
    /// Column for temporal filtering (interval start, or single instant).
    temporal_column: Option<String>,
    /// Optional column for the interval end of temporal filtering.
    temporal_end_column: Option<String>,
    /// Queryable columns with their detected types.
    other_columns: HashMap<String, QueryableType>,
}
200
201#[async_trait]
202impl CollectionSource for PgCollectionSource {
203    async fn items(&self, filter: &FilterParams) -> Result<ItemsResult> {
204        let geometry_column = &self.geometry_column;
205        let temporal_column = &self.temporal_column;
206        let mut builder: QueryBuilder<Postgres> =
207            QueryBuilder::new(format!("WITH query AS ({sql})\n", sql = &self.sql));
208        let select_sql = if let Some(pk) = &self.pk_column {
209            format!(
210                r#"SELECT to_jsonb(t.*)-'{geometry_column}'-'{pk}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
211                    "{pk}"::varchar AS pk,
212                      count(*) OVER () AS __total_cnt 
213                   FROM query t"#,
214            )
215        } else {
216            format!(
217                r#"SELECT to_jsonb(t.*)-'{geometry_column}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
218                      NULL AS pk,
219                      --row_number() OVER () ::varchar AS pk,
220                      count(*) OVER () AS __total_cnt 
221               FROM query t"#,
222            )
223        };
224        builder.push(&select_sql);
225        let mut where_term = false;
226        match filter.bbox() {
227            Ok(Some(bbox)) => {
228                builder.push(format!(" WHERE ( {geometry_column} && ST_MakeEnvelope("));
229                let mut separated = builder.separated(",");
230                separated.push_bind(bbox[0]);
231                separated.push_bind(bbox[1]);
232                separated.push_bind(bbox[2]);
233                separated.push_bind(bbox[3]);
234                builder.push(") ) ");
235                where_term = true;
236            }
237            Ok(None) => {}
238            Err(e) => {
239                error!("Ignoring invalid bbox: {e}");
240                return Err(Error::QueryParams);
241            }
242        }
243        if let Some(temporal_column) = temporal_column {
244            let temporal_end_column = self.temporal_end_column.as_ref().unwrap_or(temporal_column);
245            match filter.temporal() {
246                Ok(Some(parts)) => {
247                    if where_term {
248                        builder.push(" AND ");
249                    } else {
250                        builder.push(" WHERE ");
251                        where_term = true;
252                    }
253                    if parts.len() == 1 {
254                        if let TemporalType::DateTime(dt) = parts[0] {
255                            builder.push(format!(" {temporal_column} = ",));
256                            builder.push_bind(dt);
257                            debug!("{temporal_column} = {}", dt);
258                        }
259                    } else {
260                        match parts[0] {
261                            TemporalType::Open => match parts[1] {
262                                TemporalType::Open => {
263                                    error!("Open to Open datetimes doesn't make sense");
264                                    return Err(Error::QueryParams);
265                                }
266                                TemporalType::DateTime(dt) => {
267                                    builder.push(format!(" {temporal_column} <= ",));
268                                    builder.push_bind(dt);
269                                    debug!("{temporal_column} <= {}", dt);
270                                }
271                            },
272                            TemporalType::DateTime(dt1) => match parts[1] {
273                                TemporalType::Open => {
274                                    builder.push(format!(" {temporal_column} >= ",));
275                                    builder.push_bind(dt1);
276                                    debug!("{temporal_column} >= {}", dt1);
277                                }
278                                TemporalType::DateTime(dt2) => {
279                                    builder.push(format!(" {temporal_column} >= "));
280                                    builder.push_bind(dt1);
281                                    debug!("{temporal_column} >= {}", dt1);
282                                    builder.push(format!(" and {temporal_end_column} <= ",));
283                                    builder.push_bind(dt2);
284                                    debug!("{temporal_column} <= {}", dt2);
285                                }
286                            },
287                        }
288                    }
289                }
290                Ok(None) => {}
291                Err(e) => {
292                    error!("Ignoring invalid temporal field: {e}");
293                    return Err(Error::QueryParams);
294                }
295            }
296        }
297
298        match filter.other_params() {
299            Ok(others) => {
300                if !others.is_empty() {
301                    if where_term {
302                        builder.push(" AND ");
303                    } else {
304                        builder.push(" WHERE ");
305                    }
306                }
307                let mut separated = builder.separated(" AND ");
308                for (key, val) in others {
309                    // check if the passed in field matches queryables
310                    // detect if value has wildcards
311                    if let Some((k, v)) = self.other_columns.get_key_value(key) {
312                        if val.rfind('*').is_some() {
313                            separated.push(format!("{k}::text like "));
314                            let val = val.replace('*', "%");
315                            debug!("{k}::text like {val}");
316                            separated.push_bind_unseparated(val);
317                        } else {
318                            separated.push(format!("{k}="));
319                            debug!("{k} = {val}");
320                            match v {
321                                QueryableType::String => separated.push_bind_unseparated(val),
322                                QueryableType::Integer => separated.push_bind_unseparated(
323                                    val.parse::<i64>().map_err(|_| Error::QueryParams)?,
324                                ),
325                                QueryableType::Number => separated.push_bind_unseparated(
326                                    val.parse::<f64>().map_err(|_| Error::QueryParams)?,
327                                ),
328                                QueryableType::Bool => separated.push_bind_unseparated(
329                                    val.parse::<bool>().map_err(|_| Error::QueryParams)?,
330                                ),
331                                QueryableType::Datetime => separated.push_bind_unseparated(
332                                    DateTime::parse_from_rfc3339(val)
333                                        .map_err(|_| Error::QueryParams)?,
334                                ),
335                            };
336                        }
337                    } else {
338                        error!("Invalid query param {key}");
339                        return Err(Error::QueryParams);
340                    }
341                }
342            }
343            Err(e) => {
344                error!("{e}");
345                return Err(Error::QueryParams);
346            }
347        }
348        let limit = filter.limit_or_default();
349        if limit > 0 {
350            builder.push(" LIMIT ");
351            debug!("LIMIT {limit}");
352            builder.push_bind(limit as i64);
353        }
354        if let Some(offset) = filter.offset {
355            builder.push(" OFFSET ");
356            debug!("OFFSET {offset}");
357            builder.push_bind(offset as i64);
358        }
359        debug!("SQL: {}", builder.sql());
360        let query = builder.build();
361        let rows = query.fetch_all(&self.ds.pool).await?;
362        let number_matched = if let Some(row) = rows.first() {
363            row.try_get::<i64, _>("__total_cnt")? as u64
364        } else {
365            0
366        };
367        let number_returned = rows.len() as u64;
368        let items = rows
369            .iter()
370            .map(|row| row_to_feature(row, self))
371            .collect::<Result<Vec<_>>>()?;
372        let result = ItemsResult {
373            features: items,
374            number_matched,
375            number_returned,
376        };
377        Ok(result)
378    }
379
380    async fn item(&self, collection_id: &str, feature_id: &str) -> Result<Option<CoreFeature>> {
381        let Some(pk) = &self.pk_column else {
382            warn!("Ignoring error getting item for {collection_id} without single primary key");
383            return Ok(None);
384        };
385        let sql = format!(
386            r#"
387            WITH query AS ({sql})
388            SELECT to_jsonb(t.*)-'{geometry_column}'-'{pk}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
389                "{pk}"::varchar AS pk
390               FROM query t
391               WHERE {pk}::varchar = '{feature_id}'"#,
392            sql = &self.sql,
393            geometry_column = &self.geometry_column,
394        );
395        if let Some(row) = sqlx::query(&sql)
396            // .bind(feature_id)
397            .fetch_optional(&self.ds.pool)
398            .await?
399        {
400            let mut item = row_to_feature(&row, self)?;
401            item.links = vec![
402                ApiLink {
403                    href: format!("/collections/{collection_id}/items/{feature_id}"),
404                    rel: Some("self".to_string()),
405                    type_: Some("application/geo+json".to_string()),
406                    title: Some("this document".to_string()),
407                    hreflang: None,
408                    length: None,
409                },
410                ApiLink {
411                    href: format!("/collections/{collection_id}"),
412                    rel: Some("collection".to_string()),
413                    type_: Some("application/geo+json".to_string()),
414                    title: Some("the collection document".to_string()),
415                    hreflang: None,
416                    length: None,
417                },
418            ];
419            Ok(Some(item))
420        } else {
421            Ok(None)
422        }
423    }
424    async fn queryables(&self, collection_id: &str) -> Result<Option<Queryables>> {
425        let properties: HashMap<String, QueryableProperty> = self
426            .other_columns
427            .iter()
428            .map(|s| {
429                let title = s.0.to_string();
430                (
431                    title.clone(),
432                    QueryableProperty {
433                        title: Some(title),
434                        type_: Some(s.1.clone()),
435                        format: None,
436                    },
437                )
438            })
439            .collect();
440        Ok(Some(Queryables {
441            id: format!("/collections/{collection_id}/queryables"),
442            title: Some(collection_id.to_string()),
443            schema: "http://json-schema.org/draft/2019-09/schema".to_string(),
444            type_: "object".to_string(),
445            properties,
446        }))
447    }
448}
449
450fn row_to_feature(row: &PgRow, _table_info: &PgCollectionSource) -> Result<CoreFeature> {
451    let properties: serde_json::Value = row.try_get("properties")?;
452    // properties[col.name()] = match col.type_info().name() {
453    //     "VARCHAR"|"TEXT" => json!(row.try_get::<Option<&str>, _>(col.ordinal())?),
454    //     "INT4" => json!(row.try_get::<Option<i32>, _>(col.ordinal())?),
455    //     "INT8" => json!(row.try_get::<Option<i64>, _>(col.ordinal())?),
456    //     "FLOAT4" => json!(row.try_get::<Option<f32>, _>(col.ordinal())?),
457    //     "FLOAT8" => json!(row.try_get::<Option<f64>, _>(col.ordinal())?),
458    //     ty => json!(format!("<{ty}>")),
459    // }
460    let geometry: serde_json::Value = row.try_get("geometry")?;
461    // ERROR:  lwgeom_to_geojson: 'CurvePolygon' geometry type not supported
462    let id: Option<String> = row.try_get("pk")?;
463
464    let item = CoreFeature {
465        type_: "Feature".to_string(),
466        id,
467        geometry,
468        properties: Some(properties),
469        links: vec![],
470    };
471
472    Ok(item)
473}
474
475impl PgCollectionSource {
476    async fn query_bbox(&self) -> Result<Vec<f64>> {
477        // TODO: Transform to WGS84, if necessary
478        let sql = &format!(
479            r#"
480        WITH query AS ({sql}),
481        extent AS (
482          SELECT ST_Extent("{geometry_column}") AS bbox
483          FROM query
484        )
485        SELECT ST_XMin(bbox), ST_YMin(bbox), ST_XMax(bbox), ST_YMax(bbox)
486        FROM extent
487    "#,
488            sql = &self.sql,
489            geometry_column = &self.geometry_column,
490        );
491        let row = sqlx::query(sql).fetch_one(&self.ds.pool).await?;
492        let extent: Vec<f64> = vec![
493            row.try_get(0)?,
494            row.try_get(1)?,
495            row.try_get(2)?,
496            row.try_get(3)?,
497        ];
498        Ok(extent)
499    }
500}
501
502async fn detect_pk(ds: &PgDatasource, schema: &str, table: &str) -> Result<Option<String>> {
503    let sql = &format!(
504        r#"
505        WITH pkeys AS (
506            SELECT a.attname
507            FROM   pg_index i
508            JOIN   pg_attribute a ON a.attrelid = i.indrelid
509                                 AND a.attnum = ANY(i.indkey)
510            WHERE  i.indrelid = '{schema}.{table}'::regclass
511            AND    i.indisprimary
512        )
513        SELECT
514          (SELECT COUNT(*) FROM pkeys) AS pksize,
515          (SELECT attname FROM pkeys LIMIT 1) AS pk
516        "#
517    );
518    let row = sqlx::query(sql).fetch_one(&ds.pool).await?;
519    let pksize: i64 = row.try_get("pksize")?;
520    let pk_column: Option<String> = if pksize == 1 {
521        row.try_get("pk")?
522    } else {
523        None
524    };
525    Ok(pk_column)
526}
527
528async fn detect_geometry(ds: &PgDatasource, schema: &str, table: &str) -> Result<String> {
529    let sql = &format!(
530        r#"
531        SELECT f_geometry_column
532        FROM geometry_columns
533          JOIN spatial_ref_sys refsys ON refsys.srid = geometry_columns.srid
534        WHERE f_table_schema = '{schema}' AND f_table_name = '{table}'
535        "#
536    );
537    let row = sqlx::query(sql)
538        // .bind(schema)
539        // .bind(table)
540        // We take the first result only
541        .fetch_one(&ds.pool)
542        .await?;
543    let geometry_column: String = row.try_get("f_geometry_column")?;
544    Ok(geometry_column)
545}
546
547async fn check_query(ds: &PgDatasource, sql: String) -> Result<String> {
548    debug!("Collection query: {sql}");
549    if let Err(e) = sqlx::query(&sql).fetch_one(&ds.pool).await {
550        error!("Error in collection query `{sql}`: {e}");
551        return Err(e.into());
552    }
553    Ok(sql)
554}
555
556async fn get_column_info(
557    ds: &PgDatasource,
558    sql: &str,
559    cols: Option<&Vec<String>>,
560) -> Result<HashMap<String, PgTypeInfo>> {
561    match sqlx::query(sql).fetch_one(&ds.pool).await {
562        Ok(res) => {
563            let mut hm = HashMap::new();
564            for col in res.columns() {
565                let colname = col.name().to_string();
566                if let Some(filter_cols) = cols {
567                    if !filter_cols.contains(&colname) {
568                        continue;
569                    }
570                }
571                let type_info = col.type_info();
572                hm.insert(colname, type_info.clone());
573            }
574            Ok(hm)
575        }
576        Err(e) => {
577            error!("Error in collection query `{sql}`: {e}");
578            Err(e.into())
579        }
580    }
581}
582
#[cfg(test)]
mod tests {
    use super::*;
    use bbox_core::ogcapi::QueryableType;
    use std::collections::HashMap;
    use test_log::test;

    // All tests are `#[ignore]`d: they require the mvtbench database started with:
    // docker run -p 127.0.0.1:5439:5432 -d --name mvtbenchdb --rm sourcepole/mvtbenchdb:v1.2

    // Autoscan discovers the benchmark tables as collections.
    #[test(tokio::test)]
    #[ignore]
    async fn pg_content() {
        let mut pool =
            PgDatasource::new_pool("postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench")
                .await
                .unwrap();
        let collections = pool.collections().await.unwrap();
        assert!(collections.len() >= 3);
        assert!(collections
            .iter()
            .any(|col| col.collection.id == "ne_10m_rivers_lake_centerlines"));
    }

    // An unfiltered items query returns the default page size.
    #[test(tokio::test)]
    #[ignore]
    async fn pg_features() {
        let filter = FilterParams::default();
        let ds = PgDatasource::new_pool("postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench")
            .await
            .unwrap();
        let source = PgCollectionSource {
            ds,
            sql: "SELECT * FROM ne_10m_rivers_lake_centerlines".to_string(),
            geometry_column: "wkb_geometry".to_string(),
            pk_column: Some("fid".to_string()),
            temporal_column: None,
            temporal_end_column: None,
            other_columns: HashMap::new(),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), filter.limit_or_default() as usize);
    }

    // A bbox filter restricts the result set.
    #[test(tokio::test)]
    #[ignore]
    async fn pg_bbox_filter() {
        let filter = FilterParams {
            limit: Some(50),
            offset: None,
            bbox: Some("633510.0904,5762740.4365,1220546.4677,6051366.6553".to_string()),
            // WGS84: 5.690918,45.890008,10.964355,47.665387
            datetime: None,
            filters: HashMap::new(),
        };
        let ds = PgDatasource::new_pool("postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench")
            .await
            .unwrap();
        let source = PgCollectionSource {
            ds,
            sql: "SELECT * FROM ne_10m_rivers_lake_centerlines".to_string(),
            geometry_column: "wkb_geometry".to_string(),
            pk_column: Some("fid".to_string()),
            temporal_column: None,
            temporal_end_column: None,
            other_columns: HashMap::new(),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 10);
    }

    // Temporal filtering on a synthetic timestamp column, alone and
    // combined with a bbox filter.
    #[test(tokio::test)]
    #[ignore]
    async fn pg_datetime_filter() {
        let ds = PgDatasource::new_pool("postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench")
            .await
            .unwrap();
        // `ts` assigns a distinct date per feature (one day back per fid).
        let source = PgCollectionSource {
            ds,
            sql: "SELECT *, '2024-01-01 00:00:00Z'::timestamptz - (fid-1) * INTERVAL '1 day' AS ts FROM ne_10m_rivers_lake_centerlines ORDER BY fid".to_string(),
            geometry_column: "wkb_geometry".to_string(),
            pk_column: Some("fid".to_string()),
            temporal_column: Some("ts".to_string()),
            temporal_end_column: None,
            other_columns: HashMap::new(),
        };

        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: Some("2021-05-09T00:00:00Z".to_string()),
            filters: HashMap::new(),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 1);

        // Combined with bbox
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: Some("633510.0904,5762740.4365,1220546.4677,6051366.6553".to_string()),
            datetime: Some("2021-05-09T00:00:00Z".to_string()),
            filters: HashMap::new(),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 1);

        // Outside of bbox
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: Some("633510.0904,5762740.4365,1220546.4677,6051366.6553".to_string()),
            datetime: Some("2024-01-01T00:00:00Z".to_string()),
            filters: HashMap::new(),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 0);
    }

    // Field (queryable) filtering, including rejection of non-queryable
    // columns and combination with bbox/datetime filters.
    #[test(tokio::test)]
    #[ignore]
    async fn pg_field_filter() {
        let ds = PgDatasource::new_pool("postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench")
            .await
            .unwrap();

        // Only `name` is declared queryable.
        let other_columns: HashMap<String, QueryableType> =
            [("name".to_string(), QueryableType::String)].into();
        let source = PgCollectionSource {
            ds,
            sql: "SELECT *, '2024-01-01 00:00:00Z'::timestamptz - (fid-1) * INTERVAL '1 day' AS ts FROM ne_10m_rivers_lake_centerlines".to_string(),
            geometry_column: "wkb_geometry".to_string(),
            pk_column: Some("fid".to_string()),
            temporal_column: Some("ts".to_string()),
            temporal_end_column: None,
            other_columns,
        };

        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: None,
            filters: HashMap::from([("name".to_string(), "Rhein".to_string())]),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 2);

        // Existing filter column, but not queriable
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: None,
            filters: HashMap::from([("scalerank".to_string(), "4".to_string())]),
        };
        assert!(source.items(&filter).await.is_err());

        // Non-existing filter column
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: None,
            filters: HashMap::from([("foo".to_string(), "bar".to_string())]),
        };
        assert!(source.items(&filter).await.is_err());

        // Combined with bbox
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: Some("633510.0904,5762740.4365,1220546.4677,6051366.6553".to_string()),
            // WGS84: 5.690918,45.890008,10.964355,47.665387
            datetime: None,
            filters: HashMap::from([("name".to_string(), "Rhein".to_string())]),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 2);

        // outside bbox
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: Some("633510.0904,5762740.4365,633511,5762741".to_string()),
            datetime: None,
            filters: HashMap::from([("name".to_string(), "Rhein".to_string())]),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 0);

        // Combined with datetime
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: Some("2021-05-09T00:00:00Z".to_string()),
            filters: HashMap::from([("name".to_string(), "Rhein".to_string())]),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 1);

        // Other datetime
        let filter = FilterParams {
            limit: None,
            offset: None,
            bbox: None,
            datetime: Some("2023-10-01T00:00:00Z".to_string()),
            filters: HashMap::from([("name".to_string(), "Rhein".to_string())]),
        };
        let items = source.items(&filter).await.unwrap();
        assert_eq!(items.features.len(), 0);
    }
}