martin_core/tiles/postgres/
source.rs1use async_trait::async_trait;
2use deadpool_postgres::tokio_postgres::types::{ToSql, Type};
3use log::debug;
4use martin_tile_utils::Encoding::Uncompressed;
5use martin_tile_utils::Format::Mvt;
6use martin_tile_utils::{TileCoord, TileData, TileInfo};
7use tilejson::TileJSON;
8
9use crate::tiles::postgres::PostgresError::{
10 GetTileError, GetTileWithQueryError, PrepareQueryError,
11};
12use crate::tiles::postgres::PostgresPool;
13use crate::tiles::postgres::utils::query_to_json;
14use crate::tiles::{BoxedSource, MartinCoreResult, Source, UrlQuery};
15
16#[derive(Clone, Debug)]
17pub struct PostgresSource {
19 id: String,
20 info: PostgresSqlInfo,
21 pool: PostgresPool,
22 tilejson: TileJSON,
23}
24
25impl PostgresSource {
26 #[must_use]
28 pub fn new(id: String, info: PostgresSqlInfo, tilejson: TileJSON, pool: PostgresPool) -> Self {
29 Self {
30 id,
31 info,
32 pool,
33 tilejson,
34 }
35 }
36}
37
38#[async_trait]
39impl Source for PostgresSource {
40 fn get_id(&self) -> &str {
41 &self.id
42 }
43
44 fn get_tilejson(&self) -> &TileJSON {
45 &self.tilejson
46 }
47
48 fn get_tile_info(&self) -> TileInfo {
49 TileInfo::new(Mvt, Uncompressed)
50 }
51
52 fn clone_source(&self) -> BoxedSource {
53 Box::new(self.clone())
54 }
55
56 fn support_url_query(&self) -> bool {
57 self.info.use_url_query
58 }
59
60 fn benefits_from_concurrent_scraping(&self) -> bool {
61 true
63 }
64
65 async fn get_tile(
66 &self,
67 xyz: TileCoord,
68 url_query: Option<&UrlQuery>,
69 ) -> MartinCoreResult<TileData> {
70 let conn = self.pool.get().await?;
71 let param_types: &[Type] = if self.support_url_query() {
72 &[Type::INT2, Type::INT8, Type::INT8, Type::JSON]
73 } else {
74 &[Type::INT2, Type::INT8, Type::INT8]
75 };
76
77 let sql = &self.info.sql_query;
78 let prep_query = conn
79 .prepare_typed_cached(sql, param_types)
80 .await
81 .map_err(|e| {
82 PrepareQueryError(
83 e,
84 self.id.to_string(),
85 self.info.signature.to_string(),
86 self.info.sql_query.to_string(),
87 )
88 })?;
89
90 let tile = if self.support_url_query() {
91 let json = query_to_json(url_query);
92 debug!("SQL: {sql} [{xyz}, {json:?}]");
93 let params: &[&(dyn ToSql + Sync)] = &[
94 &i16::from(xyz.z),
95 &i64::from(xyz.x),
96 &i64::from(xyz.y),
97 &json,
98 ];
99 conn.query_opt(&prep_query, params).await
100 } else {
101 debug!("SQL: {sql} [{xyz}]");
102 conn.query_opt(
103 &prep_query,
104 &[&i16::from(xyz.z), &i64::from(xyz.x), &i64::from(xyz.y)],
105 )
106 .await
107 };
108
109 let tile = tile
110 .map(|row| row.and_then(|r| r.get::<_, Option<TileData>>(0)))
111 .map_err(|e| {
112 if self.support_url_query() {
113 GetTileWithQueryError(e, self.id.to_string(), xyz, url_query.cloned())
114 } else {
115 GetTileError(e, self.id.to_string(), xyz)
116 }
117 })?
118 .unwrap_or_default();
119
120 Ok(tile)
121 }
122}
123
/// Description of the SQL statement a `PostgresSource` runs to fetch a tile.
#[derive(Clone, Debug)]
pub struct PostgresSqlInfo {
    /// SQL text that is prepared and executed for each tile request.
    pub sql_query: String,
    /// When `true`, the URL query is passed to the statement as an extra JSON parameter.
    pub use_url_query: bool,
    /// Signature text included in the error raised when preparing the statement fails.
    pub signature: String,
}
134
135impl PostgresSqlInfo {
136 #[must_use]
138 pub fn new(query: String, has_query_params: bool, signature: String) -> Self {
139 Self {
140 sql_query: query,
141 use_url_query: has_query_params,
142 signature,
143 }
144 }
145}