martin_core/tiles/pmtiles/
source.rs

1//! `PMTiles` tile source implementations.
2
3use std::convert::identity;
4use std::fmt::{Debug, Formatter};
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicUsize;
8use std::sync::atomic::Ordering::SeqCst;
9use std::sync::{Arc, LazyLock};
10
11use async_trait::async_trait;
12use log::{trace, warn};
13use martin_tile_utils::{Encoding, Format, TileCoord, TileData, TileInfo};
14use pmtiles::aws_sdk_s3::Client as S3Client;
15use pmtiles::aws_sdk_s3::config::Builder as S3ConfigBuilder;
16use pmtiles::reqwest::Client;
17use pmtiles::{
18    AsyncPmTilesReader, AwsS3Backend, Compression, DirCacheResult, Directory, DirectoryCache,
19    HttpBackend, MmapBackend, TileId, TileType,
20};
21use tilejson::TileJSON;
22use url::Url;
23
24use crate::cache::{CacheKey, CacheValue, OptMainCache};
25use crate::get_cached_value;
26use crate::tiles::pmtiles::PmtilesError::{self, InvalidMetadata, InvalidUrlMetadata};
27use crate::tiles::{BoxedSource, MartinCoreResult, Source, UrlQuery};
28
29/// [`pmtiles::Directory`] cache for `PMTiles` files.
30#[derive(Clone, Debug)]
31pub struct PmtCache {
32    /// Unique identifier for this cache instance
33    ///
34    /// Uniqueness invariant is guaranteed by how the struct is constructed
35    id: usize,
36    /// Cache storing (id, offset) -> [`pmtiles::Directory`]
37    ///
38    /// Set to [`None`] to disable caching
39    cache: OptMainCache,
40}
41
42impl From<OptMainCache> for PmtCache {
43    fn from(cache: OptMainCache) -> Self {
44        static NEXT_CACHE_ID: LazyLock<AtomicUsize> = LazyLock::new(|| AtomicUsize::new(0));
45
46        Self {
47            id: NEXT_CACHE_ID.fetch_add(1, SeqCst),
48            cache,
49        }
50    }
51}
52
53impl DirectoryCache for PmtCache {
54    async fn get_dir_entry(&self, offset: usize, tile_id: TileId) -> DirCacheResult {
55        if let Some(dir) = get_cached_value!(&self.cache, CacheValue::PmtDirectory, {
56            CacheKey::PmtDirectory(self.id, offset)
57        }) {
58            dir.find_tile_id(tile_id).into()
59        } else {
60            DirCacheResult::NotCached
61        }
62    }
63
64    async fn insert_dir(&self, offset: usize, directory: Directory) {
65        if let Some(cache) = &self.cache {
66            cache
67                .insert(
68                    CacheKey::PmtDirectory(self.id, offset),
69                    CacheValue::PmtDirectory(directory),
70                )
71                .await;
72        }
73    }
74}
75
/// Generates a `PMTiles`-backed [`Source`] type for one storage backend.
///
/// Arguments, in order:
/// * `$name` - name of the generated struct
/// * `$backend` - backend type plugged into [`AsyncPmTilesReader`]
/// * `$path` - type used to remember where the archive was opened from
/// * `$display_path` - function applied to `&$path` to get a printable value
/// * `$err` - [`PmtilesError`] variant built from `(String, $path)` when the
///   archive header cannot be used
/// * `$concurrent` - value returned by `benefits_from_concurrent_scraping`
///
/// The generated type gets a private `new_int` constructor; each backend adds
/// its own public `new` on top of it.
macro_rules! impl_pmtiles_source {
    ($name: ident, $backend: ty, $path: ty, $display_path: path, $err: ident, $concurrent: expr $(,)?) => {
        /// A source for `PMTiles` files
        #[derive(Clone)]
        pub struct $name {
            /// Source identifier returned by `get_id`
            id: String,
            /// Where the archive was opened from; used in log and error messages
            path: $path,
            /// Reader over the archive, shared between clones of this source
            pmtiles: Arc<AsyncPmTilesReader<$backend, PmtCache>>,
            /// `TileJSON` computed once when the source is created
            tilejson: TileJSON,
            /// Tile format and encoding derived from the archive header
            tile_info: TileInfo,
        }

        impl Debug for $name {
            /// Prints only the type name, the id and the path.
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                write!(
                    f,
                    "{} {{ id: {}, path: {:?} }}",
                    stringify!($name),
                    self.id,
                    self.path
                )
            }
        }

        impl $name {
            /// Builds the source from an already opened reader.
            ///
            /// # Errors
            ///
            /// Fails when the header declares an unknown tile type, or a
            /// non-MVT tile type combined with any compression.
            async fn new_int(
                id: String,
                path: $path,
                reader: AsyncPmTilesReader<$backend, PmtCache>,
            ) -> Result<Self, PmtilesError> {
                let hdr = &reader.get_header();

                // Compression is only mapped to an encoding for MVT below, so
                // every other tile type has to be stored uncompressed.
                if hdr.tile_type != TileType::Mvt && hdr.tile_compression != Compression::None {
                    return Err($err(
                        format!(
                            "Format {:?} and compression {:?} are not yet supported",
                            hdr.tile_type, hdr.tile_compression
                        ),
                        path,
                    ));
                }

                let tile_info = match hdr.tile_type {
                    TileType::Mvt => {
                        let encoding = match hdr.tile_compression {
                            Compression::None => Encoding::Uncompressed,
                            Compression::Unknown => {
                                // Best effort: serve the tiles as if they were uncompressed.
                                warn!(
                                    "MVT tiles have unknown compression in file {}",
                                    $display_path(&path)
                                );
                                Encoding::Uncompressed
                            }
                            Compression::Gzip => Encoding::Gzip,
                            Compression::Brotli => Encoding::Brotli,
                            Compression::Zstd => Encoding::Zstd,
                        };
                        TileInfo::new(Format::Mvt, encoding)
                    }
                    // All these assume uncompressed data (validated above)
                    TileType::Png => Format::Png.into(),
                    TileType::Jpeg => Format::Jpeg.into(),
                    TileType::Webp => Format::Webp.into(),
                    TileType::Unknown => {
                        return Err($err("Unknown tile type".to_string(), path));
                    }
                };

                // Unparsable metadata is not fatal: fall back to a `TileJSON`
                // derived from the header alone.
                let tilejson = match reader.parse_tilejson(Vec::new()).await {
                    Ok(tilejson) => tilejson,
                    Err(e) => {
                        warn!(
                            "{e:?}: Unable to parse metadata for {}",
                            $display_path(&path)
                        );
                        hdr.get_tilejson(Vec::new())
                    }
                };

                Ok(Self {
                    id,
                    path,
                    pmtiles: Arc::new(reader),
                    tilejson,
                    tile_info,
                })
            }
        }

        #[async_trait]
        impl Source for $name {
            fn get_id(&self) -> &str {
                &self.id
            }

            fn get_tilejson(&self) -> &TileJSON {
                &self.tilejson
            }

            fn get_tile_info(&self) -> TileInfo {
                self.tile_info
            }

            fn clone_source(&self) -> BoxedSource {
                Box::new(self.clone())
            }

            /// Returns the `version` field of the stored `TileJSON`, if any.
            fn get_version(&self) -> Option<String> {
                self.tilejson.version.clone()
            }

            fn benefits_from_concurrent_scraping(&self) -> bool {
                $concurrent
            }

            /// Fetches one tile from the archive.
            ///
            /// A tile that is absent from the archive is not an error: an
            /// empty buffer is returned and the miss is logged at trace level.
            ///
            /// # Errors
            ///
            /// Fails when the coordinates are rejected by `pmtiles` or when
            /// the reader cannot fetch the tile.
            async fn get_tile(
                &self,
                xyz: TileCoord,
                _url_query: Option<&UrlQuery>,
            ) -> MartinCoreResult<TileData> {
                // TODO: optimize to return Bytes
                let coord = pmtiles::TileCoord::new(xyz.z, xyz.x, xyz.y)
                    .map_err(PmtilesError::PmtError)?;

                let tile = self.pmtiles.get_tile(coord).await.map_err(|e| {
                    PmtilesError::PmtErrorWithCtx(e, $display_path(&self.path).to_string())
                })?;

                if let Some(t) = tile {
                    Ok(t.to_vec())
                } else {
                    trace!(
                        "Couldn't find tile data in {}/{}/{} of {}",
                        xyz.z, xyz.x, xyz.y, &self.id
                    );
                    Ok(Vec::new())
                }
            }
        }
    };
}
217
218impl_pmtiles_source!(
219    PmtHttpSource,
220    HttpBackend,
221    Url,
222    identity,
223    InvalidUrlMetadata,
224    // having multiple http requests in flight is beneficial
225    true,
226);
227
228impl PmtHttpSource {
229    /// Creates a new HTTP-based `PMTiles` source.
230    pub async fn new(cache: PmtCache, id: String, url: Url) -> Result<Self, PmtilesError> {
231        static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
232
233        let reader =
234            AsyncPmTilesReader::new_with_cached_url(cache, CLIENT.clone(), url.clone()).await;
235        let reader = reader.map_err(|e| PmtilesError::PmtErrorWithCtx(e, url.to_string()))?;
236
237        Self::new_int(id, url, reader).await
238    }
239}
240
241impl_pmtiles_source!(
242    PmtS3Source,
243    AwsS3Backend,
244    Url,
245    identity,
246    InvalidUrlMetadata,
247    // having multiple http requests in flight is beneficial
248    true,
249);
250
251impl PmtS3Source {
252    /// Creates a new S3-based `PMTiles` source.
253    pub async fn new(
254        cache: PmtCache,
255        id: String,
256        url: Url,
257        skip_credentials: bool,
258        force_path_style: bool,
259    ) -> Result<Self, PmtilesError> {
260        let mut aws_config_builder = aws_config::from_env();
261        if skip_credentials {
262            aws_config_builder = aws_config_builder.no_credentials();
263        }
264        let aws_config = aws_config_builder.load().await;
265
266        let s3_config = S3ConfigBuilder::from(&aws_config)
267            .force_path_style(force_path_style)
268            .build();
269        let client = S3Client::from_conf(s3_config);
270
271        let bucket = url
272            .host_str()
273            .ok_or_else(|| PmtilesError::S3BucketNameNotString(url.clone()))?
274            .to_string();
275
276        // Strip leading '/' from the key
277        let key = url.path()[1..].to_string();
278
279        let reader =
280            AsyncPmTilesReader::new_with_cached_client_bucket_and_path(cache, client, bucket, key)
281                .await
282                .map_err(|e| PmtilesError::PmtErrorWithCtx(e, url.to_string()))?;
283
284        Self::new_int(id, url, reader).await
285    }
286}
287
288impl_pmtiles_source!(
289    PmtFileSource,
290    MmapBackend,
291    PathBuf,
292    Path::display,
293    InvalidMetadata,
294    // when using local disks, it might not be beneficial to do concurrent calls in martin-cp
295    false,
296);
297
298impl PmtFileSource {
299    /// Creates a new file-based `PMTiles` source.
300    pub async fn new(cache: PmtCache, id: String, path: PathBuf) -> Result<Self, PmtilesError> {
301        let backend = MmapBackend::try_from(path.as_path())
302            .await
303            .map_err(|e| io::Error::other(format!("{e:?}: Cannot open file {}", path.display())))
304            .map_err(|e| PmtilesError::IoError(e, path.clone()))?;
305
306        let reader = AsyncPmTilesReader::try_from_cached_source(backend, cache).await;
307        let reader = reader
308            .map_err(|e| io::Error::other(format!("{e:?}: Cannot open file {}", path.display())))
309            .map_err(|e| PmtilesError::IoError(e, path.clone()))?;
310
311        Self::new_int(id, path, reader).await
312    }
313}