martin_core/tiles/postgres/
source.rs

1use async_trait::async_trait;
2use deadpool_postgres::tokio_postgres::types::{ToSql, Type};
3use log::debug;
4use martin_tile_utils::Encoding::Uncompressed;
5use martin_tile_utils::Format::Mvt;
6use martin_tile_utils::{TileCoord, TileData, TileInfo};
7use tilejson::TileJSON;
8
9use crate::tiles::postgres::PostgresError::{
10    GetTileError, GetTileWithQueryError, PrepareQueryError,
11};
12use crate::tiles::postgres::PostgresPool;
13use crate::tiles::postgres::utils::query_to_json;
14use crate::tiles::{BoxedSource, MartinCoreResult, Source, UrlQuery};
15
16#[derive(Clone, Debug)]
17/// `PostgreSQL` tile source that executes SQL queries to generate tiles.
18pub struct PostgresSource {
19    id: String,
20    info: PostgresSqlInfo,
21    pool: PostgresPool,
22    tilejson: TileJSON,
23}
24
25impl PostgresSource {
26    /// Creates a new `PostgreSQL` tile source.
27    #[must_use]
28    pub fn new(id: String, info: PostgresSqlInfo, tilejson: TileJSON, pool: PostgresPool) -> Self {
29        Self {
30            id,
31            info,
32            pool,
33            tilejson,
34        }
35    }
36}
37
38#[async_trait]
39impl Source for PostgresSource {
40    fn get_id(&self) -> &str {
41        &self.id
42    }
43
44    fn get_tilejson(&self) -> &TileJSON {
45        &self.tilejson
46    }
47
48    fn get_tile_info(&self) -> TileInfo {
49        TileInfo::new(Mvt, Uncompressed)
50    }
51
52    fn clone_source(&self) -> BoxedSource {
53        Box::new(self.clone())
54    }
55
56    fn support_url_query(&self) -> bool {
57        self.info.use_url_query
58    }
59
60    fn benefits_from_concurrent_scraping(&self) -> bool {
61        // pg does not parallelize queries well internally and having more requests in flight is thus beneficial
62        true
63    }
64
65    async fn get_tile(
66        &self,
67        xyz: TileCoord,
68        url_query: Option<&UrlQuery>,
69    ) -> MartinCoreResult<TileData> {
70        let conn = self.pool.get().await?;
71        let param_types: &[Type] = if self.support_url_query() {
72            &[Type::INT2, Type::INT8, Type::INT8, Type::JSON]
73        } else {
74            &[Type::INT2, Type::INT8, Type::INT8]
75        };
76
77        let sql = &self.info.sql_query;
78        let prep_query = conn
79            .prepare_typed_cached(sql, param_types)
80            .await
81            .map_err(|e| {
82                PrepareQueryError(
83                    e,
84                    self.id.to_string(),
85                    self.info.signature.to_string(),
86                    self.info.sql_query.to_string(),
87                )
88            })?;
89
90        let tile = if self.support_url_query() {
91            let json = query_to_json(url_query);
92            debug!("SQL: {sql} [{xyz}, {json:?}]");
93            let params: &[&(dyn ToSql + Sync)] = &[
94                &i16::from(xyz.z),
95                &i64::from(xyz.x),
96                &i64::from(xyz.y),
97                &json,
98            ];
99            conn.query_opt(&prep_query, params).await
100        } else {
101            debug!("SQL: {sql} [{xyz}]");
102            conn.query_opt(
103                &prep_query,
104                &[&i16::from(xyz.z), &i64::from(xyz.x), &i64::from(xyz.y)],
105            )
106            .await
107        };
108
109        let tile = tile
110            .map(|row| row.and_then(|r| r.get::<_, Option<TileData>>(0)))
111            .map_err(|e| {
112                if self.support_url_query() {
113                    GetTileWithQueryError(e, self.id.to_string(), xyz, url_query.cloned())
114                } else {
115                    GetTileError(e, self.id.to_string(), xyz)
116                }
117            })?
118            .unwrap_or_default();
119
120        Ok(tile)
121    }
122}
123
/// Describes the SQL query a `PostgreSQL` tile source runs.
#[derive(Debug, Clone)]
pub struct PostgresSqlInfo {
    /// Text of the SQL query to execute.
    pub sql_query: String,
    /// `true` if the query takes the URL query parameters as an additional argument.
    pub use_url_query: bool,
    /// Signature of the query.
    pub signature: String,
}
134
135impl PostgresSqlInfo {
136    /// Creates new SQL query information.
137    #[must_use]
138    pub fn new(query: String, has_query_params: bool, signature: String) -> Self {
139        Self {
140            sql_query: query,
141            use_url_query: has_query_params,
142            signature,
143        }
144    }
145}