use std::ops::Add as _;
use std::time::Duration;
use futures::future::try_join;
use futures::pin_mut;
use martin_core::tiles::BoxedSource;
use serde::{Deserialize, Serialize};
use tilejson::TileJSON;
use tokio::time::timeout;
use tracing::warn;
use super::{FuncInfoSources, TableInfoSources};
use crate::MartinResult;
use crate::config::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT};
use crate::config::file::postgres::PostgresAutoDiscoveryBuilder;
use crate::config::file::{
ConfigFileError, ConfigFileResult, ConfigurationLivecycleHooks, TileSourceWarning,
UnrecognizedKeys, UnrecognizedValues, copy_unrecognized_keys_from_config,
};
use crate::config::primitives::{IdResolver, OptBoolObj, OptOneMany};
/// Trait for items that can produce a string identifier and a [`TileJSON`]
/// description of themselves.
///
/// NOTE(review): the implementors are not visible in this section; presumably
/// the table and function info types implement it — confirm.
pub trait PostgresInfo {
/// Returns an identifier for this item as an owned `String`.
/// NOTE(review): the exact format is chosen by each implementor.
fn format_id(&self) -> String;
/// Builds a [`TileJSON`] for this item; takes ownership of `source_id`.
fn to_tilejson(&self, source_id: String) -> TileJSON;
}
/// Optional SSL-related file paths of a PostgreSQL configuration.
///
/// Fields that are `None` are omitted when serializing
/// (`skip_serializing_none`).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresSslCerts {
/// Path read from the `ssl_cert` key.
/// NOTE(review): presumably the client certificate file — confirm.
pub ssl_cert: Option<std::path::PathBuf>,
/// Path read from the `ssl_key` key.
/// NOTE(review): presumably the client private key file — confirm.
pub ssl_key: Option<std::path::PathBuf>,
/// Path read from the `ssl_root_cert` key.
/// NOTE(review): presumably the trusted root certificate file — confirm.
pub ssl_root_cert: Option<std::path::PathBuf>,
/// Flattened catch-all used for unknown-key reporting; never serialized.
#[serde(flatten, skip_serializing)]
pub unrecognized: UnrecognizedValues,
}
/// Configuration of a single PostgreSQL connection and the sources it exposes.
///
/// Fields that are `None` are omitted when serializing
/// (`skip_serializing_none`).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresConfig {
/// Connection string; `finalize` returns an error when it is missing.
pub connection_string: Option<String>,
/// SSL file paths. Flattened, so its keys sit at the same level as the
/// other keys of this struct.
#[serde(flatten)]
pub ssl_certificates: PostgresSslCerts,
/// NOTE(review): presumably the SRID assumed when none can be detected — confirm.
pub default_srid: Option<i32>,
/// Bounds calculation mode; `resolve` checks it against
/// `BoundsCalcType::Skip` to pick the slow-discovery warning text.
pub auto_bounds: Option<BoundsCalcType>,
/// NOTE(review): presumably a per-tile feature limit — confirm.
pub max_feature_count: Option<usize>,
/// Connection pool size; `finalize` rejects values below 1.
pub pool_size: Option<usize>,
/// Auto-publish setting: either a boolean or a detailed object.
/// `finalize` sets it to `Bool(true)` when `tables`, `functions` and this
/// field are all unset.
#[serde(default, skip_serializing_if = "OptBoolObj::is_none")]
pub auto_publish: OptBoolObj<PostgresCfgPublish>,
/// Table sources keyed by source id; overwritten by `resolve`.
pub tables: Option<TableInfoSources>,
/// Function sources keyed by source id; overwritten by `resolve`.
pub functions: Option<FuncInfoSources>,
/// Flattened catch-all used for unknown-key reporting; never serialized.
#[serde(flatten, skip_serializing)]
pub unrecognized: UnrecognizedValues,
}
/// Pool size constant.
/// NOTE(review): presumably the fallback applied when `pool_size` is not
/// configured; it is not referenced in this section — confirm at the use site.
pub const POOL_SIZE_DEFAULT: usize = 20;
/// Object form of the `auto_publish` setting.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresCfgPublish {
/// One or many schema names; also accepted under the `from_schema` key.
/// Omitted from serialized output when it holds no value.
#[serde(alias = "from_schema")]
#[serde(default, skip_serializing_if = "OptOneMany::is_none")]
pub from_schemas: OptOneMany<String>,
/// Table publishing setting: a boolean or a detailed object.
#[serde(default, skip_serializing_if = "OptBoolObj::is_none")]
pub tables: OptBoolObj<PostgresCfgPublishTables>,
/// Function publishing setting: a boolean or a detailed object.
#[serde(default, skip_serializing_if = "OptBoolObj::is_none")]
pub functions: OptBoolObj<PostgresCfgPublishFuncs>,
/// Flattened catch-all used for unknown-key reporting; never serialized.
#[serde(flatten, skip_serializing)]
pub unrecognized: UnrecognizedValues,
}
impl ConfigurationLivecycleHooks for PostgresCfgPublish {
    /// Gathers every unrecognized key of the `auto_publish` object.
    ///
    /// Keys found directly on this struct are returned unchanged. Keys reported
    /// by the object forms of `functions` and `tables` are prefixed with
    /// `functions.` and `tables.` respectively. Boolean or absent values of
    /// those two settings contribute nothing.
    fn get_unrecognized_keys(&self) -> UnrecognizedKeys {
        let mut unknown: UnrecognizedKeys = self.unrecognized.keys().cloned().collect();

        if let OptBoolObj::Object(funcs) = &self.functions {
            let nested = funcs.get_unrecognized_keys();
            unknown.extend(nested.iter().map(|key| format!("functions.{key}")));
        }

        if let OptBoolObj::Object(tbls) = &self.tables {
            let nested = tbls.get_unrecognized_keys();
            unknown.extend(nested.iter().map(|key| format!("tables.{key}")));
        }

        unknown
    }
}
/// Object form of the `auto_publish.tables` setting.
///
/// `Option` fields that are `None` are omitted when serializing
/// (`skip_serializing_none`).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresCfgPublishTables {
/// One or many schema names; also accepted under the `from_schema` key.
#[serde(alias = "from_schema")]
#[serde(default, skip_serializing_if = "OptOneMany::is_none")]
pub from_schemas: OptOneMany<String>,
/// Source id format string; also accepted under the `id_format` key.
/// NOTE(review): the supported placeholders are not visible here.
#[serde(alias = "id_format")]
pub source_id_format: Option<String>,
/// One or many column names; also accepted under the `id_column` key.
/// NOTE(review): presumably candidate feature-id columns — confirm.
#[serde(alias = "id_column")]
#[serde(default, skip_serializing_if = "OptOneMany::is_none")]
pub id_columns: OptOneMany<String>,
/// NOTE(review): presumably toggles geometry clipping for generated tiles — confirm.
pub clip_geom: Option<bool>,
/// NOTE(review): presumably the tile buffer size — confirm units.
pub buffer: Option<u32>,
/// NOTE(review): presumably the tile extent — confirm units.
pub extent: Option<u32>,
/// Flattened catch-all used for unknown-key reporting; never serialized.
#[serde(flatten, skip_serializing)]
pub unrecognized: UnrecognizedValues,
}
impl ConfigurationLivecycleHooks for PostgresCfgPublishTables {
    /// Returns a copy of every key held in the `unrecognized` catch-all.
    fn get_unrecognized_keys(&self) -> UnrecognizedKeys {
        let unknown_keys = self.unrecognized.keys();
        unknown_keys.cloned().collect()
    }
}
/// Object form of the `auto_publish.functions` setting.
///
/// `Option` fields that are `None` are omitted when serializing
/// (`skip_serializing_none`).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PostgresCfgPublishFuncs {
/// One or many schema names; also accepted under the `from_schema` key.
#[serde(alias = "from_schema")]
#[serde(default, skip_serializing_if = "OptOneMany::is_none")]
pub from_schemas: OptOneMany<String>,
/// Source id format string; also accepted under the `id_format` key.
/// NOTE(review): the supported placeholders are not visible here.
#[serde(alias = "id_format")]
pub source_id_format: Option<String>,
/// Flattened catch-all used for unknown-key reporting; never serialized.
#[serde(flatten, skip_serializing)]
pub unrecognized: UnrecognizedValues,
}
impl ConfigurationLivecycleHooks for PostgresCfgPublishFuncs {
    /// Returns a copy of every key held in the `unrecognized` catch-all.
    fn get_unrecognized_keys(&self) -> UnrecognizedKeys {
        let unknown_keys = self.unrecognized.keys();
        unknown_keys.cloned().collect()
    }
}
impl PostgresConfig {
    /// Instantiates the table and function sources described by this config.
    ///
    /// A [`PostgresAutoDiscoveryBuilder`] is created from this config and
    /// `id_resolver`; tables and functions are then instantiated concurrently.
    /// If table instantiation runs longer than `DEFAULT_BOUNDS_TIMEOUT` plus one
    /// second, a warning is logged once and the operation keeps running.
    ///
    /// On success `self.tables` and `self.functions` are replaced with the
    /// info returned by the builder, and the combined sources (tables first,
    /// then functions) are returned together with the combined warnings.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the builder or from instantiating
    /// tables or functions.
    pub async fn resolve(
        &mut self,
        id_resolver: IdResolver,
    ) -> MartinResult<(Vec<BoxedSource>, Vec<TileSourceWarning>)> {
        let pg = PostgresAutoDiscoveryBuilder::new(self, id_resolver).await?;

        // Grace period before the "taking too long" warning is emitted.
        let slow_threshold = DEFAULT_BOUNDS_TIMEOUT.add(Duration::from_secs(1));

        // The hint differs depending on whether bounds calculation is already off.
        let warn_slow_discovery = || {
            let bounds_disabled = pg.auto_bounds() == BoundsCalcType::Skip;
            if bounds_disabled {
                warn!(
                    "Discovering tables in PostgreSQL database '{}' is taking too long. Bounds calculation is already disabled. You may need to tune your database.",
                    pg.get_id()
                );
            } else {
                warn!(
                    "Discovering tables in PostgreSQL database '{}' is taking too long. Make sure your table geo columns have a GIS index, or use '--auto-bounds skip' CLI/config to skip bbox calculation.",
                    pg.get_id()
                );
            }
        };

        let tables_fut = on_slow(pg.instantiate_tables(), slow_threshold, warn_slow_discovery);
        let (table_outcome, func_outcome) =
            try_join(tables_fut, pg.instantiate_functions()).await?;

        let (mut sources, table_info, mut warnings) = table_outcome;
        let (func_sources, func_info, func_warnings) = func_outcome;

        self.tables = Some(table_info);
        self.functions = Some(func_info);

        sources.extend(func_sources);
        warnings.extend(func_warnings);
        Ok((sources, warnings))
    }
}
impl ConfigurationLivecycleHooks for PostgresConfig {
    /// Applies defaults and validates the configuration.
    ///
    /// When `tables`, `functions` and `auto_publish` are all unset,
    /// `auto_publish` is switched to `Bool(true)`. This happens before
    /// validation, so the default is applied even if an error is returned.
    ///
    /// # Errors
    ///
    /// * `ConfigFileError::PostgresPoolSizeInvalid` if `pool_size` is set to a
    ///   value below 1.
    /// * `ConfigFileError::PostgresConnectionStringMissing` if no connection
    ///   string is configured.
    fn finalize(&mut self) -> ConfigFileResult<()> {
        let nothing_configured =
            self.tables.is_none() && self.functions.is_none() && self.auto_publish.is_none();
        if nothing_configured {
            self.auto_publish = OptBoolObj::Bool(true);
        }

        if matches!(self.pool_size, Some(size) if size < 1) {
            return Err(ConfigFileError::PostgresPoolSizeInvalid);
        }

        match self.connection_string {
            Some(_) => Ok(()),
            None => Err(ConfigFileError::PostgresConnectionStringMissing),
        }
    }

    /// Gathers every unrecognized key of this PostgreSQL configuration.
    ///
    /// Top-level keys are returned unchanged. Keys of individual table and
    /// function sources are added through
    /// `copy_unrecognized_keys_from_config` with the prefixes `tables.<id>.`
    /// and `functions.<id>.`. SSL certificate keys are prefixed with
    /// `ssl_certificates.`, and keys from the object form of `auto_publish`
    /// with `auto_publish.`.
    fn get_unrecognized_keys(&self) -> UnrecognizedKeys {
        let mut unknown: UnrecognizedKeys = self.unrecognized.keys().cloned().collect();

        for (id, table) in self.tables.iter().flatten() {
            let prefix = format!("tables.{id}.");
            copy_unrecognized_keys_from_config(&mut unknown, &prefix, &table.unrecognized);
        }

        for (id, func) in self.functions.iter().flatten() {
            let prefix = format!("functions.{id}.");
            copy_unrecognized_keys_from_config(&mut unknown, &prefix, &func.unrecognized);
        }

        let ssl_unknown = self.ssl_certificates.unrecognized.keys();
        unknown.extend(ssl_unknown.map(|key| format!("ssl_certificates.{key}")));

        // Only the object form of `auto_publish` can carry unknown keys.
        if let OptBoolObj::Object(publish) = &self.auto_publish {
            let nested = publish.get_unrecognized_keys();
            unknown.extend(nested.iter().map(|key| format!("auto_publish.{key}")));
        }

        unknown
    }
}
/// Awaits `future`, calling `fn_on_slow` once if it has not completed within
/// `duration`.
///
/// The future is never cancelled: after the callback runs, the same pinned
/// future is awaited to completion and its output is returned. If the future
/// finishes in time, the callback is not invoked.
async fn on_slow<T, S: FnOnce()>(
    future: impl Future<Output = T>,
    duration: Duration,
    fn_on_slow: S,
) -> T {
    // Pin on the stack so the future can be polled by reference first and
    // resumed afterwards.
    pin_mut!(future);
    match timeout(duration, &mut future).await {
        Ok(output) => output,
        Err(_elapsed) => {
            fn_on_slow();
            future.await
        }
    }
}
// Unit tests: parse YAML snippets into `Config` and compare against the
// expected, finalized configuration.
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::path::Path;
use indoc::indoc;
use tilejson::Bounds;
use super::*;
use crate::config::file::postgres::{FunctionInfo, TableInfo};
use crate::config::file::{Config, parse_config};
use crate::config::primitives::OptOneMany::{Many, One};
use crate::config::primitives::env::FauxEnv;
// Parses `yaml` with a default `FauxEnv` and the placeholder path `<test>`;
// panics if parsing fails.
pub fn parse_cfg(yaml: &str) -> Config {
parse_config(yaml, &FauxEnv::default(), Path::new("<test>")).unwrap()
}
// Parses and finalizes `yaml`, asserts that finalization reported nothing
// (the returned collection is empty), then compares the result to `expected`.
pub fn assert_config(yaml: &str, expected: &Config) {
let mut config = parse_cfg(yaml);
let res = config.finalize().unwrap();
assert!(res.is_empty(), "unrecognized config: {res:?}");
assert_eq!(&config, expected);
}
// A single connection with only a connection string: `auto_publish` is
// expected to end up as `Bool(true)`.
#[test]
fn parse_pg_one() {
assert_config(
indoc! {"
postgres:
connection_string: 'postgresql://postgres@localhost/db'
"},
&Config {
postgres: One(PostgresConfig {
connection_string: Some("postgresql://postgres@localhost/db".to_string()),
auto_publish: OptBoolObj::Bool(true),
..Default::default()
}),
..Default::default()
},
);
}
// A list of two connections: each entry is expected to get
// `auto_publish: Bool(true)`.
#[test]
fn parse_pg_two() {
assert_config(
indoc! {"
postgres:
- connection_string: 'postgres://postgres@localhost:5432/db'
- connection_string: 'postgresql://postgres@localhost:5433/db'
"},
&Config {
postgres: Many(vec![
PostgresConfig {
connection_string: Some(
"postgres://postgres@localhost:5432/db".to_string(),
),
auto_publish: OptBoolObj::Bool(true),
..Default::default()
},
PostgresConfig {
connection_string: Some(
"postgresql://postgres@localhost:5433/db".to_string(),
),
auto_publish: OptBoolObj::Bool(true),
..Default::default()
},
]),
..Default::default()
},
);
}
// A fully specified connection with one explicit table source and one
// explicit function source; the expected config leaves `auto_publish` at its
// default because sources are listed explicitly.
#[test]
fn parse_pg_config() {
assert_config(
indoc! {"
postgres:
connection_string: 'postgres://postgres@localhost:5432/db'
default_srid: 4326
pool_size: 20
max_feature_count: 100
tables:
table_source:
schema: public
table: table_source
srid: 4326
geometry_column: geom
id_column: ~
minzoom: 0
maxzoom: 30
bounds: [-180.0, -90.0, 180.0, 90.0]
extent: 2048
buffer: 10
clip_geom: false
geometry_type: GEOMETRY
properties:
gid: int4
functions:
function_zxy_query:
schema: public
function: function_zxy_query
minzoom: 0
maxzoom: 30
bounds: [-180.0, -90.0, 180.0, 90.0]
"},
&Config {
postgres: One(PostgresConfig {
connection_string: Some("postgres://postgres@localhost:5432/db".to_string()),
default_srid: Some(4326),
pool_size: Some(20),
max_feature_count: Some(100),
tables: Some(BTreeMap::from([(
"table_source".to_string(),
TableInfo {
schema: "public".to_string(),
table: "table_source".to_string(),
srid: 4326,
geometry_column: "geom".to_string(),
minzoom: Some(0),
maxzoom: Some(30),
bounds: Some([-180, -90, 180, 90].into()),
extent: Some(2048),
buffer: Some(10),
clip_geom: Some(false),
geometry_type: Some("GEOMETRY".to_string()),
properties: Some(BTreeMap::from([(
"gid".to_string(),
"int4".to_string(),
)])),
..Default::default()
},
)])),
functions: Some(BTreeMap::from([(
"function_zxy_query".to_string(),
FunctionInfo::new_extended(
"public".to_string(),
"function_zxy_query".to_string(),
0,
30,
Bounds::MAX,
),
)])),
..Default::default()
}),
..Default::default()
},
);
}
}