use std::collections::{BTreeMap, HashMap, HashSet};
use futures::pin_mut;
use martin_core::tiles::postgres::PostgresError::PostgresError;
use martin_core::tiles::postgres::{PostgresPool, PostgresResult, PostgresSqlInfo};
use martin_tile_utils::EARTH_CIRCUMFERENCE_DEGREES;
use postgis::ewkb;
use postgres_protocol::escape::{escape_identifier, escape_literal};
use serde_json::Value;
use tilejson::Bounds;
use tokio::time::timeout;
use tracing::{debug, warn};
use crate::config::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT};
use crate::config::file::postgres::{PostgresInfo as _, TableInfo};
pub type SqlTableInfoMapMapMap = BTreeMap<String, BTreeMap<String, BTreeMap<String, TableInfo>>>;
const DEFAULT_EXTENT: u32 = 4096;
const DEFAULT_BUFFER: u32 = 64;
const DEFAULT_CLIP_GEOM: bool = true;
pub async fn query_available_tables(
pool: &PostgresPool,
restrict_to_tables: Option<HashSet<(String, String)>>,
) -> PostgresResult<SqlTableInfoMapMapMap> {
let rows = pool
.get()
.await?
.query(include_str!("scripts/query_available_tables.sql"), &[])
.await
.map_err(|e| PostgresError(e, "querying available tables"))?;
let mut res = SqlTableInfoMapMapMap::new();
for row in &rows {
let schema: String = row.get("schema");
let table: String = row.get("name");
if let Some(ref table_names) = restrict_to_tables
&& !table_names.contains(&(schema.to_lowercase(), table.to_lowercase()))
{
continue;
}
let tilejson = if let Some(text) = row.get("description") {
match serde_json::from_str::<Value>(text) {
Ok(v) => Some(v),
Err(e) => {
warn!(
"Unable to deserialize SQL comment on {schema}.{table} as tilejson, the automatically generated tilejson would be used: {e}"
);
None
}
}
} else {
debug!(
"Unable to find a SQL comment on {schema}.{table}, the tilejson would be generated automatically"
);
None
};
let info = TableInfo {
schema,
table,
geometry_column: row.get("geom"),
geometry_index: row.get("geom_idx"),
relkind: row
.get::<_, Option<i8>>("relkind")
.and_then(|r| u8::try_from(r).ok().map(char::from)),
srid: row.get("srid"), geometry_type: row.get("type"),
properties: Some(serde_json::from_value(row.get("properties")).unwrap()),
tilejson,
..Default::default()
};
if info.geometry_index == Some(false) && info.relkind != Some('v') {
warn!(
"Table {}.{} has no spatial index on column {}",
info.schema, info.table, info.geometry_column
);
}
if let Some(v) = res
.entry(info.schema.clone())
.or_default()
.entry(info.table.clone())
.or_default()
.insert(info.geometry_column.clone(), info)
{
warn!("Unexpected duplicate table {}", v.format_id());
}
}
Ok(res)
}
fn escape_with_alias(mapping: &HashMap<String, String>, field: &str) -> String {
let column = mapping.get(field).map_or(field, |v| v.as_str());
if field == column {
format!(", {}", escape_identifier(column))
} else {
format!(
", {} AS {}",
escape_identifier(column),
escape_identifier(field),
)
}
}
pub async fn table_to_query(
id: String,
mut info: TableInfo,
pool: PostgresPool,
bounds_type: BoundsCalcType,
max_feature_count: Option<usize>,
) -> PostgresResult<(String, PostgresSqlInfo, TableInfo)> {
let schema = escape_identifier(&info.schema);
let table = escape_identifier(&info.table);
let geometry_column = escape_identifier(&info.geometry_column);
let srid = info.srid;
if info.bounds.is_none() {
match bounds_type {
BoundsCalcType::Skip => {}
BoundsCalcType::Calc => {
debug!("Computing {} table bounds for {id}", info.format_id());
info.bounds = calc_bounds(&pool, &schema, &table, &geometry_column, srid).await?;
}
BoundsCalcType::Quick => {
debug!(
"Computing {} table bounds with {}s timeout for {id}",
info.format_id(),
DEFAULT_BOUNDS_TIMEOUT.as_secs()
);
let bounds = calc_bounds(&pool, &schema, &table, &geometry_column, srid);
pin_mut!(bounds);
if let Ok(bounds) = timeout(DEFAULT_BOUNDS_TIMEOUT, &mut bounds).await {
info.bounds = bounds?;
} else {
warn!(
"Timeout computing {} bounds for {id}, aborting query. Use --auto-bounds=calc to wait until complete, or check the table for missing indices.",
info.format_id(),
);
}
}
}
if let Some(bounds) = info.bounds {
debug!(
"The computed bounds for {id} from {} are {bounds}",
info.format_id()
);
}
}
let properties = if let Some(props) = &info.properties {
props
.keys()
.map(|column| escape_with_alias(&info.prop_mapping, column))
.collect::<String>()
} else {
String::new()
};
let (id_name, id_field) = if let Some(id_column) = &info.id_column {
(
format!(", {}", escape_literal(id_column)),
escape_with_alias(&info.prop_mapping, id_column),
)
} else {
(String::new(), String::new())
};
let extent = info.extent.unwrap_or(DEFAULT_EXTENT);
let buffer = info.buffer.unwrap_or(DEFAULT_BUFFER);
let margin = f64::from(buffer) / f64::from(extent);
let bbox_search = if buffer == 0 {
format!("ST_Transform(ST_TileEnvelope($1::integer, $2::integer, $3::integer), {srid})")
} else if pool.supports_tile_margin() && srid == 3857 {
format!(
"ST_Transform(ST_TileEnvelope($1::integer, $2::integer, $3::integer, margin => {margin}), {srid})"
)
} else if srid == 4326 {
format!(
"ST_Expand(ST_Transform(ST_TileEnvelope($1::integer, $2::integer, $3::integer), {srid}), ({margin} * {EARTH_CIRCUMFERENCE_DEGREES}) / 2^$1::integer)"
)
} else {
format!("ST_Transform(ST_TileEnvelope($1::integer, $2::integer, $3::integer), {srid})")
};
let limit_clause = max_feature_count.map_or(String::new(), |v| format!("LIMIT {v}"));
let layer_id = escape_literal(info.layer_id.as_ref().unwrap_or(&id));
let clip_geom = info.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM);
let query = format!(
r"
SELECT
ST_AsMVT(tile, {layer_id}, {extent}, 'geom'{id_name})
FROM (
SELECT
ST_AsMVTGeom(
ST_Transform(ST_CurveToLine({geometry_column}::geometry), 3857),
ST_TileEnvelope($1::integer, $2::integer, $3::integer),
{extent}, {buffer}, {clip_geom}
) AS geom
{id_field}{properties}
FROM
{schema}.{table}
WHERE
{geometry_column} && {bbox_search}
{limit_clause}
) AS tile;
"
)
.trim()
.to_string();
Ok((
id,
PostgresSqlInfo::new(query, false, info.format_id()),
info,
))
}
async fn calc_bounds(
pool: &PostgresPool,
schema: &str,
table: &str,
geometry_column: &str,
srid: i32,
) -> PostgresResult<Option<Bounds>> {
Ok(pool.get()
.await?
.query_one(&format!(
r"
WITH real_bounds AS (SELECT ST_SetSRID(ST_Extent({geometry_column}::geometry), {srid}) AS rb FROM {schema}.{table})
SELECT ST_Transform(
CASE
WHEN (SELECT ST_GeometryType(rb) FROM real_bounds LIMIT 1) IN ('ST_Point', 'ST_LineString')
THEN ST_SetSRID(ST_Extent(ST_Expand({geometry_column}::geometry, 1)), {srid})
ELSE (SELECT * FROM real_bounds)
END,
4326
) AS bounds
FROM {schema}.{table};
"), &[])
.await
.map_err(|e| PostgresError(e, "querying table bounds"))?
.get::<_, Option<ewkb::Polygon>>("bounds")
.and_then(|p| polygon_to_bbox(&p)))
}
#[must_use]
pub fn polygon_to_bbox(polygon: &ewkb::Polygon) -> Option<Bounds> {
use postgis::{LineString as _, Point as _, Polygon as _};
polygon.rings().next().and_then(|linestring| {
let mut points = linestring.points();
if let (Some(bottom_left), Some(top_right)) = (points.next(), points.nth(1)) {
Some(Bounds::new(
bottom_left.x(),
bottom_left.y(),
top_right.x(),
top_right.y(),
))
} else {
None
}
})
}