use crate::ProgressHandle;
use std::sync::atomic::{AtomicU64, Ordering};
pub(crate) struct ProgressTracker {
progress: ProgressHandle,
ti_read: AtomicU64,
ti_write: AtomicU64,
ti_offset: AtomicU64,
}
impl ProgressTracker {
pub fn new(progress: ProgressHandle) -> Self {
Self {
progress,
ti_read: AtomicU64::new(0),
ti_write: AtomicU64::new(0),
ti_offset: AtomicU64::new(0),
}
}
pub fn inc(&self, count: u64) {
self.ti_offset.fetch_add(count, Ordering::Relaxed);
self.update_position();
}
pub fn inc_read(&self, value: u64) {
self.ti_read.fetch_add(value, Ordering::Relaxed);
self.ti_offset.store(0, Ordering::Relaxed);
self.update_position();
}
pub fn inc_write(&self, value: u64) {
self.ti_write.fetch_add(value, Ordering::Relaxed);
self.ti_offset.store(0, Ordering::Relaxed);
self.update_position();
}
pub fn inc_read_write(&self, read: u64, write: u64) {
self.ti_read.fetch_add(read, Ordering::Relaxed);
self.ti_write.fetch_add(write, Ordering::Relaxed);
self.ti_offset.store(0, Ordering::Relaxed);
self.update_position();
}
fn update_position(&self) {
let read = self.ti_read.load(Ordering::Relaxed);
let write = self.ti_write.load(Ordering::Relaxed);
let offset = self.ti_offset.load(Ordering::Relaxed);
self.progress.set_position((read + write + offset) / 2);
}
pub fn finish(&self) {
self.progress.finish();
}
}
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
mod tests {
use crate::{SourceType, Tile, TileSource, TileSourceMetadata, TilesRuntime};
use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use versatiles_core::{Blob, TileBBox, TileBBoxPyramid, TileCompression, TileFormat, TileJSON, TileStream};
use super::super::{TileSourceTraverseExt, Traversal};
#[derive(Debug)]
struct TestReader {
metadata: TileSourceMetadata,
tilejson: TileJSON,
tile_delay_micros: u64,
}
impl TestReader {
fn new(traversal: Traversal, max_level: u8) -> Self {
TestReader {
metadata: TileSourceMetadata {
bbox_pyramid: TileBBoxPyramid::new_full_up_to(max_level),
tile_compression: TileCompression::Uncompressed,
tile_format: TileFormat::PNG,
traversal,
},
tilejson: TileJSON::default(),
tile_delay_micros: 0,
}
}
}
async fn sleep_micros(micros: u64) {
tokio::time::sleep(std::time::Duration::from_micros(micros)).await;
}
#[async_trait]
impl TileSource for TestReader {
fn source_type(&self) -> Arc<SourceType> {
SourceType::new_container("test", "test://")
}
fn metadata(&self) -> &TileSourceMetadata {
&self.metadata
}
fn tilejson(&self) -> &TileJSON {
&self.tilejson
}
async fn get_tile_stream(&self, bbox: TileBBox) -> Result<TileStream<'static, Tile>> {
let compression = self.metadata.tile_compression;
let format = self.metadata.tile_format;
let delay_micros = self.tile_delay_micros;
if delay_micros > 0 {
Ok(TileStream::from_bbox_async_parallel(bbox, move |coord| async move {
sleep_micros(delay_micros).await;
let data = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
Some((coord, Tile::from_blob(Blob::from(data), compression, format)))
}))
} else {
Ok(TileStream::from_iter_coord(bbox.into_iter_coords(), move |coord| {
let data = format!("tile:{},{},{}", coord.level, coord.x, coord.y);
Some(Tile::from_blob(Blob::from(data), compression, format))
}))
}
}
}
#[derive(Debug, Clone)]
struct ProgressEvent {
total: u64,
}
fn setup_progress_capture(runtime: &TilesRuntime) -> Arc<std::sync::Mutex<Vec<ProgressEvent>>> {
let events = Arc::new(std::sync::Mutex::new(Vec::new()));
let events_clone = events.clone();
runtime.events().subscribe(move |event| {
if let crate::Event::Progress { data } = event {
events_clone.lock().unwrap().push(ProgressEvent { total: data.total });
}
});
events
}
#[tokio::test]
async fn test_progress_position_matches_expected_formula() -> Result<()> {
let reader = TestReader::new(Traversal::ANY, 2);
let runtime = TilesRuntime::builder()
.with_memory_cache()
.silent_progress(true)
.build();
let positions = setup_progress_capture(&runtime);
reader
.traverse_all_tiles(
&Traversal::ANY,
|_bbox, stream| {
Box::pin(async move {
stream.for_each_async(|_, _| sleep_micros(100000)).await;
Ok(())
})
},
runtime,
None,
)
.await?;
let captured = positions.lock().unwrap();
let last = captured.last().unwrap();
assert_eq!(last.total, 21, "Total should be 21 for levels 0-2 in streaming mode");
Ok(())
}
}