use super::progress_tracker::ProgressTracker;
use super::{Traversal, TraversalTranslationStep, translate_traversals};
use crate::{Tile, TileSource, TilesRuntime, TraversalCache};
use anyhow::Result;
use futures::{StreamExt, future::BoxFuture, stream};
use std::sync::Arc;
use versatiles_core::{TileBBox, TileCoord, TileStream};
/// Extension trait that adds a high-level "traverse everything" driver on top
/// of [`TileSource`]. Implemented for every `TileSource` via a blanket impl.
pub trait TileSourceTraverseExt: TileSource {
    /// Streams all tiles of this source to `callback`, re-grouped so that the
    /// bounding boxes arrive in the order demanded by `traversal_write`.
    ///
    /// The source's native traversal (`metadata().traversal`) is translated
    /// into a plan of steps:
    /// - `Stream`: source order already matches the output order, so tiles
    ///   are forwarded directly without buffering;
    /// - `Push`: tiles are read ahead of time and buffered in a
    ///   `TraversalCache` under an index;
    /// - `Pop`: a previously pushed buffer is drained and delivered to
    ///   `callback` as one output bbox.
    ///
    /// # Arguments
    /// * `traversal_write` - desired output traversal order/granularity.
    /// * `callback` - invoked once per output bbox with a stream of tiles;
    ///   awaited to completion before the next bbox is delivered.
    /// * `runtime` - supplies the progress reporter and cache backend.
    /// * `progress_message` - progress label; defaults to "processing tiles".
    ///
    /// # Errors
    /// Fails if the traversal translation, a buffered read, a cache
    /// operation, or the callback itself fails.
    fn traverse_all_tiles<'s, 'a, C>(
        &'s self,
        traversal_write: &'s Traversal,
        mut callback: C,
        runtime: TilesRuntime,
        progress_message: Option<&str>,
    ) -> impl core::future::Future<Output = Result<()>> + Send + 'a
    where
        C: FnMut(TileBBox, TileStream<'a, Tile>) -> BoxFuture<'a, Result<()>> + Send + 'a,
        // The borrow of `self` must outlive all streams handed to the callback.
        's: 'a,
    {
        // Resolve the message before entering the async block so the returned
        // future does not borrow `progress_message`.
        let progress_message = progress_message.unwrap_or("processing tiles").to_string();
        async move {
            // Build the Push/Pop/Stream plan converting the source's native
            // traversal into the requested write traversal.
            let traversal_steps = translate_traversals(
                &self.metadata().bbox_pyramid,
                &self.metadata().traversal,
                traversal_write,
            )?;
            use TraversalTranslationStep::{Pop, Push, Stream};
            // First pass: count total read/write operations so the progress
            // bar gets a meaningful length.
            let mut tn_read = 0;
            let mut tn_write = 0;
            for step in &traversal_steps {
                match step {
                    Push(bboxes_in, _) => {
                        tn_read += bboxes_in.iter().map(TileBBox::count_tiles).sum::<u64>();
                    }
                    Pop(_, bbox_out) => {
                        tn_write += bbox_out.count_tiles();
                    }
                    Stream(bboxes_in, bbox_out) => {
                        tn_read += bboxes_in.iter().map(TileBBox::count_tiles).sum::<u64>();
                        tn_write += bbox_out.count_tiles();
                    }
                }
            }
            // Progress length is the average of reads and writes — NOTE(review):
            // confirm this matches ProgressTracker's accounting of inc/inc_read/
            // inc_write below.
            let progress = runtime.create_progress(&progress_message, u64::midpoint(tn_read, tn_write));
            let tracker = Arc::new(ProgressTracker::new(progress));
            let cache = Arc::new(TraversalCache::<(TileCoord, Tile)>::new(runtime.cache_type()));
            // Second pass: execute the plan step by step.
            for step in traversal_steps {
                match step {
                    Push(bboxes, index) => {
                        log::trace!("Cache {bboxes:?} at index {index}");
                        let limits = versatiles_core::ConcurrencyLimits::default();
                        let read_operations = bboxes.iter().map(TileBBox::count_tiles).sum::<u64>();
                        // Read all bboxes concurrently (bounded by the I/O limit)
                        // and buffer their tiles in the cache under `index`.
                        stream::iter(bboxes)
                            .map(|bbox| {
                                let tracker = tracker.clone();
                                let c = cache.clone();
                                async move {
                                    let vec = self
                                        .get_tile_stream(bbox)
                                        .await?
                                        .inspect(move |_, _| tracker.inc(1))
                                        .to_vec()
                                        .await;
                                    c.append(index, vec)?;
                                    Ok::<_, anyhow::Error>(())
                                }
                            })
                            .buffer_unordered(limits.io_bound)
                            .collect::<Vec<_>>()
                            .await
                            .into_iter()
                            // Surface the first error from any concurrent read.
                            .collect::<Result<Vec<_>>>()?;
                        tracker.inc_read(read_operations);
                    }
                    Pop(index, bbox) => {
                        log::trace!("Uncache {bbox:?} at index {index}");
                        // NOTE(review): `unwrap` assumes the plan always pushes
                        // `index` before popping it — presumably guaranteed by
                        // translate_traversals; confirm.
                        let vec = cache.take(index)?.unwrap();
                        let tracker2 = tracker.clone();
                        let stream = TileStream::from_vec(vec).inspect(move |_, _| tracker2.inc(1));
                        callback(bbox, stream).await?;
                        tracker.inc_write(bbox.count_tiles());
                    }
                    Stream(bboxes, bbox) => {
                        log::trace!("Stream {bbox:?}");
                        let tracker2 = tracker.clone();
                        let read_operations = bboxes.iter().map(TileBBox::count_tiles).sum::<u64>();
                        let streams = stream::iter(bboxes).map(move |bbox| {
                            let tracker = tracker2.clone();
                            async move {
                                // NOTE(review): this `unwrap` turns a read error
                                // into a panic, unlike the Push arm which
                                // propagates it — consider unifying.
                                self.get_tile_stream(bbox).await.unwrap().inspect(move |_, _| {
                                    // Each streamed tile advances progress by 2
                                    // (read + write?) vs 1 in Push/Pop —
                                    // NOTE(review): verify against the tracker's
                                    // midpoint-based total above.
                                    tracker.inc(2);
                                })
                            }
                        });
                        callback(bbox, TileStream::from_streams(streams)).await?;
                        tracker.inc_read_write(read_operations, bbox.count_tiles());
                    }
                }
            }
            tracker.finish();
            Ok(())
        }
    }
}
// Blanket impl: every `TileSource` (sized or not) gains `traverse_all_tiles`.
impl<T: TileSource + ?Sized> TileSourceTraverseExt for T {}
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
mod tests {
use super::*;
use crate::{SourceType, TileSourceMetadata, TilesRuntime, TraversalOrder};
use anyhow::Result;
use async_trait::async_trait;
use rstest::rstest;
use std::sync::atomic::{AtomicU64, Ordering};
use versatiles_core::{Blob, TileBBoxPyramid, TileCompression, TileFormat, TileJSON};
/// Minimal in-memory `TileSource` used by the tests below; synthesizes tile
/// blobs of the form `"tile:{level},{x},{y}"`.
#[derive(Debug)]
struct TestReader {
    // Pyramid, format, compression and native traversal reported to callers.
    metadata: TileSourceMetadata,
    // Default (empty) TileJSON returned by `tilejson()`.
    tilejson: TileJSON,
    // Artificial per-tile delay; 0 selects the synchronous stream path.
    tile_delay_micros: u64,
}
impl TestReader {
    /// Test source covering the full pyramid from level 0 up to `max_level`.
    fn new(traversal: Traversal, max_level: u8) -> Self {
        let metadata = TileSourceMetadata {
            bbox_pyramid: TileBBoxPyramid::new_full_up_to(max_level),
            tile_compression: TileCompression::Uncompressed,
            tile_format: TileFormat::PNG,
            traversal,
        };
        Self {
            metadata,
            tilejson: TileJSON::default(),
            tile_delay_micros: 0,
        }
    }
    /// Test source restricted to the web-mercator world extent between
    /// `min_level` and `max_level`.
    fn with_level_range(traversal: Traversal, min_level: u8, max_level: u8) -> Self {
        use versatiles_core::GeoBBox;
        let world = GeoBBox::new(-180.0, -85.05, 180.0, 85.05).unwrap();
        let metadata = TileSourceMetadata {
            bbox_pyramid: TileBBoxPyramid::from_geo_bbox(min_level, max_level, &world),
            tile_compression: TileCompression::Uncompressed,
            tile_format: TileFormat::PNG,
            traversal,
        };
        Self {
            metadata,
            tilejson: TileJSON::default(),
            tile_delay_micros: 0,
        }
    }
}
/// Asynchronously pauses the current task for `micros` microseconds.
async fn sleep_micros(micros: u64) {
    let pause = std::time::Duration::from_micros(micros);
    tokio::time::sleep(pause).await;
}
#[async_trait]
impl TileSource for TestReader {
    fn source_type(&self) -> Arc<SourceType> {
        SourceType::new_container("test", "test://")
    }
    fn metadata(&self) -> &TileSourceMetadata {
        &self.metadata
    }
    fn tilejson(&self) -> &TileJSON {
        &self.tilejson
    }
    /// Synthesizes one tile per coordinate in `bbox`; the payload encodes the
    /// coordinate so tests can verify content survives caching.
    async fn get_tile_stream(&self, bbox: TileBBox) -> Result<TileStream<'static, Tile>> {
        let compression = self.metadata.tile_compression;
        let format = self.metadata.tile_format;
        let delay = self.tile_delay_micros;
        // Zero delay takes the cheap synchronous path; otherwise tiles are
        // produced in parallel with an artificial async pause each.
        if delay == 0 {
            Ok(TileStream::from_iter_coord(bbox.into_iter_coords(), move |coord| {
                let data = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
                Some(Tile::from_blob(Blob::from(data), compression, format))
            }))
        } else {
            Ok(TileStream::from_bbox_async_parallel(bbox, move |coord| async move {
                sleep_micros(delay).await;
                let data = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
                Some((coord, Tile::from_blob(Blob::from(data), compression, format)))
            }))
        }
    }
}
#[tokio::test]
async fn test_traverse_streaming_mode() -> Result<()> {
    // A full pyramid up to level 2 holds 1 + 4 + 16 = 21 tiles.
    let reader = TestReader::new(Traversal::ANY, 2);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let total = Arc::new(AtomicU64::new(0));
    let total_ref = total.clone();
    reader
        .traverse_all_tiles(
            &Traversal::ANY,
            |_bbox, stream| {
                let acc = total_ref.clone();
                Box::pin(async move {
                    let batch = stream.to_vec().await;
                    acc.fetch_add(batch.len() as u64, Ordering::SeqCst);
                    Ok(())
                })
            },
            runtime,
            Some("test streaming"),
        )
        .await?;
    assert_eq!(total.load(Ordering::SeqCst), 21);
    Ok(())
}
#[tokio::test]
async fn test_traverse_with_reordering() -> Result<()> {
let source_traversal = Traversal::new(super::super::TraversalOrder::DepthFirst, 1, 256)?;
let write_traversal = Traversal::new(super::super::TraversalOrder::AnyOrder, 1, 256)?;
let reader = TestReader::new(source_traversal, 2);
let runtime = TilesRuntime::builder().with_memory_cache().build();
let tile_count = Arc::new(AtomicU64::new(0));
let count_clone = tile_count.clone();
reader
.traverse_all_tiles(
&write_traversal,
|_bbox, stream| {
let count = count_clone.clone();
Box::pin(async move {
let tiles = stream.to_vec().await;
count.fetch_add(tiles.len() as u64, Ordering::SeqCst);
Ok(())
})
},
runtime,
None, )
.await?;
assert_eq!(tile_count.load(Ordering::SeqCst), 21);
Ok(())
}
#[tokio::test]
async fn test_traverse_with_disk_cache() -> Result<()> {
let source_traversal = Traversal::new(super::super::TraversalOrder::DepthFirst, 1, 256)?;
let write_traversal = Traversal::new(super::super::TraversalOrder::AnyOrder, 1, 256)?;
let reader = TestReader::new(source_traversal, 2);
let temp_dir = tempfile::TempDir::new()?;
let runtime = TilesRuntime::builder().with_disk_cache(temp_dir.path()).build();
let tile_count = Arc::new(AtomicU64::new(0));
let count_clone = tile_count.clone();
reader
.traverse_all_tiles(
&write_traversal,
|_bbox, stream| {
let count = count_clone.clone();
Box::pin(async move {
let tiles = stream.to_vec().await;
count.fetch_add(tiles.len() as u64, Ordering::SeqCst);
Ok(())
})
},
runtime,
Some("disk cache test"),
)
.await?;
assert_eq!(tile_count.load(Ordering::SeqCst), 21);
Ok(())
}
#[tokio::test]
async fn test_traverse_verifies_tile_content() -> Result<()> {
    let reader = TestReader::new(Traversal::ANY, 1);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let collected = Arc::new(std::sync::Mutex::new(Vec::new()));
    let collected_ref = collected.clone();
    reader
        .traverse_all_tiles(
            &Traversal::ANY,
            |_bbox, stream| {
                let sink = collected_ref.clone();
                Box::pin(async move {
                    // Every payload must decode to "tile:{level},{x},{y}".
                    for (coord, tile) in stream.to_vec().await {
                        let blob = tile.into_blob(TileCompression::Uncompressed)?;
                        let text = String::from_utf8_lossy(blob.as_slice());
                        let expected = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
                        assert_eq!(text, expected);
                        sink.lock().unwrap().push(coord);
                    }
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    // Levels 0..=1: 1 + 4 = 5 tiles.
    assert_eq!(collected.lock().unwrap().len(), 5);
    Ok(())
}
#[tokio::test]
async fn test_traverse_empty_pyramid() -> Result<()> {
    // A pyramid capped at level 0 still contains the single root tile.
    let reader = TestReader::new(Traversal::ANY, 0);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let total = Arc::new(AtomicU64::new(0));
    let total_ref = total.clone();
    reader
        .traverse_all_tiles(
            &Traversal::ANY,
            |_bbox, stream| {
                let acc = total_ref.clone();
                Box::pin(async move {
                    let batch = stream.to_vec().await;
                    acc.fetch_add(batch.len() as u64, Ordering::SeqCst);
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    assert_eq!(total.load(Ordering::SeqCst), 1);
    Ok(())
}
#[tokio::test]
async fn test_traverse_callback_receives_correct_bbox() -> Result<()> {
    let reader = TestReader::new(Traversal::ANY, 1);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let seen_bboxes = Arc::new(std::sync::Mutex::new(Vec::new()));
    let seen_ref = seen_bboxes.clone();
    reader
        .traverse_all_tiles(
            &Traversal::ANY,
            |bbox, stream| {
                let sink = seen_ref.clone();
                Box::pin(async move {
                    sink.lock().unwrap().push(bbox);
                    // Drain the stream so the traversal can make progress.
                    stream.drain_and_count().await;
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    assert!(!seen_bboxes.lock().unwrap().is_empty());
    Ok(())
}
#[tokio::test]
async fn test_traverse_push_pop_caching_path() -> Result<()> {
    // Small depth-first source batches + large any-order write batches force
    // the Push/Pop buffering path through the traversal cache.
    let source_traversal = Traversal::new(TraversalOrder::DepthFirst, 1, 8)?;
    let write_traversal = Traversal::new(TraversalOrder::AnyOrder, 16, 16)?;
    let reader = TestReader::with_level_range(source_traversal, 4, 6);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let total = Arc::new(AtomicU64::new(0));
    let total_ref = total.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |_bbox, stream| {
                let acc = total_ref.clone();
                Box::pin(async move {
                    let batch = stream.to_vec().await;
                    acc.fetch_add(batch.len() as u64, Ordering::SeqCst);
                    Ok(())
                })
            },
            runtime,
            Some("push/pop cache test"),
        )
        .await?;
    // Levels 4..=6 of the world bbox: 256 + 1024 + 4096 = 5376 tiles.
    assert_eq!(total.load(Ordering::SeqCst), 5376);
    Ok(())
}
#[tokio::test]
async fn test_traverse_push_pop_with_disk_cache() -> Result<()> {
    // Same Push/Pop setup as the memory-cache test, but buffering on disk.
    let source_traversal = Traversal::new(TraversalOrder::DepthFirst, 1, 8)?;
    let write_traversal = Traversal::new(TraversalOrder::AnyOrder, 16, 16)?;
    let reader = TestReader::with_level_range(source_traversal, 4, 5);
    // Keep the TempDir guard alive for the whole traversal.
    let temp_dir = tempfile::TempDir::new()?;
    let runtime = TilesRuntime::builder().with_disk_cache(temp_dir.path()).build();
    let total = Arc::new(AtomicU64::new(0));
    let total_ref = total.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |_bbox, stream| {
                let acc = total_ref.clone();
                Box::pin(async move {
                    let batch = stream.to_vec().await;
                    acc.fetch_add(batch.len() as u64, Ordering::SeqCst);
                    Ok(())
                })
            },
            runtime,
            Some("push/pop disk cache test"),
        )
        .await?;
    // Levels 4..=5 of the world bbox: 256 + 1024 = 1280 tiles.
    assert_eq!(total.load(Ordering::SeqCst), 1280);
    Ok(())
}
#[tokio::test]
async fn test_traverse_push_pop_verifies_tile_content() -> Result<()> {
    let source_traversal = Traversal::new(TraversalOrder::DepthFirst, 1, 8)?;
    let write_traversal = Traversal::new(TraversalOrder::AnyOrder, 16, 16)?;
    let reader = TestReader::with_level_range(source_traversal, 4, 5);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let collected = Arc::new(std::sync::Mutex::new(Vec::new()));
    let collected_ref = collected.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |_bbox, stream| {
                let sink = collected_ref.clone();
                Box::pin(async move {
                    for (coord, tile) in stream.to_vec().await {
                        let blob = tile.into_blob(TileCompression::Uncompressed)?;
                        let text = String::from_utf8_lossy(blob.as_slice());
                        let expected = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
                        // Content must survive the Push/Pop round-trip intact.
                        assert_eq!(text, expected, "Tile content mismatch after cache");
                        sink.lock().unwrap().push(coord);
                    }
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    // Levels 4..=5 of the world bbox: 256 + 1024 = 1280 tiles.
    assert_eq!(collected.lock().unwrap().len(), 1280);
    Ok(())
}
#[tokio::test]
async fn test_traverse_pmtiles_order_to_any() -> Result<()> {
    // PMTiles-ordered source re-delivered as any-order write batches.
    let source_traversal = Traversal::new(TraversalOrder::PMTiles, 1, 8)?;
    let write_traversal = Traversal::new(TraversalOrder::AnyOrder, 16, 16)?;
    let reader = TestReader::with_level_range(source_traversal, 4, 5);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let total = Arc::new(AtomicU64::new(0));
    let total_ref = total.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |_bbox, stream| {
                let acc = total_ref.clone();
                Box::pin(async move {
                    let batch = stream.to_vec().await;
                    acc.fetch_add(batch.len() as u64, Ordering::SeqCst);
                    Ok(())
                })
            },
            runtime,
            Some("pmtiles to any"),
        )
        .await?;
    // Levels 4..=5 of the world bbox: 256 + 1024 = 1280 tiles.
    assert_eq!(total.load(Ordering::SeqCst), 1280);
    Ok(())
}
#[rstest]
#[case::any_to_depthfirst(TraversalOrder::AnyOrder, 1, 256, TraversalOrder::DepthFirst, 1, 256, 2)]
#[case::any_to_pmtiles(TraversalOrder::AnyOrder, 1, 256, TraversalOrder::PMTiles, 1, 256, 2)]
#[case::depthfirst_to_depthfirst(TraversalOrder::DepthFirst, 1, 256, TraversalOrder::DepthFirst, 1, 256, 3)]
#[case::pmtiles_to_pmtiles(TraversalOrder::PMTiles, 1, 256, TraversalOrder::PMTiles, 1, 256, 3)]
#[case::depthfirst_to_any(TraversalOrder::DepthFirst, 1, 64, TraversalOrder::AnyOrder, 1, 64, 3)]
#[case::pmtiles_to_any(TraversalOrder::PMTiles, 1, 64, TraversalOrder::AnyOrder, 1, 64, 3)]
#[tokio::test]
async fn test_traverse_order_verification(
    #[case] source_order: TraversalOrder,
    #[case] source_min_size: u32,
    #[case] source_max_size: u32,
    #[case] write_order: TraversalOrder,
    #[case] write_min_size: u32,
    #[case] write_max_size: u32,
    #[case] max_level: u8,
) -> Result<()> {
    // Record every bbox delivered to the callback, then check the sequence
    // obeys the requested write order.
    let source_traversal = Traversal::new(source_order, source_min_size, source_max_size)?;
    let write_traversal = Traversal::new(write_order, write_min_size, write_max_size)?;
    let reader = TestReader::new(source_traversal, max_level);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let seen_bboxes = Arc::new(std::sync::Mutex::new(Vec::new()));
    let seen_ref = seen_bboxes.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |bbox, stream| {
                let sink = seen_ref.clone();
                Box::pin(async move {
                    sink.lock().unwrap().push(bbox);
                    stream.drain_and_count().await;
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    let observed = seen_bboxes.lock().unwrap();
    assert!(!observed.is_empty(), "Should receive at least one bbox");
    let write_size = write_traversal.max_size()?;
    assert!(
        write_order.verify_order(&observed, write_size),
        "Bboxes should be in {write_order:?} order, but received: {:?}",
        observed.iter().take(10).collect::<Vec<_>>()
    );
    Ok(())
}
#[rstest]
#[case::depthfirst_cached_to_any(TraversalOrder::DepthFirst, 1, 8, TraversalOrder::AnyOrder, 16, 16, 4, 5)]
#[case::pmtiles_cached_to_any(TraversalOrder::PMTiles, 1, 8, TraversalOrder::AnyOrder, 16, 16, 4, 5)]
#[tokio::test]
async fn test_traverse_order_with_push_pop_caching(
    #[case] source_order: TraversalOrder,
    #[case] source_min_size: u32,
    #[case] source_max_size: u32,
    #[case] write_order: TraversalOrder,
    #[case] write_min_size: u32,
    #[case] write_max_size: u32,
    #[case] min_level: u8,
    #[case] max_level: u8,
) -> Result<()> {
    // Same order check as above, but with batch sizes that force the
    // Push/Pop caching path.
    let source_traversal = Traversal::new(source_order, source_min_size, source_max_size)?;
    let write_traversal = Traversal::new(write_order, write_min_size, write_max_size)?;
    let reader = TestReader::with_level_range(source_traversal, min_level, max_level);
    let runtime = TilesRuntime::builder().with_memory_cache().build();
    let seen_bboxes = Arc::new(std::sync::Mutex::new(Vec::new()));
    let seen_ref = seen_bboxes.clone();
    reader
        .traverse_all_tiles(
            &write_traversal,
            |bbox, stream| {
                let sink = seen_ref.clone();
                Box::pin(async move {
                    sink.lock().unwrap().push(bbox);
                    stream.drain_and_count().await;
                    Ok(())
                })
            },
            runtime,
            None,
        )
        .await?;
    let observed = seen_bboxes.lock().unwrap();
    assert!(!observed.is_empty(), "Should receive at least one bbox");
    // AnyOrder imposes no sequence constraint, so only verify other orders.
    if write_order != TraversalOrder::AnyOrder {
        let write_size = write_traversal.max_size()?;
        assert!(
            write_order.verify_order(&observed, write_size),
            "Bboxes should be in {write_order:?} order with Push/Pop caching"
        );
    }
    Ok(())
}
}