1use crate::config::{PostgisSourceParamsCfg, TilesetTmsCfg, VectorLayerCfg};
4use crate::datasource::{
5 mvt::MvtBuilder,
6 postgis_queries::{QueryParam, SqlQuery},
7 wms_fcgi::HttpRequestParams,
8 LayerInfo, SourceType, TileSource, TileSourceError,
9};
10use crate::filter_params::FilterParams;
11use crate::service::{TileSetGrid, TmsExtensions};
12use async_trait::async_trait;
13use bbox_core::pg_ds::PgDatasource;
14use bbox_core::{Format, TileResponse};
15use futures::TryStreamExt;
16use geozero::{mvt, wkb, ToMvt};
17use log::{debug, error, info, warn};
18use serde_json::json;
19use sqlx::{
20 postgres::{PgColumn, PgRow, PgStatement, PgTypeInfo},
21 Column, Executor, Row, Statement, TypeInfo,
22};
23use std::collections::{BTreeMap, HashMap};
24use std::io::Cursor;
25use tile_grid::{BoundingBox, Tms, Xyz};
26use tilejson::{tilejson, TileJSON};
27
28#[derive(Clone, Debug)]
29pub struct PgSource {
30 ds: PgDatasource,
31 layers: BTreeMap<String, PgMvtLayer>,
32 config: PostgisSourceParamsCfg,
34}
35
36#[derive(Clone, Debug)]
37pub struct PgMvtLayer {
38 geometry_type: Option<String>,
39 tile_coord_sys: bool,
41 tile_size: u32,
42 fid_field: Option<String>,
43 query_limit: Option<u32>,
44 queries: HashMap<i32, HashMap<u8, QueryInfo>>,
46 query_zoom_steps: HashMap<u8, u8>,
48}
49
50#[derive(Clone, PartialEq, Debug)]
51pub enum FieldTypeInfo {
52 Property(PgTypeInfo),
53 Geometry,
54 Ignored,
55}
56
57#[derive(Clone, Debug)]
58struct QueryInfo {
59 stmt: PgStatement<'static>,
60 params: Vec<QueryParam>,
61 geometry_field: String,
62 fields: Vec<FieldInfo>,
63}
64
65#[derive(Clone, Debug)]
66pub struct FieldInfo {
67 pub name: String,
68 pub info: FieldTypeInfo,
69}
70
71pub type Datasource = PgDatasource;
72
73impl PgMvtLayer {
74 fn query(&self, grid_srid: i32, zoom: u8) -> Option<&QueryInfo> {
76 self.query_zoom_steps
77 .get(&zoom)
78 .and_then(|minzoom| self.queries.get(&grid_srid).and_then(|gq| gq.get(minzoom)))
79 }
80 pub fn minzoom(&self) -> u8 {
81 *self.query_zoom_steps.keys().min().unwrap_or(&0)
82 }
83 pub fn maxzoom(&self) -> u8 {
84 *self.query_zoom_steps.keys().max().unwrap_or(&0)
85 }
86}
87
88impl PgSource {
89 pub async fn create(
90 ds: &PgDatasource,
91 cfg: &PostgisSourceParamsCfg,
92 ts_grids: &[TileSetGrid],
93 tms_cfg: &[TilesetTmsCfg],
94 ) -> PgSource {
95 let mut layers = BTreeMap::new();
96 for layer in &cfg.layers {
97 match Self::setup_layer(ds, layer, ts_grids, tms_cfg, cfg.postgis2).await {
98 Ok(mvt_layer) => {
99 layers.insert(layer.name.clone(), mvt_layer);
100 }
101 Err(_) => {
102 error!("Layer `{}`: skipping", layer.name)
103 }
104 };
105 }
106 PgSource {
107 ds: ds.clone(),
108 layers,
109 config: cfg.clone(),
110 }
111 }
112 async fn setup_layer(
113 ds: &PgDatasource,
114 layer: &VectorLayerCfg,
115 ts_grids: &[TileSetGrid],
116 tms_cfg: &[TilesetTmsCfg],
117 postgis2: bool,
118 ) -> Result<PgMvtLayer, TileSourceError> {
119 if layer.queries.is_empty() && layer.table_name.is_none() {
121 error!("Layer '{}': table_name undefined", layer.name);
122 return Err(TileSourceError::TypeDetectionError);
123 }
124
125 fn tile_srid_z(ts_grids: &[TileSetGrid], zoom: u8) -> Option<i32> {
126 ts_grids
127 .iter()
128 .rev()
129 .find(|entry| entry.minzoom <= zoom && entry.maxzoom >= zoom)
130 .map(|entry| entry.tms.srid())
131 }
132
133 let zoom_steps = layer.zoom_steps(tms_cfg);
134 if zoom_steps.len() > 1 {
135 debug!("Layer `{}` zoom steps: {:?}", layer.name, zoom_steps);
136 }
137 let mut layer_queries = HashMap::new();
138 for grid in ts_grids {
139 for zs in &zoom_steps {
140 let zoom = *zs;
141 let layer_query = layer.query(zoom);
142 let tile_srid = tile_srid_z(ts_grids, zoom).unwrap_or(grid.tms.srid());
143 let field_query = SqlQuery::build_field_query(layer, layer_query);
144 let param_types = field_query.param_types();
145 let mut geometry_field = None;
146 let mut fields = Vec::new();
147 match ds.pool.prepare_with(&field_query.sql, ¶m_types).await {
148 Ok(stmt) => {
149 for col in stmt.columns() {
150 let info = column_info(col, &layer.name);
151 if let Some(geom_col) = &layer.geometry_field {
152 if col.name() == geom_col {
153 if info == FieldTypeInfo::Geometry {
154 geometry_field = Some(geom_col.to_string());
155 } else {
156 error!(
157 "Layer `{}` z{zoom}: Unsupported geometry type",
158 layer.name
159 );
160 continue;
161 }
162 }
163 } else if info == FieldTypeInfo::Geometry && geometry_field.is_none() {
164 geometry_field = Some(col.name().to_string());
166 }
167 if info != FieldTypeInfo::Ignored {
168 let field_info = FieldInfo {
169 name: col.name().to_string(),
170 info,
171 };
172 fields.push(field_info);
173 }
174 }
175 debug!("Query parameters: {:?}", stmt.parameters());
176 }
177 Err(e) => {
178 error!(
179 "Layer `{}` z{zoom}: Field detection failed - {e}",
180 layer.name
181 );
182 error!(" Query: {}", field_query.sql);
183 return Err(TileSourceError::TypeDetectionError);
184 }
185 };
186 let Some(geometry_field) = geometry_field else {
187 error!("Layer `{}`: No geometry column found", layer.name);
188 return Err(TileSourceError::TypeDetectionError);
189 };
190 let geom_name = layer.geometry_field.as_ref().unwrap_or(&geometry_field);
191 let query = SqlQuery::build_tile_query(
192 layer,
193 geom_name,
194 &fields,
195 tile_srid,
196 zoom,
197 layer_query,
198 postgis2,
199 );
200 let param_types = query.param_types();
201 let stmt = match ds.pool.prepare_with(&query.sql, ¶m_types).await {
202 Ok(stmt) => Statement::to_owned(&stmt), Err(e) => {
204 error!("Layer `{}` z{zoom}: Invalid query - {e}", layer.name);
205 error!(" Query: {}", query.sql);
206 return Err(TileSourceError::TypeDetectionError);
207 }
208 };
209 debug!(
214 "Layer `{}`: Query for minzoom {zoom}: {}",
215 layer.name, query.sql
216 );
217 let query_info = QueryInfo {
218 stmt,
219 params: query.params.clone(),
220 fields: fields.clone(),
221 geometry_field: geometry_field.clone(),
222 };
223 layer_queries
224 .entry(tile_srid)
225 .or_insert(HashMap::new())
226 .insert(zoom, query_info);
227 }
228 }
229
230 let zoom_steps = layer.zoom_steps(tms_cfg);
232 let maxzoom = ts_grids
233 .iter()
234 .map(|g| g.tms.maxzoom())
235 .max()
236 .expect("default grid missing");
237 let mut query_zoom_steps = HashMap::new();
238 for zoom in layer.minzoom()..=layer.maxzoom(maxzoom) {
239 let z = zoom_steps
240 .iter()
241 .rev()
242 .find(|z| zoom >= **z)
243 .expect("invalid zoom steps");
244 query_zoom_steps.insert(zoom, *z);
245 }
246
247 Ok(PgMvtLayer {
248 geometry_type: layer.geometry_type.clone(),
249 tile_coord_sys: !postgis2,
250 tile_size: layer.tile_size,
251 fid_field: layer.fid_field.clone(),
252 query_limit: layer.query_limit,
253 queries: layer_queries,
254 query_zoom_steps,
255 })
256 }
257}
258
259fn layer_query<'a>(
260 layer: &'a PgMvtLayer,
261 query_info: &'a QueryInfo,
262 tile: &Xyz,
263 grid: &Tms,
264 extent: &BoundingBox,
265 filter: &'a FilterParams,
266) -> Result<sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>, TileSourceError> {
267 let mut query = query_info.stmt.query();
268 for param in &query_info.params {
269 query = match *param {
270 QueryParam::Bbox => query
271 .bind(extent.left)
272 .bind(extent.bottom)
273 .bind(extent.right)
274 .bind(extent.top),
275 QueryParam::Zoom => query.bind(tile.z as i32),
276 QueryParam::X => query.bind(tile.x as i32),
277 QueryParam::Y => query.bind(tile.y as i32),
278 QueryParam::PixelWidth => {
279 if let Some(pixel_width) = grid.resolution_z(tile.z) {
280 let grid_width: u16 = grid.tms.tile_matrices[tile.z as usize].tile_width.into();
282 let mvt_pixel_width = pixel_width * grid_width as f64 / layer.tile_size as f64;
283 query.bind(mvt_pixel_width)
284 } else {
285 info!("Undefined resolution for z={}", tile.z);
286 return Err(TileSourceError::TileXyzError);
287 }
288 }
289 QueryParam::ScaleDenominator => {
290 if let Some(m) = grid.matrix_z(tile.z) {
291 query.bind(m.scale_denominator)
292 } else {
293 info!("Undefined scale_denominator for z={}", tile.z);
294 return Err(TileSourceError::FilterParamError);
295 }
296 }
297 QueryParam::QueryField(ref field) => {
298 if let Some(value) = filter.filters.get(field) {
299 query.bind(value)
300 } else {
301 info!("Filter parameter `{field}` missing");
302 return Err(TileSourceError::FilterParamError);
303 }
304 }
305 }
306 }
307 Ok(query)
308}
309
310#[async_trait]
311impl TileSource for PgSource {
312 async fn xyz_request(
313 &self,
314 tms: &Tms,
315 tile: &Xyz,
316 filter: &FilterParams,
317 _format: &Format,
318 _request_params: HttpRequestParams<'_>,
319 ) -> Result<TileResponse, TileSourceError> {
320 let extent_info = tms.xyz_extent(tile)?;
321 let extent = &extent_info.extent;
322 debug!(
323 "Query tile {}/{}/{} with {extent:?}",
324 tile.z, tile.x, tile.y
325 );
326 let tile_srid = tms.srid();
327 let mut mvt = MvtBuilder::new();
328 for (id, layer) in &self.layers {
329 let Some(query_info) = layer.query(tile_srid, tile.z) else {
330 continue;
331 };
332 let query = layer_query(layer, query_info, tile, tms, extent, filter)?;
333 debug!("Query layer `{id}`");
334 let mut rows = query.fetch(&self.ds.pool);
335 let mut mvt_layer = MvtBuilder::new_layer(id, layer.tile_size);
336 let mut cnt = 0;
337 let query_limit = layer.query_limit.unwrap_or(0);
338 while let Some(row) = rows.try_next().await? {
339 let Some(wkb) =
340 row.try_get::<Option<wkb::Ewkb>, _>(query_info.geometry_field.as_str())?
341 else {
342 continue;
344 };
345 let mut feat = if layer.tile_coord_sys {
346 wkb.to_mvt_unscaled()?
347 } else {
348 wkb.to_mvt(
349 layer.tile_size,
350 extent.left,
351 extent.bottom,
352 extent.right,
353 extent.top,
354 )?
355 };
356 for field in &query_info.fields {
357 if field.name == query_info.geometry_field {
358 continue;
359 }
360 if let Some(val) = column_value(&row, field)? {
361 if let Some(fid_field) = &layer.fid_field {
362 if &field.name == fid_field {
363 if let Some(val) = val.int_value {
364 feat.id = Some(u64::try_from(val)?);
365 continue;
366 }
367 }
368 }
369 mvt_layer.add_feature_attribute(&mut feat, &field.name, val)?;
370 } }
372 mvt_layer.push_feature(feat);
373 cnt += 1;
374 if cnt == query_limit {
375 info!(
376 "Layer `{id}`: Features limited to {cnt} (tile query_limit reached, zoom level {})",
377 tile.z
378 );
379 break;
380 }
381 }
382 mvt.push_layer(mvt_layer);
383 }
384 if let Some(diaganostics_cfg) = &self.config.diagnostics {
385 mvt.add_diagnostics_layer(diaganostics_cfg, tile, &extent_info)?;
386 }
387 let blob = mvt.into_blob()?;
388 let mut response = TileResponse::new();
389 response.set_content_type("application/x-protobuf");
390 let body = Box::new(Cursor::new(blob));
391 Ok(response.with_body(body))
392 }
393 fn source_type(&self) -> SourceType {
394 SourceType::Vector
395 }
396 async fn tilejson(&self, tms: &Tms, format: &Format) -> Result<TileJSON, TileSourceError> {
397 let mut tj = tilejson! { tiles: vec![] };
398 tj.attribution = Some(self.config.attribution());
399 tj.minzoom = Some(tms.minzoom());
402 tj.maxzoom = Some(tms.maxzoom());
406 let extent = self.config.get_extent();
407 tj.bounds = Some(tilejson::Bounds {
408 left: extent.minx,
409 bottom: extent.miny,
410 right: extent.maxx,
411 top: extent.maxy,
412 });
413 let center = self.config.get_center();
414 tj.center = Some(tilejson::Center {
415 longitude: center.1,
416 latitude: center.0,
417 zoom: self.config.get_start_zoom(),
418 });
419 tj.other
420 .insert("format".to_string(), format.file_suffix().into());
421
422 let grid_srid = tms.srid();
423 if grid_srid != 3857 {
424 tj.other
427 .insert("srs".to_string(), tms.crs().as_known_crs().into());
428 }
429 let empty_queries = HashMap::new();
430 let mut layers: Vec<tilejson::VectorLayer> = self
432 .layers
433 .iter()
434 .map(|(id, layer)| {
435 let fields = layer
437 .queries
438 .get(&grid_srid)
439 .or({
440 if grid_srid == 3857 {
442 Some(&empty_queries)
443 } else {
444 None
445 }
446 })
447 .expect("invalid srid lookup")
448 .clone()
449 .into_values()
450 .flat_map(|q| q.fields)
451 .map(|f| (f.name.clone(), f))
452 .collect::<HashMap<_, _>>()
453 .values()
454 .filter(|field| {
455 if let FieldTypeInfo::Property(_) = &field.info {
456 if let Some(fid_field) = &layer.fid_field {
457 if &field.name == fid_field {
458 return false;
459 }
460 }
461 true
462 } else {
463 false
464 }
465 })
466 .map(|field| (field.name.clone(), "".to_string()))
467 .collect();
468 tilejson::VectorLayer {
469 id: id.clone(),
470 fields,
471 description: None,
472 minzoom: Some(layer.minzoom()),
473 maxzoom: Some(layer.maxzoom()),
474 other: BTreeMap::default(),
475 }
476 })
477 .collect();
478 if self.config.diagnostics.is_some() {
479 layers.push(tilejson::VectorLayer {
480 id: "diagnostics-tile".to_string(),
481 fields: BTreeMap::from([
482 (
483 "layer-total-percent".to_string(),
484 "Total size in bytes (uncompressed)".to_string(),
485 ),
486 (
487 "layer-total-percent".to_string(),
488 "Total size relative to reference size".to_string(),
489 ),
490 ]),
491 description: None,
492 maxzoom: None,
493 minzoom: None,
494 other: BTreeMap::default(),
495 });
496 layers.push(tilejson::VectorLayer {
497 id: "diagnostics-label".to_string(),
498 fields: BTreeMap::from([
499 ("zxy".to_string(), "tile number".to_string()),
500 ("tile-top".to_string(), "tile extent".to_string()),
501 ("tile-left".to_string(), "tile extent".to_string()),
502 ("tile-bottom".to_string(), "tile extent".to_string()),
503 ("tile-right".to_string(), "tile extent".to_string()),
504 ]),
505 description: None,
506 maxzoom: None,
507 minzoom: None,
508 other: BTreeMap::default(),
509 });
510 }
511 tj.vector_layers = Some(layers);
512 Ok(tj)
513 }
514 async fn layers(&self) -> Result<Vec<LayerInfo>, TileSourceError> {
515 let mut layers: Vec<LayerInfo> = self
516 .layers
517 .iter()
518 .map(|(id, layer)| LayerInfo {
519 name: id.clone(),
520 geometry_type: layer.geometry_type.clone(),
521 style: None,
522 })
523 .collect();
524 if self.config.diagnostics.is_some() {
525 layers.push(LayerInfo {
526 name: "diagnostics-tile".to_string(),
527 geometry_type: Some("line".to_string()),
528 style: Some(json!({"paint": {
529 "line-color": "rgba(196, 43, 43, 0.81)",
530 "line-width": [
531 "interpolate",
532 ["linear"],
533 ["get", "layer-total-percent"],
534 0, 1,
535 100, 50
536 ],
537 }})),
538 });
539 layers.push(LayerInfo {
540 name: "diagnostics-label".to_string(),
541 geometry_type: Some("symbol".to_string()),
542 style: Some(json!({
543 "layout": {"text-field": "{zxy}"},
544 "paint": {
545 "text-color": "rgba(196, 43, 43, 1)",
546 "text-halo-width": 2,
547 "text-halo-color": "rgba(255, 255, 255, 1)"
548 }
549 })),
550 });
551 }
552 Ok(layers)
553 }
554}
555
556fn column_info(col: &PgColumn, layer_name: &str) -> FieldTypeInfo {
557 let pg_type = col.type_info().name();
558 if [
561 "VARCHAR",
562 "TEXT",
563 "CHAR_ARRAY",
564 "FLOAT4",
565 "FLOAT8",
566 "INT2",
567 "INT4",
568 "INT8",
569 "BOOL",
570 ]
571 .contains(&pg_type)
572 {
573 FieldTypeInfo::Property(col.type_info().clone())
574 } else if ["NUMERIC"].contains(&pg_type) {
575 warn!(
576 "Layer `{layer_name}`: Converting column `{}` with type `{}` to supported type",
577 col.name(),
578 col.type_info()
579 );
580 FieldTypeInfo::Property(col.type_info().clone())
581 } else if ["geometry", "geography"].contains(&pg_type) {
582 FieldTypeInfo::Geometry
583 } else {
584 warn!(
585 "Layer `{layer_name}`: Type `{}` of column `{}` not supported",
586 col.type_info(),
587 col.name()
588 );
589 FieldTypeInfo::Ignored
590 }
591}
592
593fn column_value(row: &PgRow, field: &FieldInfo) -> Result<Option<mvt::tile::Value>, sqlx::Error> {
595 let FieldTypeInfo::Property(pg_type) = &field.info else {
596 return Ok(None); };
598 let col = field.name.as_str();
599 let mut mvt_val = mvt::tile::Value::default();
600 match pg_type.name() {
601 "VARCHAR" | "TEXT" | "CHAR_ARRAY" => {
602 mvt_val.string_value = row.try_get::<Option<String>, _>(col)?;
603 }
605 "FLOAT4" => {
606 mvt_val.float_value = row.try_get::<Option<f32>, _>(col)?;
607 }
608 "FLOAT8" => {
609 mvt_val.double_value = row.try_get::<Option<f64>, _>(col)?;
610 }
611 "INT2" => {
612 mvt_val.int_value = row.try_get::<Option<i16>, _>(col)?.map(i16::into);
613 }
614 "INT4" => {
615 mvt_val.int_value = row.try_get::<Option<i32>, _>(col)?.map(i32::into);
616 }
617 "INT8" => {
618 mvt_val.int_value = row.try_get::<Option<i64>, _>(col)?;
619 }
620 "BOOL" => {
621 mvt_val.bool_value = row.try_get::<Option<bool>, _>(col)?;
622 }
623 _ => {}
624 }
625 if mvt_val == mvt::tile::Value::default() {
626 Ok(None)
628 } else {
629 Ok(Some(mvt_val))
630 }
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::VectorLayerQueryCfg;
    use bbox_core::config::DsPostgisCfg;
    use bbox_core::pg_ds::PgDatasource;
    use test_log::test;
    use tile_grid::tms;

    /// Creates a source with a single layer `layer1` on the test database.
    ///
    /// With `query` set, the layer uses that SQL from zoom 0 on; otherwise
    /// it reads from the configured table.
    async fn pg_source(query: Option<String>) -> PgSource {
        let ds_cfg = DsPostgisCfg {
            url: "postgresql://mvtbench:mvtbench@127.0.0.1:5439/mvtbench".to_string(),
        };
        let queries: Vec<VectorLayerQueryCfg> = query
            .map(|sql| VectorLayerQueryCfg {
                minzoom: Some(0),
                maxzoom: None,
                simplify: None,
                tolerance: None,
                sql: Some(sql),
            })
            .into_iter()
            .collect();
        let layer = VectorLayerCfg {
            name: "layer1".to_string(),
            geometry_field: Some("wkb_geometry".to_string()),
            geometry_type: None,
            srid: Some(3857),
            no_transform: false,
            fid_field: None,
            table_name: Some("ne_10m_rivers_lake_centerlines".to_string()),
            query_limit: None,
            queries,
            minzoom: None,
            maxzoom: None,
            tile_size: 4096,
            simplify: false,
            tolerance: "!pixel_width!/2".to_string(),
            buffer_size: Some(0),
            make_valid: false,
            shift_longitude: false,
        };
        let pg_src_cfg = PostgisSourceParamsCfg {
            datasource: None,
            extent: None,
            center: None,
            start_zoom: None,
            attribution: None,
            postgis2: false,
            diagnostics: None,
            layers: vec![layer],
        };
        let ts_grids = vec![TileSetGrid {
            tms: tms().lookup("WebMercatorQuad").unwrap(),
            minzoom: 0,
            maxzoom: 24,
        }];
        let ds = PgDatasource::from_config(&ds_cfg, None).await.unwrap();
        PgSource::create(&ds, &pg_src_cfg, &ts_grids, &[]).await
    }

    /// Runs the query of `layer1` for tile 0/0/0 in the WebMercatorQuad grid
    /// and returns all rows.
    async fn fetch_root_tile_rows(pg: &PgSource) -> Vec<PgRow> {
        let layer = pg.layers.get("layer1").unwrap();
        let grid = tms().lookup("WebMercatorQuad").unwrap();
        let tile = Xyz::new(0, 0, 0);
        let query_info = layer.query(grid.srid(), tile.z).unwrap();
        let extent = grid.xy_bounds(&tile);
        let filter = FilterParams::default();
        let query = layer_query(layer, query_info, &tile, &grid, &extent, &filter).unwrap();
        query.fetch_all(&pg.ds.pool).await.unwrap()
    }

    #[test(tokio::test)]
    #[ignore]
    async fn tile_query() {
        let pg = pg_source(None).await;
        let rows = fetch_root_tile_rows(&pg).await;
        assert_eq!(rows.len(), 1473);
    }

    #[test(tokio::test)]
    #[ignore]
    async fn country_geoms() {
        let sql = "SELECT wkb_geometry, adm0_a3, mapcolor7 FROM ne_10m_admin_0_countries WHERE sov_a3 IN ('BRA', 'ARG')";
        let pg = pg_source(Some(sql.to_string())).await;
        let rows = fetch_root_tile_rows(&pg).await;
        assert_eq!(rows.len(), 2);
        let geoms: Vec<_> = rows
            .iter()
            .map(|row| row.try_get::<Option<wkb::Ewkb>, _>("wkb_geometry").unwrap())
            .collect();
        assert_eq!(geoms.len(), 2);
        assert!(geoms[0].is_some());
        assert!(geoms[1].is_some());
    }
}