martin_core/tiles/pmtiles/
source.rs1use std::convert::identity;
4use std::fmt::{Debug, Formatter};
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicUsize;
8use std::sync::atomic::Ordering::SeqCst;
9use std::sync::{Arc, LazyLock};
10
11use async_trait::async_trait;
12use log::{trace, warn};
13use martin_tile_utils::{Encoding, Format, TileCoord, TileData, TileInfo};
14use pmtiles::aws_sdk_s3::Client as S3Client;
15use pmtiles::aws_sdk_s3::config::Builder as S3ConfigBuilder;
16use pmtiles::reqwest::Client;
17use pmtiles::{
18 AsyncPmTilesReader, AwsS3Backend, Compression, DirCacheResult, Directory, DirectoryCache,
19 HttpBackend, MmapBackend, TileId, TileType,
20};
21use tilejson::TileJSON;
22use url::Url;
23
24use crate::cache::{CacheKey, CacheValue, OptMainCache};
25use crate::get_cached_value;
26use crate::tiles::pmtiles::PmtilesError::{self, InvalidMetadata, InvalidUrlMetadata};
27use crate::tiles::{BoxedSource, MartinCoreResult, Source, UrlQuery};
28
29#[derive(Clone, Debug)]
31pub struct PmtCache {
32 id: usize,
36 cache: OptMainCache,
40}
41
42impl From<OptMainCache> for PmtCache {
43 fn from(cache: OptMainCache) -> Self {
44 static NEXT_CACHE_ID: LazyLock<AtomicUsize> = LazyLock::new(|| AtomicUsize::new(0));
45
46 Self {
47 id: NEXT_CACHE_ID.fetch_add(1, SeqCst),
48 cache,
49 }
50 }
51}
52
53impl DirectoryCache for PmtCache {
54 async fn get_dir_entry(&self, offset: usize, tile_id: TileId) -> DirCacheResult {
55 if let Some(dir) = get_cached_value!(&self.cache, CacheValue::PmtDirectory, {
56 CacheKey::PmtDirectory(self.id, offset)
57 }) {
58 dir.find_tile_id(tile_id).into()
59 } else {
60 DirCacheResult::NotCached
61 }
62 }
63
64 async fn insert_dir(&self, offset: usize, directory: Directory) {
65 if let Some(cache) = &self.cache {
66 cache
67 .insert(
68 CacheKey::PmtDirectory(self.id, offset),
69 CacheValue::PmtDirectory(directory),
70 )
71 .await;
72 }
73 }
74}
75
76macro_rules! impl_pmtiles_source {
77 ($name: ident, $backend: ty, $path: ty, $display_path: path, $err: ident, $concurrent: expr $(,)?) => {
78 #[derive(Clone)]
80 pub struct $name {
81 id: String,
82 path: $path,
83 pmtiles: Arc<AsyncPmTilesReader<$backend, PmtCache>>,
84 tilejson: TileJSON,
85 tile_info: TileInfo,
86 }
87
88 impl Debug for $name {
89 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90 write!(
91 f,
92 "{} {{ id: {}, path: {:?} }}",
93 stringify!($name),
94 self.id,
95 self.path
96 )
97 }
98 }
99
100 impl $name {
101 async fn new_int(
102 id: String,
103 path: $path,
104 reader: AsyncPmTilesReader<$backend, PmtCache>,
105 ) -> Result<Self, PmtilesError> {
106 let hdr = &reader.get_header();
107
108 if hdr.tile_type != TileType::Mvt && hdr.tile_compression != Compression::None {
109 return Err(PmtilesError::from($err(
110 format!(
111 "Format {:?} and compression {:?} are not yet supported",
112 hdr.tile_type, hdr.tile_compression
113 ),
114 path,
115 )));
116 }
117
118 let format = match hdr.tile_type {
119 TileType::Mvt => TileInfo::new(
120 Format::Mvt,
121 match hdr.tile_compression {
122 Compression::None => Encoding::Uncompressed,
123 Compression::Unknown => {
124 warn!(
125 "MVT tiles have unknown compression in file {}",
126 $display_path(&path)
127 );
128 Encoding::Uncompressed
129 }
130 Compression::Gzip => Encoding::Gzip,
131 Compression::Brotli => Encoding::Brotli,
132 Compression::Zstd => Encoding::Zstd,
133 },
134 ),
135 TileType::Png => Format::Png.into(),
137 TileType::Jpeg => Format::Jpeg.into(),
138 TileType::Webp => Format::Webp.into(),
139 TileType::Unknown => {
140 return Err($err("Unknown tile type".to_string(), path));
141 }
142 };
143
144 let tilejson = reader.parse_tilejson(Vec::new()).await.unwrap_or_else(|e| {
145 warn!(
146 "{e:?}: Unable to parse metadata for {}",
147 $display_path(&path)
148 );
149 hdr.get_tilejson(Vec::new())
150 });
151
152 Ok(Self {
153 id,
154 path,
155 pmtiles: Arc::new(reader),
156 tilejson,
157 tile_info: format,
158 })
159 }
160 }
161
162 #[async_trait]
163 impl Source for $name {
164 fn get_id(&self) -> &str {
165 &self.id
166 }
167
168 fn get_tilejson(&self) -> &TileJSON {
169 &self.tilejson
170 }
171
172 fn get_tile_info(&self) -> TileInfo {
173 self.tile_info
174 }
175
176 fn clone_source(&self) -> BoxedSource {
177 Box::new(self.clone())
178 }
179
180 fn get_version(&self) -> Option<String> {
181 self.tilejson.version.clone()
182 }
183
184 fn benefits_from_concurrent_scraping(&self) -> bool {
185 $concurrent
186 }
187
188 async fn get_tile(
189 &self,
190 xyz: TileCoord,
191 _url_query: Option<&UrlQuery>,
192 ) -> MartinCoreResult<TileData> {
193 if let Some(t) = self
195 .pmtiles
196 .get_tile(
197 pmtiles::TileCoord::new(xyz.z, xyz.x, xyz.y)
198 .map_err(|e| PmtilesError::PmtError(e))?,
199 )
200 .await
201 .map_err(|e| {
202 PmtilesError::PmtErrorWithCtx(e, $display_path(&self.path).to_string())
203 })?
204 {
205 Ok(t.to_vec())
206 } else {
207 trace!(
208 "Couldn't find tile data in {}/{}/{} of {}",
209 xyz.z, xyz.x, xyz.y, &self.id
210 );
211 Ok(Vec::new())
212 }
213 }
214 }
215 };
216}
217
218impl_pmtiles_source!(
219 PmtHttpSource,
220 HttpBackend,
221 Url,
222 identity,
223 InvalidUrlMetadata,
224 true,
226);
227
228impl PmtHttpSource {
229 pub async fn new(cache: PmtCache, id: String, url: Url) -> Result<Self, PmtilesError> {
231 static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
232
233 let reader =
234 AsyncPmTilesReader::new_with_cached_url(cache, CLIENT.clone(), url.clone()).await;
235 let reader = reader.map_err(|e| PmtilesError::PmtErrorWithCtx(e, url.to_string()))?;
236
237 Self::new_int(id, url, reader).await
238 }
239}
240
241impl_pmtiles_source!(
242 PmtS3Source,
243 AwsS3Backend,
244 Url,
245 identity,
246 InvalidUrlMetadata,
247 true,
249);
250
251impl PmtS3Source {
252 pub async fn new(
254 cache: PmtCache,
255 id: String,
256 url: Url,
257 skip_credentials: bool,
258 force_path_style: bool,
259 ) -> Result<Self, PmtilesError> {
260 let mut aws_config_builder = aws_config::from_env();
261 if skip_credentials {
262 aws_config_builder = aws_config_builder.no_credentials();
263 }
264 let aws_config = aws_config_builder.load().await;
265
266 let s3_config = S3ConfigBuilder::from(&aws_config)
267 .force_path_style(force_path_style)
268 .build();
269 let client = S3Client::from_conf(s3_config);
270
271 let bucket = url
272 .host_str()
273 .ok_or_else(|| PmtilesError::S3BucketNameNotString(url.clone()))?
274 .to_string();
275
276 let key = url.path()[1..].to_string();
278
279 let reader =
280 AsyncPmTilesReader::new_with_cached_client_bucket_and_path(cache, client, bucket, key)
281 .await
282 .map_err(|e| PmtilesError::PmtErrorWithCtx(e, url.to_string()))?;
283
284 Self::new_int(id, url, reader).await
285 }
286}
287
288impl_pmtiles_source!(
289 PmtFileSource,
290 MmapBackend,
291 PathBuf,
292 Path::display,
293 InvalidMetadata,
294 false,
296);
297
298impl PmtFileSource {
299 pub async fn new(cache: PmtCache, id: String, path: PathBuf) -> Result<Self, PmtilesError> {
301 let backend = MmapBackend::try_from(path.as_path())
302 .await
303 .map_err(|e| io::Error::other(format!("{e:?}: Cannot open file {}", path.display())))
304 .map_err(|e| PmtilesError::IoError(e, path.clone()))?;
305
306 let reader = AsyncPmTilesReader::try_from_cached_source(backend, cache).await;
307 let reader = reader
308 .map_err(|e| io::Error::other(format!("{e:?}: Cannot open file {}", path.display())))
309 .map_err(|e| PmtilesError::IoError(e, path.clone()))?;
310
311 Self::new_int(id, path, reader).await
312 }
313}