use std::sync::Arc;
use dashmap::DashMap;
use martin_core::tiles::{BoxedSource, OptTileCache};
use tracing::info;
use crate::MartinResult;
use crate::config::file::OnInvalid;
use crate::reload::ReloadAdvisory;
use crate::source::TileSources;
/// Registry of the currently served tile sources, plus the optional tile cache
/// and the policy applied when a source fails to load during a reload.
///
/// The source map lives behind an [`Arc`], so cloning the manager yields another
/// handle to the same map rather than an independent copy.
#[derive(Clone)]
pub struct TileSourceManager {
/// Served sources keyed by source id; shared by every clone of the manager.
tile_sources: Arc<DashMap<String, BoxedSource>>,
/// Optional tile cache; `apply_changes` invalidates entries of changed sources in it.
tile_cache: OptTileCache,
/// What `apply_changes` does when an advisory carries a source that failed to load.
on_invalid: OnInvalid,
}
impl TileSourceManager {
#[must_use]
pub fn new(tile_cache: OptTileCache, on_invalid: OnInvalid) -> Self {
Self {
tile_sources: Arc::new(DashMap::new()),
tile_cache,
on_invalid,
}
}
#[must_use]
pub fn from_sources(
tile_cache: OptTileCache,
on_invalid: OnInvalid,
sources: Vec<Vec<BoxedSource>>,
) -> Self {
let map: DashMap<String, BoxedSource> = sources
.into_iter()
.flatten()
.map(|src| (src.get_id().to_string(), src))
.collect();
Self {
tile_sources: Arc::new(map),
tile_cache,
on_invalid,
}
}
#[must_use]
pub fn tile_sources(&self) -> TileSources {
TileSources::from_dashmap(self.tile_sources.clone())
}
#[must_use]
pub fn tile_cache(&self) -> &OptTileCache {
&self.tile_cache
}
pub async fn apply_changes(&self, advisory: ReloadAdvisory) -> MartinResult<()> {
if advisory.is_empty() {
return Ok(());
}
for new_source in advisory.updates {
match new_source.source {
Ok(src) => {
if let Some(cache) = &self.tile_cache {
cache.invalidate_source(&new_source.id);
}
self.tile_sources.insert(new_source.id.clone(), src);
info!("Updated source: {:?}", new_source.id);
}
Err(err) => match self.on_invalid {
OnInvalid::Abort => return Err(err),
OnInvalid::Warn => {
tracing::warn!("Skipping update for {:?}: {err}", new_source.id);
}
},
}
}
for new_source in advisory.additions {
match new_source.source {
Ok(src) => {
self.tile_sources.insert(new_source.id.clone(), src);
info!("Added source: {:?}", new_source.id);
}
Err(err) => match self.on_invalid {
OnInvalid::Abort => return Err(err),
OnInvalid::Warn => {
tracing::warn!("Skipping addition of {:?}: {err}", new_source.id);
}
},
}
}
for deleted_source in &advisory.removals {
self.tile_sources.remove(&deleted_source.id);
if let Some(cache) = &self.tile_cache {
cache.invalidate_source(&deleted_source.id);
}
info!("Removed source: {:?}", deleted_source.id);
}
if let Some(cache) = &self.tile_cache {
cache.run_pending_tasks().await;
}
Ok(())
}
}
// Unit tests for `TileSourceManager`: construction and `apply_changes`
// (additions, removals, updates, empty advisories, and running without a cache).
#[cfg(test)]
mod tests {
use async_trait::async_trait;
use insta::assert_yaml_snapshot;
use martin_core::CacheZoomRange;
use martin_core::tiles::{MartinCoreResult, Source, TileCache, UrlQuery};
use martin_tile_utils::{Encoding, Format, TileCoord, TileData, TileInfo};
use tilejson::{TileJSON, tilejson};
use super::*;
use crate::reload::{DeletedSource, NewSource};
/// Minimal in-memory `Source` used as a test fixture.
#[derive(Debug, Clone)]
struct TestSource {
id: String,
tj: TileJSON,
}
#[async_trait]
impl Source for TestSource {
fn get_id(&self) -> &str {
&self.id
}
fn get_tilejson(&self) -> &TileJSON {
&self.tj
}
fn get_tile_info(&self) -> TileInfo {
TileInfo::new(Format::Mvt, Encoding::Uncompressed)
}
fn clone_source(&self) -> BoxedSource {
Box::new(self.clone())
}
fn cache_zoom(&self) -> CacheZoomRange {
CacheZoomRange::default()
}
// Returns the same fixed three-byte payload for every tile request.
async fn get_tile(
&self,
_xyz: TileCoord,
_url_query: Option<&UrlQuery>,
) -> MartinCoreResult<TileData> {
Ok(vec![1, 2, 3])
}
}
/// Builds an empty manager with a tile cache and the `Abort` policy.
// NOTE(review): the first `TileCache::new` argument is presumably a size limit — confirm.
fn make_manager() -> TileSourceManager {
let cache = TileCache::new(1024 * 1024, None, None); TileSourceManager::new(Some(cache), OnInvalid::Abort)
}
/// Builds a successfully loaded `NewSource` whose id and inner `TestSource` id are both `name`.
fn new_source(name: &str) -> NewSource {
NewSource {
id: name.to_string(),
source: Ok(Box::new(TestSource {
id: name.to_string(),
tj: tilejson! { tiles: vec![] },
})),
}
}
/// Returns the manager's source names, sorted so snapshots do not depend on map order.
fn sorted_source_names(mgr: &TileSourceManager) -> Vec<String> {
let mut names = mgr.tile_sources().source_names();
names.sort();
names
}
/// Additions in an advisory end up in the registry.
#[tokio::test]
async fn apply_additions() {
let mgr = make_manager();
let advisory = ReloadAdvisory {
additions: vec![new_source("src_a"), new_source("src_b")],
..Default::default()
};
mgr.apply_changes(advisory).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"
- src_a
- src_b
");
}
/// A removal drops only the named source and leaves the others registered.
#[tokio::test]
async fn apply_removals() {
let mgr = make_manager();
// Seed the registry with two sources first.
let add = ReloadAdvisory {
additions: vec![new_source("src_a"), new_source("src_b")],
..Default::default()
};
mgr.apply_changes(add).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"
- src_a
- src_b
");
// Then remove one of them.
let mut removals = std::collections::BTreeSet::new();
removals.insert(DeletedSource {
id: "src_a".to_string(),
});
let remove = ReloadAdvisory {
removals,
..Default::default()
};
mgr.apply_changes(remove).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"- src_b");
}
/// Updating an existing source keeps exactly one entry under its id.
#[tokio::test]
async fn apply_updates() {
let mgr = make_manager();
let add = ReloadAdvisory {
additions: vec![new_source("src_a")],
..Default::default()
};
mgr.apply_changes(add).await.unwrap();
let update = ReloadAdvisory {
updates: vec![new_source("src_a")],
..Default::default()
};
mgr.apply_changes(update).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"- src_a");
}
/// A default (empty) advisory succeeds and leaves the registry empty.
#[tokio::test]
async fn empty_advisory_is_noop() {
let mgr = make_manager();
mgr.apply_changes(ReloadAdvisory::default()).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"[]");
}
/// `from_sources` registers the given source under its id and keeps the `None` cache.
#[test]
fn from_sources_populates_map() {
let src = Box::new(TestSource {
id: "x".to_string(),
tj: tilejson! { tiles: vec![] },
}) as BoxedSource;
let mgr = TileSourceManager::from_sources(None, OnInvalid::Abort, vec![vec![src]]);
assert_yaml_snapshot!(sorted_source_names(&mgr), @"- x");
assert!(mgr.tile_cache().is_none());
}
/// `apply_changes` also works when the manager has no tile cache.
#[tokio::test]
async fn apply_changes_without_cache() {
let mgr = TileSourceManager::new(None, OnInvalid::Abort);
let advisory = ReloadAdvisory {
additions: vec![new_source("a")],
..Default::default()
};
mgr.apply_changes(advisory).await.unwrap();
assert_yaml_snapshot!(sorted_source_names(&mgr), @"- a");
}
}