use async_trait::async_trait;
use deadpool_postgres::tokio_postgres::types::{ToSql, Type};
use martin_tile_utils::{TileCoord, TileData, TileInfo};
use tilejson::TileJSON;
use tracing::debug;
use crate::CacheZoomRange;
use crate::tiles::postgres::PostgresError::{
GetTileError, GetTileWithQueryError, PrepareQueryError,
};
use crate::tiles::postgres::PostgresPool;
use crate::tiles::postgres::utils::query_to_json;
use crate::tiles::{BoxedSource, MartinCoreResult, Source, UrlQuery};
#[derive(Clone, Debug)]
pub struct PostgresSource {
id: String,
info: PostgresSqlInfo,
pool: PostgresPool,
tilejson: TileJSON,
tile_info: TileInfo,
cache_zoom: CacheZoomRange,
}
impl PostgresSource {
#[must_use]
pub fn new(
id: String,
info: PostgresSqlInfo,
tilejson: TileJSON,
pool: PostgresPool,
tile_info: TileInfo,
cache_zoom: CacheZoomRange,
) -> Self {
Self {
id,
info,
pool,
tilejson,
tile_info,
cache_zoom,
}
}
}
#[async_trait]
impl Source for PostgresSource {
fn get_id(&self) -> &str {
&self.id
}
fn get_tilejson(&self) -> &TileJSON {
&self.tilejson
}
fn get_tile_info(&self) -> TileInfo {
self.tile_info
}
fn clone_source(&self) -> BoxedSource {
Box::new(self.clone())
}
fn support_url_query(&self) -> bool {
self.info.use_url_query
}
fn benefits_from_concurrent_scraping(&self) -> bool {
true
}
fn cache_zoom(&self) -> CacheZoomRange {
self.cache_zoom
}
async fn get_tile(
&self,
xyz: TileCoord,
url_query: Option<&UrlQuery>,
) -> MartinCoreResult<TileData> {
let conn = self.pool.get().await?;
let param_types: &[Type] = if self.support_url_query() {
&[Type::INT2, Type::INT8, Type::INT8, Type::JSON]
} else {
&[Type::INT2, Type::INT8, Type::INT8]
};
let sql = &self.info.sql_query;
let prep_query = conn
.prepare_typed_cached(sql, param_types)
.await
.map_err(|e| {
PrepareQueryError(
e,
self.id.clone(),
self.info.signature.clone(),
self.info.sql_query.clone(),
)
})?;
let tile = if self.support_url_query() {
let json = query_to_json(url_query);
debug!("SQL: {sql} [{xyz}, {json:?}]");
let params: &[&(dyn ToSql + Sync)] = &[
&i16::from(xyz.z),
&i64::from(xyz.x),
&i64::from(xyz.y),
&json,
];
conn.query_opt(&prep_query, params).await
} else {
debug!("SQL: {sql} [{xyz}]");
conn.query_opt(
&prep_query,
&[&i16::from(xyz.z), &i64::from(xyz.x), &i64::from(xyz.y)],
)
.await
};
let tile = tile
.map(|row| row.and_then(|r| r.get::<_, Option<TileData>>(0)))
.map_err(|e| {
if self.support_url_query() {
GetTileWithQueryError(e, self.id.clone(), xyz, url_query.cloned())
} else {
GetTileError(e, self.id.clone(), xyz)
}
})?
.unwrap_or_default();
Ok(tile)
}
}
#[derive(Clone, Debug)]
pub struct PostgresSqlInfo {
pub sql_query: String,
pub use_url_query: bool,
pub signature: String,
}
impl PostgresSqlInfo {
#[must_use]
pub fn new(query: String, has_query_params: bool, signature: String) -> Self {
Self {
sql_query: query,
use_url_query: has_query_params,
signature,
}
}
}