use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::Duration;
use futures::stream::TryStreamExt as _;
use martin_core::tiles::BoxedSource;
use object_store::ObjectStore as _;
use url::Url;
use crate::MartinResult;
use crate::config::file::file_config::is_remote_url;
use crate::config::file::pmtiles::PmtConfig;
use crate::config::file::process::ProcessConfig;
use crate::config::file::tiles::discovery::{Discovery, Version};
use crate::config::file::{
CachePolicy, ConfigFileError, FileConfigEnum, TileSourceConfiguration as _,
};
use crate::config::primitives::{IdResolver, OptOneMany};
/// Suffix, including the leading dot, that an object key must end with to be
/// listed as a source; it is also stripped from the file name to obtain the
/// name stem handed to the id resolver.
const PMTILES_EXT_DOT: &str = ".pmtiles";
/// Discovers tile sources by listing objects below remote URL prefixes.
///
/// Built with `from_config`; the `Discovery` implementation lists every stored
/// prefix and reports the `.pmtiles` objects found there.
pub struct ObjectStoreDiscovery {
/// Remote prefixes taken from the configuration, sorted by their string form
/// and de-duplicated.
remote_prefixes: Vec<Url>,
/// Turns a file-name stem plus the object's URL into the source id.
id_resolver: IdResolver,
/// `PMTiles` settings; supplies the object-store options, the reload interval
/// and the source constructor.
config: PmtConfig,
/// Processing configuration returned as-is (cloned) from `Discovery::process`.
process: ProcessConfig,
}
impl ObjectStoreDiscovery {
    /// Creates a discovery from the file-style configuration.
    ///
    /// Every configured path is inspected in configuration order. Paths that
    /// `is_remote_url` rejects are ignored silently; remote paths that are not
    /// valid UTF-8 or do not parse as a [`Url`] are skipped with a warning.
    /// The surviving prefixes are sorted by their string form and
    /// de-duplicated.
    ///
    /// The `PMTiles` settings are cloned from the `Config` variant; every
    /// other variant falls back to `PmtConfig::default()`.
    #[must_use]
    pub fn from_config(
        config: &FileConfigEnum<PmtConfig>,
        id_resolver: IdResolver,
        process: ProcessConfig,
    ) -> Self {
        // Flatten all configuration shapes into one ordered list of paths.
        let configured: Vec<&PathBuf> = match config {
            FileConfigEnum::Config(cfg) => match &cfg.paths {
                OptOneMany::One(single) => vec![single],
                OptOneMany::Many(several) => several.iter().collect(),
                OptOneMany::NoVals => Vec::new(),
            },
            FileConfigEnum::Path(single) => vec![single],
            FileConfigEnum::Paths(several) => several.iter().collect(),
            FileConfigEnum::None => Vec::new(),
        };

        let mut remote_prefixes: Vec<Url> = Vec::new();
        for path in configured {
            if !is_remote_url(path) {
                continue;
            }
            match path.to_str().map(Url::parse) {
                Some(Ok(url)) => remote_prefixes.push(url),
                _ => {
                    tracing::warn!(
                        "remote URL prefix {:?} could not be parsed as URL; skipping",
                        path
                    );
                }
            }
        }

        // Sorting first places equal URLs next to each other so that `dedup`
        // removes every duplicate.
        remote_prefixes.sort_by(|lhs, rhs| lhs.as_str().cmp(rhs.as_str()));
        remote_prefixes.dedup();

        let pmt_config = if let FileConfigEnum::Config(cfg) = config {
            cfg.custom.clone()
        } else {
            PmtConfig::default()
        };

        Self {
            remote_prefixes,
            id_resolver,
            config: pmt_config,
            process,
        }
    }

    /// Returns the remote prefixes that will be listed, sorted and without duplicates.
    #[must_use]
    pub fn remote_prefixes(&self) -> &[Url] {
        self.remote_prefixes.as_slice()
    }

    /// Returns the reload interval stored in the `PMTiles` configuration.
    #[must_use]
    pub fn reload_interval(&self) -> Duration {
        self.config.reload_interval
    }
}
impl Discovery for ObjectStoreDiscovery {
    type Args = Url;

    /// Lists every remote prefix and merges the results into one map keyed by source id.
    ///
    /// A prefix whose listing fails is logged as a warning and contributes
    /// nothing to this round; the call itself still returns `Ok`. When two
    /// entries resolve to the same id, the one listed later replaces the
    /// earlier one.
    async fn discover(&self) -> MartinResult<BTreeMap<String, (Version, Self::Args)>> {
        let mut discovered: BTreeMap<String, (Version, Url)> = BTreeMap::new();
        for prefix in &self.remote_prefixes {
            let listing =
                list_remote_prefix(prefix, &self.config.options, &self.id_resolver).await;
            let entries = match listing {
                Ok(entries) => entries,
                Err(e) => {
                    tracing::warn!(
                        "PmtilesReloader: list failed for {prefix}: {e:?}; skipping prefix this tick"
                    );
                    continue;
                }
            };
            discovered.extend(
                entries
                    .into_iter()
                    .map(|(id, url, version)| (id, (version, url))),
            );
        }
        Ok(discovered)
    }

    /// Builds the tile source for `id` from the object URL in `args`, using the
    /// default cache policy.
    async fn build(&self, id: &str, args: &Self::Args) -> MartinResult<BoxedSource> {
        let source_id = id.to_string();
        let source_url = args.clone();
        self.config
            .new_sources_url(source_id, source_url, CachePolicy::default())
            .await
    }

    /// Returns a copy of the processing configuration given at construction.
    fn process(&self) -> ProcessConfig {
        self.process.clone()
    }
}
/// Derives a [`Version`] for a listed object from its metadata.
///
/// When the store reported an `ETag`, the version is the 128-bit xxh3 hash of
/// its bytes. Otherwise the last-modified time in milliseconds is used; if
/// that value does not fit into a `u128` (it is negative), the version is
/// `Version::Opaque`.
fn version_from_meta(meta: &object_store::ObjectMeta) -> Version {
    match meta.e_tag.as_deref() {
        Some(etag) => Version::Tracked(xxhash_rust::xxh3::xxh3_128(etag.as_bytes())),
        None => match u128::try_from(meta.last_modified.timestamp_millis()) {
            Ok(millis) => Version::Tracked(millis),
            Err(_) => Version::Opaque,
        },
    }
}
/// Lists the `.pmtiles` objects stored below `prefix`.
///
/// The object store is created from `prefix` and `options`, then everything
/// under the path derived from the prefix is listed. For each object whose key
/// ends with `.pmtiles` the function returns a triple of
/// `(source id, absolute object URL, version)`:
///
/// * the id comes from `id_resolver`, given the file name without the
///   `.pmtiles` suffix and the object URL;
/// * the URL is rebuilt from the prefix's scheme, host and explicit port plus
///   the object's location inside the store;
/// * the version comes from `version_from_meta`.
///
/// Objects whose rebuilt URL cannot be parsed are skipped with a warning.
///
/// # Errors
///
/// Returns `ConfigFileError::ObjectStoreUrlParsing` when the store cannot be
/// created from `prefix`, and `ConfigFileError::ObjectStoreList` when the
/// listing stream yields an error. Entries collected before a listing error
/// are discarded.
async fn list_remote_prefix(
    prefix: &Url,
    options: &std::collections::HashMap<String, String>,
    id_resolver: &IdResolver,
) -> MartinResult<Vec<(String, Url, Version)>> {
    let (store, base) = object_store::parse_url_opts(prefix, options)
        .map_err(|e| ConfigFileError::ObjectStoreUrlParsing(e, prefix.to_string()))?;

    // `host_str()` never contains the port, so an explicit port has to be
    // appended by hand. Without it, objects found under e.g.
    // `http://localhost:9000/...` would get URLs pointing at the default port.
    let host = prefix.host_str().unwrap_or("");
    let authority = match prefix.port() {
        Some(port) => format!("{host}:{port}"),
        None => host.to_string(),
    };

    let mut out = Vec::new();
    let mut stream = store.list(Some(&base));
    while let Some(meta) = stream
        .try_next()
        .await
        .map_err(|e| ConfigFileError::ObjectStoreList(e, prefix.to_string()))?
    {
        if !meta.location.as_ref().ends_with(PMTILES_EXT_DOT) {
            continue;
        }
        // The location ends with the suffix, so the fallback is only a safety net.
        let stem = meta
            .location
            .filename()
            .and_then(|f| f.strip_suffix(PMTILES_EXT_DOT))
            .unwrap_or("_unknown");
        let object_url_str = format!("{}://{}/{}", prefix.scheme(), authority, meta.location);
        let Ok(object_url) = Url::parse(&object_url_str) else {
            tracing::warn!("cannot build absolute URL from {object_url_str}");
            continue;
        };
        let version = version_from_meta(&meta);
        let id = id_resolver.resolve(stem, object_url.to_string());
        out.push((id, object_url, version));
    }
    Ok(out)
}