1use crate::config::PostgisCollectionCfg;
4use crate::datasource::{
5 AutoscanCollectionDatasource, CollectionDatasource, CollectionSource, CollectionSourceCfg,
6 ConfiguredCollectionCfg, ItemsResult,
7};
8use crate::error::{Error, Result};
9use crate::filter_params::{FilterParams, TemporalType};
10use crate::inventory::FeatureCollection;
11use async_trait::async_trait;
12use bbox_core::ogcapi::*;
13use bbox_core::pg_ds::PgDatasource;
14use chrono::DateTime;
15use futures::TryStreamExt;
16use log::{debug, error, info, warn};
17use sqlx::postgres::PgTypeInfo;
18use sqlx::{postgres::PgRow, Column, Postgres, QueryBuilder, Row};
19use std::collections::HashMap;
20
21pub type Datasource = PgDatasource;
22
23#[async_trait]
24impl CollectionDatasource for PgDatasource {
25 async fn setup_collection(
26 &mut self,
27 cfg: &ConfiguredCollectionCfg,
28 _extent: Option<CoreExtent>,
29 ) -> Result<FeatureCollection> {
30 info!("Setup Postgis Collection `{}`", &cfg.name);
31 let CollectionSourceCfg::Postgis(ref srccfg) = cfg.source else {
32 panic!();
33 };
34
35 let id = &cfg.name;
36 if srccfg.table_name.is_none() && srccfg.sql.is_none() {
37 return Err(Error::DatasourceSetupError(format!(
38 "Datasource `{id}`: configuration `table_name` or `sql` missing"
39 )));
40 } else if srccfg.table_name.is_some() && srccfg.sql.is_some() {
41 warn!("Datasource`{id}`: configuration `table_name` ignored, using `sql` instead");
42 }
43 let temporal_column = srccfg.temporal_field.clone();
44 let temporal_end_column = srccfg.temporal_end_field.clone();
45 let (pk_column, geometry_column, sql) = if let Some(table_name) = &srccfg.table_name {
46 let public = "public".to_string();
47 let table_schema = srccfg.table_schema.as_ref().unwrap_or(&public);
48 let pk_column = srccfg
49 .fid_field
50 .clone()
51 .or(detect_pk(self, table_schema, table_name).await?);
52 let geometry_column = detect_geometry(self, table_schema, table_name).await?;
53 let sql = check_query(
54 self,
55 format!(r#"SELECT * FROM "{table_schema}"."{table_name}""#),
56 )
57 .await?;
58 (pk_column, geometry_column, sql)
59 } else {
60 let pk_column = srccfg.fid_field.clone();
61 let geometry_column =
63 srccfg
64 .geometry_field
65 .clone()
66 .ok_or(Error::DatasourceSetupError(format!(
67 "Datasource `{id}`: configuration `geometry_field` missing"
68 )))?;
69 let sql = check_query(self, srccfg.sql.clone().expect("config checked")).await?;
70 (pk_column, geometry_column, sql)
71 };
72 if pk_column.is_none() {
73 warn!("Datasource `{id}`: `fid_field` missing - single item queries will be ignored");
74 }
75 let mut queryable_fields = srccfg.queryable_fields.clone();
76 if let Some(ref t) = temporal_column {
77 queryable_fields.push(t.clone());
78 }
79 if let Some(ref t) = temporal_end_column {
80 queryable_fields.push(t.clone());
81 }
82 let queryables_types = get_column_info(self, &sql, Some(&queryable_fields)).await?;
83 let mut other_columns = HashMap::new();
84 for (k, v) in &queryables_types {
85 let queryable_type = match v.to_string().as_str() {
86 "TEXT" | "VARCHAR" | "CHAR" => QueryableType::String,
87 "INT4" | "INT8" => QueryableType::Integer,
88 "FLOAT4" | "FLOAT8" => QueryableType::Number,
89 "TIMESTAMP" | "TIMESTAMPTZ" => QueryableType::Datetime,
90 "BOOL" => QueryableType::Bool,
91 _ => {
92 return Err(Error::DatasourceSetupError(format!(
93 "{k} has a postgres type {v} which is not currently handled and can't be used a queryable"
94 )))
95 }
96 };
97 other_columns.insert(k.clone(), queryable_type);
98 }
99
100 let source = PgCollectionSource {
101 ds: self.clone(),
102 sql,
103 geometry_column,
104 pk_column,
105 temporal_column,
106 temporal_end_column,
107 other_columns,
108 };
109
110 let bbox = source
111 .query_bbox()
112 .await
113 .unwrap_or(vec![-180.0, -90.0, 180.0, 90.0]);
114
115 let mut collection = CoreCollection {
116 id: id.clone(),
117 title: Some(id.clone()),
118 description: cfg.description.clone(),
119 extent: Some(CoreExtent {
120 spatial: Some(CoreExtentSpatial {
121 bbox: vec![bbox],
122 crs: None,
123 }),
124 temporal: None,
125 }),
126 item_type: None,
127 crs: vec![],
128 links: vec![ApiLink {
129 href: format!("/collections/{id}/items"),
130 rel: Some("items".to_string()),
131 type_: Some("application/geo+json".to_string()),
132 title: Some(id.clone()),
133 hreflang: None,
134 length: None,
135 }],
136 };
137
138 if !queryable_fields.is_empty() {
139 collection.links.push(ApiLink {
140 href: format!("/collections/{id}/queryables"),
141 rel: Some("http://www.opengis.net/def/rel/ogc/1.0/queryables".to_string()),
142 type_: Some("application/schema+json".to_string()),
143 title: Some(id.clone()),
144 hreflang: None,
145 length: None,
146 })
147 }
148
149 let fc = FeatureCollection {
150 collection,
151 source: Box::new(source),
152 };
153 Ok(fc)
154 }
155}
156
157#[async_trait]
158impl AutoscanCollectionDatasource for PgDatasource {
159 async fn collections(&mut self) -> Result<Vec<FeatureCollection>> {
160 let mut collections = Vec::new();
161 let sql = r#"
162 SELECT contents.*
163 FROM geometry_columns contents
164 JOIN spatial_ref_sys refsys ON refsys.srid = contents.srid
165 WHERE f_table_schema = 'public'
166 "#;
167 let mut rows = sqlx::query(sql).fetch(&self.pool);
168 while let Some(row) = rows.try_next().await? {
169 let table_schema: String = row.try_get("f_table_schema")?;
170 let table_name: String = row.try_get("f_table_name")?;
171 let coll_cfg = ConfiguredCollectionCfg {
172 source: CollectionSourceCfg::Postgis(PostgisCollectionCfg {
173 table_schema: Some(table_schema),
174 table_name: Some(table_name.clone()),
175 ..Default::default()
176 }),
177 name: table_name.clone(),
178 title: Some(table_name),
179 description: None,
180 };
181 let fc = self.setup_collection(&coll_cfg, None).await?;
182 collections.push(fc);
183 }
184 Ok(collections)
185 }
186}
187
188#[derive(Clone, Debug)]
189pub struct PgCollectionSource {
190 ds: PgDatasource,
191 sql: String,
192 geometry_column: String,
193 pk_column: Option<String>,
195 temporal_column: Option<String>,
196 temporal_end_column: Option<String>,
197 other_columns: HashMap<String, QueryableType>,
199}
200
201#[async_trait]
202impl CollectionSource for PgCollectionSource {
203 async fn items(&self, filter: &FilterParams) -> Result<ItemsResult> {
204 let geometry_column = &self.geometry_column;
205 let temporal_column = &self.temporal_column;
206 let mut builder: QueryBuilder<Postgres> =
207 QueryBuilder::new(format!("WITH query AS ({sql})\n", sql = &self.sql));
208 let select_sql = if let Some(pk) = &self.pk_column {
209 format!(
210 r#"SELECT to_jsonb(t.*)-'{geometry_column}'-'{pk}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
211 "{pk}"::varchar AS pk,
212 count(*) OVER () AS __total_cnt
213 FROM query t"#,
214 )
215 } else {
216 format!(
217 r#"SELECT to_jsonb(t.*)-'{geometry_column}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
218 NULL AS pk,
219 --row_number() OVER () ::varchar AS pk,
220 count(*) OVER () AS __total_cnt
221 FROM query t"#,
222 )
223 };
224 builder.push(&select_sql);
225 let mut where_term = false;
226 match filter.bbox() {
227 Ok(Some(bbox)) => {
228 builder.push(format!(" WHERE ( {geometry_column} && ST_MakeEnvelope("));
229 let mut separated = builder.separated(",");
230 separated.push_bind(bbox[0]);
231 separated.push_bind(bbox[1]);
232 separated.push_bind(bbox[2]);
233 separated.push_bind(bbox[3]);
234 builder.push(") ) ");
235 where_term = true;
236 }
237 Ok(None) => {}
238 Err(e) => {
239 error!("Ignoring invalid bbox: {e}");
240 return Err(Error::QueryParams);
241 }
242 }
243 if let Some(temporal_column) = temporal_column {
244 let temporal_end_column = self.temporal_end_column.as_ref().unwrap_or(temporal_column);
245 match filter.temporal() {
246 Ok(Some(parts)) => {
247 if where_term {
248 builder.push(" AND ");
249 } else {
250 builder.push(" WHERE ");
251 where_term = true;
252 }
253 if parts.len() == 1 {
254 if let TemporalType::DateTime(dt) = parts[0] {
255 builder.push(format!(" {temporal_column} = ",));
256 builder.push_bind(dt);
257 debug!("{temporal_column} = {}", dt);
258 }
259 } else {
260 match parts[0] {
261 TemporalType::Open => match parts[1] {
262 TemporalType::Open => {
263 error!("Open to Open datetimes doesn't make sense");
264 return Err(Error::QueryParams);
265 }
266 TemporalType::DateTime(dt) => {
267 builder.push(format!(" {temporal_column} <= ",));
268 builder.push_bind(dt);
269 debug!("{temporal_column} <= {}", dt);
270 }
271 },
272 TemporalType::DateTime(dt1) => match parts[1] {
273 TemporalType::Open => {
274 builder.push(format!(" {temporal_column} >= ",));
275 builder.push_bind(dt1);
276 debug!("{temporal_column} >= {}", dt1);
277 }
278 TemporalType::DateTime(dt2) => {
279 builder.push(format!(" {temporal_column} >= "));
280 builder.push_bind(dt1);
281 debug!("{temporal_column} >= {}", dt1);
282 builder.push(format!(" and {temporal_end_column} <= ",));
283 builder.push_bind(dt2);
284 debug!("{temporal_column} <= {}", dt2);
285 }
286 },
287 }
288 }
289 }
290 Ok(None) => {}
291 Err(e) => {
292 error!("Ignoring invalid temporal field: {e}");
293 return Err(Error::QueryParams);
294 }
295 }
296 }
297
298 match filter.other_params() {
299 Ok(others) => {
300 if !others.is_empty() {
301 if where_term {
302 builder.push(" AND ");
303 } else {
304 builder.push(" WHERE ");
305 }
306 }
307 let mut separated = builder.separated(" AND ");
308 for (key, val) in others {
309 if let Some((k, v)) = self.other_columns.get_key_value(key) {
312 if val.rfind('*').is_some() {
313 separated.push(format!("{k}::text like "));
314 let val = val.replace('*', "%");
315 debug!("{k}::text like {val}");
316 separated.push_bind_unseparated(val);
317 } else {
318 separated.push(format!("{k}="));
319 debug!("{k} = {val}");
320 match v {
321 QueryableType::String => separated.push_bind_unseparated(val),
322 QueryableType::Integer => separated.push_bind_unseparated(
323 val.parse::<i64>().map_err(|_| Error::QueryParams)?,
324 ),
325 QueryableType::Number => separated.push_bind_unseparated(
326 val.parse::<f64>().map_err(|_| Error::QueryParams)?,
327 ),
328 QueryableType::Bool => separated.push_bind_unseparated(
329 val.parse::<bool>().map_err(|_| Error::QueryParams)?,
330 ),
331 QueryableType::Datetime => separated.push_bind_unseparated(
332 DateTime::parse_from_rfc3339(val)
333 .map_err(|_| Error::QueryParams)?,
334 ),
335 };
336 }
337 } else {
338 error!("Invalid query param {key}");
339 return Err(Error::QueryParams);
340 }
341 }
342 }
343 Err(e) => {
344 error!("{e}");
345 return Err(Error::QueryParams);
346 }
347 }
348 let limit = filter.limit_or_default();
349 if limit > 0 {
350 builder.push(" LIMIT ");
351 debug!("LIMIT {limit}");
352 builder.push_bind(limit as i64);
353 }
354 if let Some(offset) = filter.offset {
355 builder.push(" OFFSET ");
356 debug!("OFFSET {offset}");
357 builder.push_bind(offset as i64);
358 }
359 debug!("SQL: {}", builder.sql());
360 let query = builder.build();
361 let rows = query.fetch_all(&self.ds.pool).await?;
362 let number_matched = if let Some(row) = rows.first() {
363 row.try_get::<i64, _>("__total_cnt")? as u64
364 } else {
365 0
366 };
367 let number_returned = rows.len() as u64;
368 let items = rows
369 .iter()
370 .map(|row| row_to_feature(row, self))
371 .collect::<Result<Vec<_>>>()?;
372 let result = ItemsResult {
373 features: items,
374 number_matched,
375 number_returned,
376 };
377 Ok(result)
378 }
379
380 async fn item(&self, collection_id: &str, feature_id: &str) -> Result<Option<CoreFeature>> {
381 let Some(pk) = &self.pk_column else {
382 warn!("Ignoring error getting item for {collection_id} without single primary key");
383 return Ok(None);
384 };
385 let sql = format!(
386 r#"
387 WITH query AS ({sql})
388 SELECT to_jsonb(t.*)-'{geometry_column}'-'{pk}' AS properties, ST_AsGeoJSON({geometry_column})::jsonb AS geometry,
389 "{pk}"::varchar AS pk
390 FROM query t
391 WHERE {pk}::varchar = '{feature_id}'"#,
392 sql = &self.sql,
393 geometry_column = &self.geometry_column,
394 );
395 if let Some(row) = sqlx::query(&sql)
396 .fetch_optional(&self.ds.pool)
398 .await?
399 {
400 let mut item = row_to_feature(&row, self)?;
401 item.links = vec![
402 ApiLink {
403 href: format!("/collections/{collection_id}/items/{feature_id}"),
404 rel: Some("self".to_string()),
405 type_: Some("application/geo+json".to_string()),
406 title: Some("this document".to_string()),
407 hreflang: None,
408 length: None,
409 },
410 ApiLink {
411 href: format!("/collections/{collection_id}"),
412 rel: Some("collection".to_string()),
413 type_: Some("application/geo+json".to_string()),
414 title: Some("the collection document".to_string()),
415 hreflang: None,
416 length: None,
417 },
418 ];
419 Ok(Some(item))
420 } else {
421 Ok(None)
422 }
423 }
424 async fn queryables(&self, collection_id: &str) -> Result<Option<Queryables>> {
425 let properties: HashMap<String, QueryableProperty> = self
426 .other_columns
427 .iter()
428 .map(|s| {
429 let title = s.0.to_string();
430 (
431 title.clone(),
432 QueryableProperty {
433 title: Some(title),
434 type_: Some(s.1.clone()),
435 format: None,
436 },
437 )
438 })
439 .collect();
440 Ok(Some(Queryables {
441 id: format!("/collections/{collection_id}/queryables"),
442 title: Some(collection_id.to_string()),
443 schema: "http://json-schema.org/draft/2019-09/schema".to_string(),
444 type_: "object".to_string(),
445 properties,
446 }))
447 }
448}
449
450fn row_to_feature(row: &PgRow, _table_info: &PgCollectionSource) -> Result<CoreFeature> {
451 let properties: serde_json::Value = row.try_get("properties")?;
452 let geometry: serde_json::Value = row.try_get("geometry")?;
461 let id: Option<String> = row.try_get("pk")?;
463
464 let item = CoreFeature {
465 type_: "Feature".to_string(),
466 id,
467 geometry,
468 properties: Some(properties),
469 links: vec![],
470 };
471
472 Ok(item)
473}
474
475impl PgCollectionSource {
476 async fn query_bbox(&self) -> Result<Vec<f64>> {
477 let sql = &format!(
479 r#"
480 WITH query AS ({sql}),
481 extent AS (
482 SELECT ST_Extent("{geometry_column}") AS bbox
483 FROM query
484 )
485 SELECT ST_XMin(bbox), ST_YMin(bbox), ST_XMax(bbox), ST_YMax(bbox)
486 FROM extent
487 "#,
488 sql = &self.sql,
489 geometry_column = &self.geometry_column,
490 );
491 let row = sqlx::query(sql).fetch_one(&self.ds.pool).await?;
492 let extent: Vec<f64> = vec![
493 row.try_get(0)?,
494 row.try_get(1)?,
495 row.try_get(2)?,
496 row.try_get(3)?,
497 ];
498 Ok(extent)
499 }
500}
501
502async fn detect_pk(ds: &PgDatasource, schema: &str, table: &str) -> Result<Option<String>> {
503 let sql = &format!(
504 r#"
505 WITH pkeys AS (
506 SELECT a.attname
507 FROM pg_index i
508 JOIN pg_attribute a ON a.attrelid = i.indrelid
509 AND a.attnum = ANY(i.indkey)
510 WHERE i.indrelid = '{schema}.{table}'::regclass
511 AND i.indisprimary
512 )
513 SELECT
514 (SELECT COUNT(*) FROM pkeys) AS pksize,
515 (SELECT attname FROM pkeys LIMIT 1) AS pk
516 "#
517 );
518 let row = sqlx::query(sql).fetch_one(&ds.pool).await?;
519 let pksize: i64 = row.try_get("pksize")?;
520 let pk_column: Option<String> = if pksize == 1 {
521 row.try_get("pk")?
522 } else {
523 None
524 };
525 Ok(pk_column)
526}
527
528async fn detect_geometry(ds: &PgDatasource, schema: &str, table: &str) -> Result<String> {
529 let sql = &format!(
530 r#"
531 SELECT f_geometry_column
532 FROM geometry_columns
533 JOIN spatial_ref_sys refsys ON refsys.srid = geometry_columns.srid
534 WHERE f_table_schema = '{schema}' AND f_table_name = '{table}'
535 "#
536 );
537 let row = sqlx::query(sql)
538 .fetch_one(&ds.pool)
542 .await?;
543 let geometry_column: String = row.try_get("f_geometry_column")?;
544 Ok(geometry_column)
545}
546
547async fn check_query(ds: &PgDatasource, sql: String) -> Result<String> {
548 debug!("Collection query: {sql}");
549 if let Err(e) = sqlx::query(&sql).fetch_one(&ds.pool).await {
550 error!("Error in collection query `{sql}`: {e}");
551 return Err(e.into());
552 }
553 Ok(sql)
554}
555
556async fn get_column_info(
557 ds: &PgDatasource,
558 sql: &str,
559 cols: Option<&Vec<String>>,
560) -> Result<HashMap<String, PgTypeInfo>> {
561 match sqlx::query(sql).fetch_one(&ds.pool).await {
562 Ok(res) => {
563 let mut hm = HashMap::new();
564 for col in res.columns() {
565 let colname = col.name().to_string();
566 if let Some(filter_cols) = cols {
567 if !filter_cols.contains(&colname) {
568 continue;
569 }
570 }
571 let type_info = col.type_info();
572 hm.insert(colname, type_info.clone());
573 }
574 Ok(hm)
575 }
576 Err(e) => {
577 error!("Error in collection query `{sql}`: {e}");
578 Err(e.into())
579 }
580 }
581}
582
#[cfg(test)]
mod tests {
    use super::*;
    use bbox_core::ogcapi::QueryableType;
    use std::collections::HashMap;
    use test_log::test;

    // All tests need the database below and are therefore ignored by default.

    /// Connection string of the test database.
    const DB_URL: &str = "postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench";
    /// Bounding box used by the spatial filter tests.
    const RIVERS_BBOX: &str = "633510.0904,5762740.4365,1220546.4677,6051366.6553";
    /// Plain collection query on the rivers table.
    const RIVERS_SQL: &str = "SELECT * FROM ne_10m_rivers_lake_centerlines";
    /// Rivers with a synthetic timestamp column `ts`, ordered by `fid`.
    const RIVERS_TS_ORDERED_SQL: &str = "SELECT *, '2024-01-01 00:00:00Z'::timestamptz - (fid-1) * INTERVAL '1 day' AS ts FROM ne_10m_rivers_lake_centerlines ORDER BY fid";
    /// Rivers with a synthetic timestamp column `ts`, unordered.
    const RIVERS_TS_SQL: &str = "SELECT *, '2024-01-01 00:00:00Z'::timestamptz - (fid-1) * INTERVAL '1 day' AS ts FROM ne_10m_rivers_lake_centerlines";

    /// Connects to the test database.
    async fn connect() -> PgDatasource {
        PgDatasource::new_pool(DB_URL).await.unwrap()
    }

    /// Builds a source on the rivers table with the given query and columns.
    async fn rivers_source(
        sql: &str,
        temporal_column: Option<&str>,
        other_columns: HashMap<String, QueryableType>,
    ) -> PgCollectionSource {
        PgCollectionSource {
            ds: connect().await,
            sql: sql.to_string(),
            geometry_column: "wkb_geometry".to_string(),
            pk_column: Some("fid".to_string()),
            temporal_column: temporal_column.map(str::to_string),
            temporal_end_column: None,
            other_columns,
        }
    }

    /// Builds filter parameters without limit and offset.
    fn filter_params(
        bbox: Option<&str>,
        datetime: Option<&str>,
        filters: &[(&str, &str)],
    ) -> FilterParams {
        FilterParams {
            limit: None,
            offset: None,
            bbox: bbox.map(str::to_string),
            datetime: datetime.map(str::to_string),
            filters: filters
                .iter()
                .map(|(key, value)| (key.to_string(), value.to_string()))
                .collect(),
        }
    }

    /// Number of features returned for `filter`; panics if the query fails.
    async fn feature_count(source: &PgCollectionSource, filter: &FilterParams) -> usize {
        source.items(filter).await.unwrap().features.len()
    }

    #[test(tokio::test)]
    #[ignore]
    async fn pg_content() {
        let mut pool = connect().await;
        let collections = pool.collections().await.unwrap();
        assert!(collections.len() >= 3);
        assert!(collections
            .iter()
            .any(|col| col.collection.id == "ne_10m_rivers_lake_centerlines"));
    }

    #[test(tokio::test)]
    #[ignore]
    async fn pg_features() {
        let filter = FilterParams::default();
        let source = rivers_source(RIVERS_SQL, None, HashMap::new()).await;
        assert_eq!(
            feature_count(&source, &filter).await,
            filter.limit_or_default() as usize
        );
    }

    #[test(tokio::test)]
    #[ignore]
    async fn pg_bbox_filter() {
        let filter = FilterParams {
            limit: Some(50),
            ..filter_params(Some(RIVERS_BBOX), None, &[])
        };
        let source = rivers_source(RIVERS_SQL, None, HashMap::new()).await;
        assert_eq!(feature_count(&source, &filter).await, 10);
    }

    #[test(tokio::test)]
    #[ignore]
    async fn pg_datetime_filter() {
        let source = rivers_source(RIVERS_TS_ORDERED_SQL, Some("ts"), HashMap::new()).await;

        // Datetime only
        let filter = filter_params(None, Some("2021-05-09T00:00:00Z"), &[]);
        assert_eq!(feature_count(&source, &filter).await, 1);

        // Datetime within bbox
        let filter = filter_params(Some(RIVERS_BBOX), Some("2021-05-09T00:00:00Z"), &[]);
        assert_eq!(feature_count(&source, &filter).await, 1);

        // Same bbox, different datetime
        let filter = filter_params(Some(RIVERS_BBOX), Some("2024-01-01T00:00:00Z"), &[]);
        assert_eq!(feature_count(&source, &filter).await, 0);
    }

    #[test(tokio::test)]
    #[ignore]
    async fn pg_field_filter() {
        let other_columns: HashMap<String, QueryableType> =
            [("name".to_string(), QueryableType::String)].into();
        let source = rivers_source(RIVERS_TS_SQL, Some("ts"), other_columns).await;

        // Queryable field
        let filter = filter_params(None, None, &[("name", "Rhein")]);
        assert_eq!(feature_count(&source, &filter).await, 2);

        // Existing column which is not configured as queryable
        let filter = filter_params(None, None, &[("scalerank", "4")]);
        assert!(source.items(&filter).await.is_err());

        // Unknown field
        let filter = filter_params(None, None, &[("foo", "bar")]);
        assert!(source.items(&filter).await.is_err());

        // Field combined with bbox
        let filter = filter_params(Some(RIVERS_BBOX), None, &[("name", "Rhein")]);
        assert_eq!(feature_count(&source, &filter).await, 2);

        // Field combined with a much smaller bbox
        let filter = filter_params(
            Some("633510.0904,5762740.4365,633511,5762741"),
            None,
            &[("name", "Rhein")],
        );
        assert_eq!(feature_count(&source, &filter).await, 0);

        // Field combined with datetime
        let filter = filter_params(None, Some("2021-05-09T00:00:00Z"), &[("name", "Rhein")]);
        assert_eq!(feature_count(&source, &filter).await, 1);

        // Field combined with a different datetime
        let filter = filter_params(None, Some("2023-10-01T00:00:00Z"), &[("name", "Rhein")]);
        assert_eq!(feature_count(&source, &filter).await, 0);
    }
}