use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use thiserror::Error;
#[cfg(test)]
use crate::observe::NoopObserver;
use crate::observe::{EngineEvent, EngineObserver, MemoryTracker};
use crate::planner::{PyramidPlan, TileCoord};
use crate::raster::{Raster, RasterError};
use crate::resize;
use crate::resume::{JobCheckpoint, JobMetadata, ResumeError, SCHEMA_VERSION, compute_plan_hash};
use crate::retry::FailurePolicy;
use crate::sink::{SinkError, Tile, TileSink};
/// Errors surfaced by the pyramid engine.
///
/// Wraps lower-level raster / sink / resume failures via `#[from]`
/// conversions and adds engine-specific conditions (cancellation,
/// verification mismatches, budget violations).
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum EngineError {
    /// Failure inside the underlying raster operations (extract, resize, ...).
    #[error("raster error: {0}")]
    Raster(#[from] RasterError),
    /// Failure reported by the tile sink while writing or finishing.
    #[error("sink error: {0}")]
    Sink(#[from] SinkError),
    /// The run was cancelled before completion.
    #[error("engine cancelled")]
    Cancelled,
    /// A worker thread panicked during parallel tile production.
    #[error("worker panicked")]
    WorkerPanic,
    /// A tile's on-disk checksum did not match the expected value
    /// (raised by verification or promoted from a sink checksum failure).
    #[error("checksum mismatch for tile {tile:?} (expected {expected}, got {got})")]
    ChecksumMismatch {
        tile: TileCoord,
        expected: String,
        got: String,
    },
    /// The checkpoint's recorded plan hash does not match the current plan,
    /// meaning the checkpoint belongs to a different job configuration.
    #[error("plan hash mismatch (expected {expected}, got {got})")]
    PlanHashMismatch { expected: String, got: String },
    /// Loading or saving resume state failed.
    #[error("resume failed: {0}")]
    ResumeFailed(#[from] ResumeError),
    /// Verify mode needs a filesystem location to read tiles back from
    /// (either the sink's own root or `EngineConfig::checkpoint_root`).
    #[error("Verify mode requires an on-disk sink or EngineConfig::checkpoint_root")]
    VerifyRequiresOnDiskSink,
    /// The worst-case memory for a processing strip exceeds the configured
    /// budget. NOTE(review): not raised in this module — confirm semantics
    /// at the raising site.
    #[error("budget exceeded: worst-case strip {strip_bytes} bytes > budget {budget_bytes} bytes")]
    BudgetExceeded { strip_bytes: u64, budget_bytes: u64 },
    /// The selected engine kind cannot process the supplied source.
    #[error("engine kind {kind:?} incompatible with supplied source: {reason}")]
    IncompatibleSource {
        kind: crate::EngineKind,
        reason: &'static str,
    },
}
/// How uniform ("blank") tiles are classified before being handed to the sink.
///
/// Regardless of strategy, every tile is still written; the strategy only
/// controls whether `Tile::blank` is set (and counted in
/// `EngineResult::tiles_skipped`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BlankTileStrategy {
    /// Never flag tiles as blank.
    Emit,
    /// Flag tiles whose pixels are all exactly equal to the first pixel.
    Placeholder,
    /// Flag tiles whose per-channel deviation from the first pixel is at most
    /// `max_channel_delta` (0 degenerates to the exact check).
    PlaceholderWithTolerance { max_channel_delta: u8 },
}
/// Tunables for a pyramid generation run.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EngineConfig {
    // Number of worker threads for tile production; 0 selects the
    // sequential (single-threaded) path.
    pub concurrency: usize,
    // Capacity of the bounded channel between workers and the sink loop
    // (backpressure knob for the parallel path).
    pub buffer_size: usize,
    // Background colour used for canvas embedding and tile padding.
    pub background_rgb: [u8; 3],
    // Blank-tile classification policy (see `BlankTileStrategy`).
    pub blank_tile_strategy: BlankTileStrategy,
    // What to do when the sink rejects a tile (abort vs. retry-then-skip).
    pub failure_policy: FailurePolicy,
    // Flush the resume checkpoint every N completed tiles; 0 disables
    // periodic flushing (a final flush still happens at end of run).
    pub checkpoint_every: u64,
    // Optional tile de-duplication policy; consumed elsewhere — not
    // referenced in this module.
    pub dedupe_strategy: Option<crate::dedupe::DedupeStrategy>,
    // Explicit checkpoint directory; overrides the sink's own root when set.
    pub checkpoint_root: Option<PathBuf>,
}
impl Default for EngineConfig {
    /// Conservative defaults: sequential execution, white background,
    /// no blank detection, no checkpointing, no dedupe.
    fn default() -> Self {
        Self {
            concurrency: 0, // 0 = single-threaded path
            buffer_size: 64,
            background_rgb: [255, 255, 255], // white
            blank_tile_strategy: BlankTileStrategy::Emit,
            failure_policy: FailurePolicy::default(),
            checkpoint_every: 0, // periodic checkpoint flushing disabled
            dedupe_strategy: None,
            checkpoint_root: None,
        }
    }
}
impl EngineConfig {
    /// Sets the worker-thread count (0 = sequential).
    pub fn with_concurrency(mut self, n: usize) -> Self {
        self.concurrency = n;
        self
    }
    /// Sets the worker-to-sink channel capacity.
    pub fn with_buffer_size(mut self, n: usize) -> Self {
        self.buffer_size = n;
        self
    }
    /// Sets the blank-tile classification strategy.
    pub fn with_blank_tile_strategy(mut self, strategy: BlankTileStrategy) -> Self {
        self.blank_tile_strategy = strategy;
        self
    }
    /// Sets the sink failure policy.
    pub fn with_failure_policy(mut self, policy: FailurePolicy) -> Self {
        self.failure_policy = policy;
        self
    }
    /// Sets the checkpoint flush interval in tiles (0 disables periodic flush).
    pub fn with_checkpoint_every(mut self, n: u64) -> Self {
        self.checkpoint_every = n;
        self
    }
    /// Enables tile de-duplication with the given strategy.
    pub fn with_dedupe_strategy(mut self, strategy: crate::dedupe::DedupeStrategy) -> Self {
        self.dedupe_strategy = Some(strategy);
        self
    }
    /// Sets an explicit checkpoint directory (overrides the sink's root).
    pub fn with_checkpoint_root(mut self, root: PathBuf) -> Self {
        self.checkpoint_root = Some(root);
        self
    }
}
/// Wall-clock time accumulated per pipeline stage over a whole run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct StageDurations {
    // Planning time. NOTE(review): currently always zero in this module's
    // pipeline — planning happens before the engine is invoked.
    pub planning: Duration,
    // Time to prepare the working raster (canvas embed / clone).
    pub decode: Duration,
    // Accumulated half-downscale time across levels.
    pub resize: Duration,
    // Accumulated tile-extraction time (labelled "encode" in the timers).
    pub encode: Duration,
    // Accumulated sink write/finish time.
    pub sink: Duration,
}
/// Summary statistics returned by a pyramid generation run.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EngineResult {
    // Tiles actually written to the sink.
    pub tiles_produced: u64,
    // Tiles flagged blank by the configured strategy (still written,
    // with `Tile::blank = true`).
    pub tiles_skipped: u64,
    // Number of pyramid levels processed.
    pub levels_processed: u32,
    // High-water mark reported by the engine's memory tracker.
    pub peak_memory_bytes: u64,
    // Size of the source raster's pixel buffer.
    pub bytes_read: u64,
    // Sum of raw tile byte sizes successfully handed to the sink.
    pub bytes_written: u64,
    // Retry count reported by the sink.
    pub retry_count: u64,
    // Peak number of simultaneously in-flight tiles (1 on the sequential path).
    pub queue_pressure_peak: u32,
    // Total wall-clock duration of the run.
    pub duration: Duration,
    // Per-stage timing breakdown.
    pub stage_durations: StageDurations,
    // Tiles abandoned under `FailurePolicy::RetryThenSkip`, per the sink.
    pub skipped_due_to_failure: u64,
}
/// Crate-facing entry point: runs the full pyramid pipeline with progress
/// reported to `observer`, without resume support (no skip set, no
/// checkpoint state).
pub(crate) fn generate_pyramid_observed(
    source: &Raster,
    plan: &PyramidPlan,
    sink: &dyn TileSink,
    config: &EngineConfig,
    observer: &dyn EngineObserver,
) -> Result<EngineResult, EngineError> {
    run_pyramid(source, plan, sink, config, observer, None, None)
}
/// Core pipeline: walks the pyramid from the full-resolution level downward,
/// halving the working raster between levels and emitting every tile of each
/// level to `sink`, reporting progress through `observer`.
///
/// `skip_coords` (resume support) suppresses tiles already written in a
/// previous run; `checkpoint_state`, when present, records completed
/// tiles/levels and is flushed after the sink finishes successfully.
///
/// # Errors
/// Propagates raster, sink (promoted via `promote_sink_error`), and resume
/// failures.
fn run_pyramid(
    source: &Raster,
    plan: &PyramidPlan,
    sink: &dyn TileSink,
    config: &EngineConfig,
    observer: &dyn EngineObserver,
    skip_coords: Option<&HashSet<TileCoord>>,
    checkpoint_state: Option<&CheckpointState>,
) -> Result<EngineResult, EngineError> {
    let started = Instant::now();
    #[cfg(feature = "tracing")]
    let _pipeline_span = tracing::info_span!(target: "libviprs", "pipeline").entered();
    sink.record_engine_config(config);
    sink.init_level_count(plan.levels.len());
    // saturating_sub guards the degenerate empty-plan case instead of
    // panicking on `len() - 1`; the level loop below simply does nothing.
    let top_level = plan.levels.len().saturating_sub(1);
    let mut tiles_produced: u64 = 0;
    let mut tiles_skipped: u64 = 0;
    let bytes_read = source.data().len() as u64;
    let tracker = MemoryTracker::new();
    // Atomics: the parallel emit path updates these from worker threads.
    let bytes_written = AtomicU64::new(0);
    let queue_pressure_peak = AtomicU32::new(0);
    let stage_planning = Duration::ZERO;
    let stage_decode_start = Instant::now();
    // Stage timers accumulate elapsed nanoseconds.
    let stage_resize = AtomicU64::new(0);
    let stage_encode = AtomicU64::new(0);
    let stage_sink = AtomicU64::new(0);
    // When centring is requested, embed the source into a background-filled
    // canvas; otherwise work on a clone of the source directly.
    let mut current = if plan.centre && (plan.centre_offset_x > 0 || plan.centre_offset_y > 0) {
        let canvas = embed_in_canvas(source, plan, config.background_rgb)?;
        let canvas_bytes = canvas.data().len() as u64;
        tracker.alloc(canvas_bytes);
        canvas
    } else {
        let source_bytes = source.data().len() as u64;
        tracker.alloc(source_bytes);
        source.clone()
    };
    let stage_decode_done: Instant = Instant::now();
    let ctx = EmitContext {
        bytes_written: &bytes_written,
        queue_pressure_peak: &queue_pressure_peak,
        stage_encode: &stage_encode,
        stage_sink: &stage_sink,
        skip_coords,
        checkpoint_state,
    };
    // Highest index = full resolution; iterate downward, halving as we go.
    for level_idx in (0..plan.levels.len()).rev() {
        let level = &plan.levels[level_idx];
        #[cfg(feature = "tracing")]
        let _level_span = tracing::info_span!(
            target: "libviprs",
            "level",
            level_index = level.level
        )
        .entered();
        observer.on_event(EngineEvent::LevelStarted {
            level: level.level,
            width: level.width,
            height: level.height,
            tile_count: level.tile_count(),
        });
        if level_idx < top_level {
            let old_bytes = current.data().len() as u64;
            let resize_start = Instant::now();
            // (Fixed: `&current` had been mangled into `¤t` by an
            // HTML-entity corruption of the source text.)
            current = resize::downscale_half(&current)?;
            stage_resize.fetch_add(resize_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
            let new_bytes = current.data().len() as u64;
            tracker.dealloc(old_bytes);
            tracker.alloc(new_bytes);
        }
        let (level_tiles, level_skipped) = extract_and_emit_level(
            &current,
            plan,
            level_idx as u32,
            sink,
            config,
            observer,
            &ctx,
        )?;
        tiles_produced += level_tiles;
        tiles_skipped += level_skipped;
        if let Some(cp) = checkpoint_state {
            cp.mark_level_completed(level.level);
        }
        observer.on_event(EngineEvent::LevelCompleted {
            level: level.level,
            tiles_produced: level_tiles,
        });
    }
    tracker.dealloc(current.data().len() as u64);
    let sink_finish_start = Instant::now();
    match sink.finish() {
        Ok(()) => {}
        Err(e) => return Err(promote_sink_error(e)),
    }
    stage_sink.fetch_add(
        sink_finish_start.elapsed().as_nanos() as u64,
        Ordering::Relaxed,
    );
    // Persist the final checkpoint only after the sink completed successfully.
    if let Some(cp) = checkpoint_state {
        cp.flush().map_err(EngineError::from)?;
    }
    observer.on_event(EngineEvent::Finished {
        total_tiles: tiles_produced,
        levels: plan.levels.len() as u32,
    });
    let decode_elapsed = stage_decode_done.saturating_duration_since(stage_decode_start);
    let stage_durations = StageDurations {
        planning: stage_planning,
        decode: decode_elapsed,
        resize: Duration::from_nanos(stage_resize.load(Ordering::Relaxed)),
        encode: Duration::from_nanos(stage_encode.load(Ordering::Relaxed)),
        sink: Duration::from_nanos(stage_sink.load(Ordering::Relaxed)),
    };
    let retry_count = sink.sink_retry_count();
    let skipped_due_to_failure = sink.sink_skipped_due_to_failure();
    Ok(EngineResult {
        tiles_produced,
        tiles_skipped,
        levels_processed: plan.levels.len() as u32,
        peak_memory_bytes: tracker.peak_bytes(),
        bytes_read,
        bytes_written: bytes_written.load(Ordering::Relaxed),
        retry_count,
        queue_pressure_peak: queue_pressure_peak.load(Ordering::Relaxed),
        duration: started.elapsed(),
        stage_durations,
        skipped_due_to_failure,
    })
}
/// Shared mutable state threaded through tile emission (both sequential and
/// parallel paths), borrowed from `run_pyramid`'s stack.
struct EmitContext<'a> {
    // Total raw tile bytes successfully written.
    bytes_written: &'a AtomicU64,
    // High-water mark of simultaneously in-flight tiles.
    queue_pressure_peak: &'a AtomicU32,
    // Accumulated tile-extraction nanoseconds.
    stage_encode: &'a AtomicU64,
    // Accumulated sink-write nanoseconds.
    stage_sink: &'a AtomicU64,
    // Tiles to skip entirely (resume support).
    skip_coords: Option<&'a HashSet<TileCoord>>,
    // Resume checkpoint recorder, when checkpointing is enabled.
    checkpoint_state: Option<&'a CheckpointState>,
}
/// On-disk resume checkpoint: accumulates completed tiles/levels in memory
/// and periodically persists them under `root`.
pub(crate) struct CheckpointState {
    // Directory the checkpoint file is saved into.
    root: std::path::PathBuf,
    // Mutable job metadata, shared across threads via a mutex.
    meta: std::sync::Mutex<JobMetadata>,
    // Tiles completed since the last flush (drives periodic flushing).
    pending_since_flush: std::sync::atomic::AtomicU64,
    // Flush interval in tiles; 0 disables periodic flushing.
    checkpoint_every: u64,
}
impl CheckpointState {
    /// Creates a checkpoint state rooted at `root`.
    ///
    /// `checkpoint_every` controls periodic flushing in `mark_tile_completed`;
    /// 0 disables it (only the final flush at end of run happens).
    fn new(
        root: std::path::PathBuf,
        meta: JobMetadata,
        _plan: &PyramidPlan,
        checkpoint_every: u64,
    ) -> Self {
        Self {
            root,
            meta: std::sync::Mutex::new(meta),
            pending_since_flush: AtomicU64::new(0),
            checkpoint_every,
        }
    }
    /// Records `coord` as completed and, once `checkpoint_every` tiles have
    /// accumulated since the last flush, persists the checkpoint to disk.
    pub(crate) fn mark_tile_completed(&self, coord: TileCoord) -> Result<(), ResumeError> {
        {
            // Scope the lock so it is released before the (slow) flush below.
            let mut meta = self.meta.lock().unwrap();
            meta.completed_tiles.push(coord);
        }
        if self.checkpoint_every > 0 {
            let n = self.pending_since_flush.fetch_add(1, Ordering::Relaxed) + 1;
            if n >= self.checkpoint_every {
                self.pending_since_flush.store(0, Ordering::Relaxed);
                self.flush()?;
            }
        }
        Ok(())
    }
    /// Records `level` as fully completed (idempotent).
    fn mark_level_completed(&self, level: u32) {
        let mut meta = self.meta.lock().unwrap();
        if !meta.levels_completed.contains(&level) {
            meta.levels_completed.push(level);
        }
    }
    /// Writes a snapshot of the current metadata to the checkpoint file,
    /// stamping `last_checkpoint_at` first. The snapshot is cloned under
    /// the lock so the save itself runs without holding it.
    pub(crate) fn flush(&self) -> Result<(), ResumeError> {
        let snapshot = {
            let mut meta = self.meta.lock().unwrap();
            meta.last_checkpoint_at = now_rfc3339_engine();
            meta.clone()
        };
        JobCheckpoint::save(&self.root, &snapshot).map_err(ResumeError::from)
    }
}
/// Current UTC time formatted as an RFC 3339 timestamp ("YYYY-MM-DDTHH:MM:SSZ").
fn now_rfc3339_engine() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    // Clock readings before the Unix epoch collapse to the epoch itself.
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    let (year, month, day, hour, minute, second) = secs_to_ymd_hms_engine(secs as i64);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
/// Converts seconds since the Unix epoch into UTC calendar fields
/// `(year, month, day, hour, minute, second)`.
///
/// Handles pre-1970 instants correctly via Euclidean division.
fn secs_to_ymd_hms_engine(secs: i64) -> (i32, u32, u32, u32, u32, u32) {
    // Split into whole days since the epoch and the remaining time of day.
    let days = secs.div_euclid(86_400);
    let time_of_day = secs.rem_euclid(86_400);
    let hour = (time_of_day / 3600) as u32;
    let minute = ((time_of_day / 60) % 60) as u32;
    let second = (time_of_day % 60) as u32;
    // Civil-from-days conversion (Howard Hinnant's algorithm): shift the
    // epoch to 0000-03-01 so leap days fall at the end of each cycle.
    let shifted = days + 719_468;
    let era = shifted.div_euclid(146_097);
    let doe = (shifted - era * 146_097) as u64; // day of era [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of (March-based) year
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January/February belong to the following civil year.
    let mut year = (yoe as i64 + era * 400) as i32;
    if month <= 2 {
        year += 1;
    }
    (year, month, day, hour, minute, second)
}
/// Lifts a sink-level checksum failure into the structured
/// `EngineError::ChecksumMismatch` variant; every other sink error is
/// passed through wrapped as `EngineError::Sink`.
fn promote_sink_error(err: SinkError) -> EngineError {
    match err {
        SinkError::ChecksumMismatch {
            tile_rel_path,
            expected,
            got,
        } => EngineError::ChecksumMismatch {
            // Fall back to the (0, 0, 0) coordinate when the relative path
            // cannot be parsed into a tile coordinate.
            tile: parse_tile_rel_path(&tile_rel_path).unwrap_or_else(|| TileCoord::new(0, 0, 0)),
            expected,
            got,
        },
        passthrough => EngineError::Sink(passthrough),
    }
}
/// Parses a tile's relative path into a `TileCoord`.
///
/// Recognises two layouts (with or without an extension, and with either
/// separator style): `level/col_row` and `level/col/row`.
fn parse_tile_rel_path(rel: &str) -> Option<TileCoord> {
    // Shared numeric parse for the three path components.
    fn to_coord(level: &str, col: &str, row: &str) -> Option<TileCoord> {
        let level: u32 = level.parse().ok()?;
        let col: u32 = col.parse().ok()?;
        let row: u32 = row.parse().ok()?;
        Some(TileCoord::new(level, col, row))
    }
    // Accept Windows separators, then strip a trailing extension if present.
    let normalized = rel.replace('\\', "/");
    let stem = match normalized.rsplit_once('.') {
        Some((before_dot, _)) => before_dot,
        None => normalized.as_str(),
    };
    let segments: Vec<&str> = stem.split('/').collect();
    match segments.as_slice() {
        [level, tail] => {
            let (col, row) = tail.split_once('_')?;
            to_coord(level, col, row)
        }
        [level, col, row] => to_coord(level, col, row),
        _ => None,
    }
}
/// Resolves where checkpoint/verify data lives on disk: an explicit
/// `EngineConfig::checkpoint_root` wins, otherwise the sink's own root
/// (if it has one) is used.
pub(crate) fn resolve_checkpoint_root(cfg: &EngineConfig, sink: &dyn TileSink) -> Option<PathBuf> {
    if let Some(root) = &cfg.checkpoint_root {
        return Some(root.clone());
    }
    sink.checkpoint_root().map(|p| p.to_path_buf())
}
/// Builds a `CheckpointState` for this sink/plan pair, seeded with any
/// previously completed tiles and levels.
///
/// Returns `None` when no checkpoint root can be resolved — there is
/// nowhere to persist progress in that case.
pub(crate) fn cp_for_sink(
    sink: &dyn TileSink,
    plan: &PyramidPlan,
    config: &EngineConfig,
    completed_tiles: Vec<TileCoord>,
    levels_completed: Vec<u32>,
) -> Option<CheckpointState> {
    let root = resolve_checkpoint_root(config, sink)?;
    // A fresh checkpoint starts with both timestamps at "now".
    let timestamp = now_rfc3339_engine();
    let meta = JobMetadata {
        schema_version: SCHEMA_VERSION.to_string(),
        plan_hash: compute_plan_hash(plan),
        completed_tiles,
        levels_completed,
        started_at: timestamp.clone(),
        last_checkpoint_at: timestamp,
    };
    Some(CheckpointState::new(root, meta, plan, config.checkpoint_every))
}
/// Verify mode: checks that a previously generated pyramid on disk matches
/// what the engine would produce for (`source`, `plan`).
///
/// Performs, in order: a plan-hash check against any saved checkpoint, an
/// existence scan for every planned tile, manifest checksum verification
/// (blake3/sha256) when a manifest is present, and finally a byte-for-byte
/// comparison of every `raw` tile against a regenerated one.
///
/// # Errors
/// Returns `VerifyRequiresOnDiskSink` when no on-disk root can be resolved,
/// `PlanHashMismatch` / `ChecksumMismatch` on verification failures, and
/// sink-wrapped I/O errors otherwise.
pub(crate) fn raster_verify(
    source: &Raster,
    plan: &PyramidPlan,
    sink: &dyn TileSink,
    config: &EngineConfig,
    observer: &dyn EngineObserver,
) -> Result<EngineResult, EngineError> {
    let started = Instant::now();
    let root_buf =
        resolve_checkpoint_root(config, sink).ok_or(EngineError::VerifyRequiresOnDiskSink)?;
    let root = root_buf.as_path();
    // If a checkpoint exists, the plan it recorded must match the current one.
    if let Some(meta) = JobCheckpoint::load(root)? {
        let current_hash = compute_plan_hash(plan);
        if meta.plan_hash != current_hash {
            return Err(EngineError::PlanHashMismatch {
                expected: meta.plan_hash,
                got: current_hash,
            });
        }
    }
    // Tiles may have been written with any of these extensions.
    let candidate_exts = ["raw", "png", "jpeg", "jpg"];
    // Pass 1: every planned tile must exist on disk under some extension.
    for coord in plan.tile_coords() {
        let mut found: Option<std::path::PathBuf> = None;
        for ext in &candidate_exts {
            if let Some(rel) = plan.tile_path(coord, ext) {
                let abs = root.join(&rel);
                if abs.is_file() {
                    found = Some(abs);
                    break;
                }
            }
        }
        match found {
            Some(_abs) => {}
            None => {
                return Err(EngineError::Sink(SinkError::Other(format!(
                    "Verify: missing tile for coord {:?}",
                    coord
                ))));
            }
        }
    }
    // Pass 2: if a manifest records per-tile checksums, verify each one.
    if let Some(manifest) = read_manifest(root) {
        if let Some(checksums) = manifest.get("checksums") {
            if let (Some(algo_str), Some(per_tile)) = (
                checksums.get("algo").and_then(|v| v.as_str()),
                checksums.get("per_tile").and_then(|v| v.as_object()),
            ) {
                let algo = match algo_str {
                    "blake3" => Some(crate::manifest::ChecksumAlgo::Blake3),
                    "sha256" => Some(crate::manifest::ChecksumAlgo::Sha256),
                    // Unknown algorithms are skipped rather than failing.
                    _ => None,
                };
                if let Some(algo) = algo {
                    for (rel, expected) in per_tile {
                        let expected_s = match expected.as_str() {
                            Some(s) => s,
                            None => continue,
                        };
                        let abs = root.join(rel);
                        let bytes = match std::fs::read(&abs) {
                            Ok(b) => b,
                            // A manifest entry without a file is tolerated here;
                            // pass 1 already enforced existence of planned tiles.
                            Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                            Err(e) => return Err(EngineError::Sink(SinkError::Io(e))),
                        };
                        let got = crate::checksum::hash_tile(&bytes, algo);
                        if !got.eq_ignore_ascii_case(expected_s) {
                            return Err(EngineError::ChecksumMismatch {
                                tile: parse_tile_rel_path(rel)
                                    .unwrap_or_else(|| TileCoord::new(0, 0, 0)),
                                expected: expected_s.to_string(),
                                got,
                            });
                        }
                    }
                }
            }
        }
    }
    // Pass 3: regenerate the pyramid in memory and compare raw tiles.
    let bg = config.background_rgb;
    let mut current = if plan.centre && (plan.centre_offset_x > 0 || plan.centre_offset_y > 0) {
        embed_in_canvas(source, plan, bg)?
    } else {
        source.clone()
    };
    // saturating_sub guards the degenerate empty-plan case (no panic on `- 1`).
    let top_level = plan.levels.len().saturating_sub(1);
    for level_idx in (0..plan.levels.len()).rev() {
        let level = &plan.levels[level_idx];
        if level_idx < top_level {
            // (Fixed: `&current` had been mangled into `¤t` by an
            // HTML-entity corruption of the source text.)
            current = resize::downscale_half(&current)?;
        }
        observer.on_event(EngineEvent::LevelStarted {
            level: level.level,
            width: level.width,
            height: level.height,
            tile_count: level.tile_count(),
        });
        for row in 0..level.rows {
            for col in 0..level.cols {
                let coord = TileCoord::new(level_idx as u32, col, row);
                observer.on_event(EngineEvent::TileCompleted { coord });
                let expected = extract_tile(&current, plan, coord, bg)?;
                let expected_bytes = expected.data();
                let mut found: Option<(std::path::PathBuf, String)> = None;
                for ext in &candidate_exts {
                    if let Some(rel) = plan.tile_path(coord, ext) {
                        let abs = root.join(&rel);
                        if abs.is_file() {
                            found = Some((abs, (*ext).to_string()));
                            break;
                        }
                    }
                }
                let (abs, ext) = match found {
                    Some(f) => f,
                    None => {
                        return Err(EngineError::Sink(SinkError::Other(format!(
                            "Verify: missing tile for coord {:?}",
                            coord
                        ))));
                    }
                };
                let on_disk =
                    std::fs::read(&abs).map_err(|e| EngineError::Sink(SinkError::Io(e)))?;
                // Only raw tiles admit a byte-exact comparison; encoded
                // formats were already covered by the checksum pass.
                if ext == "raw" {
                    if on_disk != expected_bytes {
                        return Err(EngineError::ChecksumMismatch {
                            tile: coord,
                            expected: format!("{} bytes (raw)", expected_bytes.len()),
                            got: format!(
                                "{} bytes on disk differ from regenerated tile",
                                on_disk.len()
                            ),
                        });
                    }
                }
            }
        }
        observer.on_event(EngineEvent::LevelCompleted {
            level: level.level,
            tiles_produced: level.tile_count(),
        });
    }
    observer.on_event(EngineEvent::Finished {
        total_tiles: plan.total_tile_count(),
        levels: plan.levels.len() as u32,
    });
    // Verify produces no tiles; only duration and level count are meaningful.
    Ok(EngineResult {
        tiles_produced: 0,
        tiles_skipped: 0,
        levels_processed: plan.levels.len() as u32,
        peak_memory_bytes: 0,
        bytes_read: 0,
        bytes_written: 0,
        retry_count: 0,
        queue_pressure_peak: 0,
        duration: started.elapsed(),
        stage_durations: StageDurations::default(),
        skipped_due_to_failure: 0,
    })
}
/// Loads the pyramid manifest as JSON, trying a `<root>.manifest.json`
/// sibling first and falling back to `manifest.json` inside `root`.
/// Unreadable or unparsable files yield `None`.
fn read_manifest(root: &std::path::Path) -> Option<serde_json::Value> {
    // Preferred location: a sibling file named "<root>.manifest.json".
    if let (Some(parent), Some(stem)) = (root.parent(), root.file_name()) {
        let mut name = stem.to_os_string();
        name.push(".manifest.json");
        if let Ok(bytes) = std::fs::read(parent.join(name)) {
            if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
                return Some(value);
            }
        }
    }
    // Fallback: "manifest.json" inside the root directory itself.
    std::fs::read(root.join("manifest.json"))
        .ok()
        .and_then(|bytes| serde_json::from_slice::<serde_json::Value>(&bytes).ok())
}
/// Ensures `dir` exists and is empty.
///
/// A missing directory is created; an existing one has its entries removed
/// best-effort (individual removal failures are ignored). A path that exists
/// but is not a directory is left untouched.
pub(crate) fn wipe_directory(dir: &std::path::Path) -> std::io::Result<()> {
    if !dir.exists() {
        // Nothing to clear — just create it (with parents) and finish.
        std::fs::create_dir_all(dir)?;
        return Ok(());
    }
    if !dir.is_dir() {
        return Ok(());
    }
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        // Best-effort removal: errors on individual entries are swallowed.
        let _ = if path.is_dir() {
            std::fs::remove_dir_all(&path)
        } else {
            std::fs::remove_file(&path)
        };
    }
    Ok(())
}
/// Embeds `source` into a `canvas_width` x `canvas_height` background-filled
/// canvas at the plan's centring offset, returning the composited raster in
/// the source's pixel format.
///
/// # Errors
/// Propagates `Raster::new` validation failures.
fn embed_in_canvas(
    source: &Raster,
    plan: &PyramidPlan,
    background_rgb: [u8; 3],
) -> Result<Raster, RasterError> {
    let cw = plan.canvas_width;
    let ch = plan.canvas_height;
    let bpp = source.format().bytes_per_pixel();
    // One background pixel in the raster's channel layout (alpha opaque).
    let bg_pixel: Vec<u8> = match bpp {
        1 => vec![background_rgb[0]],
        3 => background_rgb.to_vec(),
        4 => vec![background_rgb[0], background_rgb[1], background_rgb[2], 255],
        _ => vec![background_rgb[0]; bpp],
    };
    // Fix: allocate the full cw x ch canvas. The previous code reused
    // make_background_tile(cw, ..), which builds a *square* cw x cw buffer
    // and therefore has the wrong byte length whenever cw != ch.
    let mut canvas = bg_pixel.repeat(cw as usize * ch as usize);
    let ox = plan.centre_offset_x as usize;
    let oy = plan.centre_offset_y as usize;
    let iw = source.width() as usize;
    let src_stride = iw * bpp;
    let dst_stride = cw as usize * bpp;
    // Blit the source row by row at the (ox, oy) offset.
    for row in 0..source.height() as usize {
        let src_start = row * src_stride;
        let dst_start = (row + oy) * dst_stride + ox * bpp;
        canvas[dst_start..dst_start + src_stride]
            .copy_from_slice(&source.data()[src_start..src_start + src_stride]);
    }
    Raster::new(cw, ch, source.format(), canvas)
}
/// Extracts every tile of `level` from `raster` and writes it to `sink`.
///
/// Returns `(tiles_written, tiles_flagged_blank)`. With `concurrency == 0`
/// tiles are produced sequentially on the caller's thread; otherwise the
/// work is delegated to `extract_and_emit_parallel`.
fn extract_and_emit_level(
    raster: &Raster,
    plan: &PyramidPlan,
    level: u32,
    sink: &dyn TileSink,
    config: &EngineConfig,
    observer: &dyn EngineObserver,
    ctx: &EmitContext,
) -> Result<(u64, u64), EngineError> {
    let level_plan = &plan.levels[level as usize];
    let blank_strategy = config.blank_tile_strategy;
    if config.concurrency == 0 {
        let mut count = 0u64;
        let mut skipped = 0u64;
        // Row-major walk over the level's tile grid.
        for row in 0..level_plan.rows {
            for col in 0..level_plan.cols {
                let coord = TileCoord::new(level, col, row);
                // Resume support: tiles finished in a previous run are skipped.
                if let Some(skip) = ctx.skip_coords {
                    if skip.contains(&coord) {
                        continue;
                    }
                }
                let encode_start = Instant::now();
                let tile_raster = extract_tile(raster, plan, coord, config.background_rgb)?;
                ctx.stage_encode
                    .fetch_add(encode_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                // Blank tiles are still written; the flag travels on the Tile.
                let blank = is_blank_for_strategy(&tile_raster, blank_strategy);
                if blank {
                    skipped += 1;
                }
                let tile_bytes = tile_raster.data().len() as u64;
                let tile = Tile {
                    coord,
                    raster: tile_raster,
                    blank,
                };
                let sink_start = Instant::now();
                match sink.write_tile(&tile) {
                    Ok(()) => {
                        ctx.stage_sink
                            .fetch_add(sink_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                        ctx.bytes_written.fetch_add(tile_bytes, Ordering::Relaxed);
                        if let Some(cp) = ctx.checkpoint_state {
                            cp.mark_tile_completed(coord).map_err(EngineError::from)?;
                        }
                    }
                    Err(e) => {
                        ctx.stage_sink
                            .fetch_add(sink_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                        // Under RetryThenSkip the failed tile is abandoned and
                        // counted via note_sink_skipped; any other policy aborts
                        // the whole run.
                        match &config.failure_policy {
                            FailurePolicy::RetryThenSkip(_) => {
                                sink.note_sink_skipped();
                                observer.on_event(EngineEvent::TileCompleted { coord });
                                continue;
                            }
                            _ => return Err(promote_sink_error(e)),
                        }
                    }
                }
                observer.on_event(EngineEvent::TileCompleted { coord });
                #[cfg(feature = "tracing")]
                if tracing::enabled!(target: "libviprs::tile", tracing::Level::TRACE) {
                    tracing::trace!(
                        target: "libviprs::tile",
                        x = coord.col,
                        y = coord.row,
                        level = coord.level,
                        "tile done"
                    );
                }
                count += 1;
            }
        }
        // Sequential path: at most one tile is ever in flight.
        let _ = ctx.queue_pressure_peak.fetch_max(1, Ordering::Relaxed);
        Ok((count, skipped))
    } else {
        extract_and_emit_parallel(raster, plan, level, sink, config, observer, ctx)
    }
}
/// Classifies a tile as blank according to the configured strategy:
/// `Emit` never flags anything; the placeholder strategies delegate to the
/// exact or tolerance-based uniformity checks.
fn is_blank_for_strategy(raster: &Raster, strategy: BlankTileStrategy) -> bool {
    match strategy {
        BlankTileStrategy::Emit => false,
        BlankTileStrategy::Placeholder => is_blank_tile(raster),
        BlankTileStrategy::PlaceholderWithTolerance { max_channel_delta } => {
            is_blank_tile_with_tolerance(raster, max_channel_delta)
        }
    }
}
/// Parallel tile production for one level.
///
/// The level's coordinates are split into contiguous chunks, one scoped
/// worker thread per chunk; workers extract tiles and push them through a
/// bounded channel (capacity `config.buffer_size`, providing backpressure)
/// to this thread, which performs all sink writes sequentially. Returns
/// `(tiles_written, tiles_flagged_blank)`.
fn extract_and_emit_parallel(
    raster: &Raster,
    plan: &PyramidPlan,
    level: u32,
    sink: &dyn TileSink,
    config: &EngineConfig,
    observer: &dyn EngineObserver,
    ctx: &EmitContext,
) -> Result<(u64, u64), EngineError> {
    let level_plan = &plan.levels[level as usize];
    let total_tiles = level_plan.tile_count();
    if total_tiles == 0 {
        return Ok((0, 0));
    }
    let blank_strategy = config.blank_tile_strategy;
    // Bounded channel: workers block once `buffer_size` tiles are queued.
    let (tx, rx) = std::sync::mpsc::sync_channel::<Result<Tile, EngineError>>(config.buffer_size);
    let in_flight = Arc::new(AtomicU32::new(0));
    // Workers need owned (Arc) copies since scoped threads outlive no borrow here.
    let raster = Arc::new(raster.clone());
    let plan = Arc::new(plan.clone());
    // All coordinates for this level, minus any resume skips.
    let coords: Vec<TileCoord> = (0..level_plan.rows)
        .flat_map(|row| (0..level_plan.cols).map(move |col| TileCoord::new(level, col, row)))
        .filter(|coord| match ctx.skip_coords {
            Some(skip) => !skip.contains(coord),
            None => true,
        })
        .collect();
    if coords.is_empty() {
        return Ok((0, 0));
    }
    // Never spawn more workers than there are tiles.
    let concurrency = config.concurrency.min(coords.len());
    let chunk_size = coords.len().div_ceil(concurrency);
    let stage_encode: &AtomicU64 = ctx.stage_encode;
    let queue_peak: &AtomicU32 = ctx.queue_pressure_peak;
    std::thread::scope(|s| {
        for chunk in coords.chunks(chunk_size) {
            let tx = tx.clone();
            let raster = Arc::clone(&raster);
            let plan = Arc::clone(&plan);
            let in_flight = Arc::clone(&in_flight);
            let chunk = chunk.to_vec();
            let bg = config.background_rgb;
            s.spawn(move || {
                for coord in chunk {
                    // Track how many tiles are simultaneously being produced.
                    let cur = in_flight.fetch_add(1, Ordering::Relaxed) + 1;
                    let _ = queue_peak.fetch_max(cur, Ordering::Relaxed);
                    let encode_start = Instant::now();
                    let result = extract_tile(&raster, &plan, coord, bg)
                        .map(|tile_raster| {
                            let blank = is_blank_for_strategy(&tile_raster, blank_strategy);
                            Tile {
                                coord,
                                raster: tile_raster,
                                blank,
                            }
                        })
                        .map_err(EngineError::from);
                    stage_encode
                        .fetch_add(encode_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                    // A send failure means the consumer hung up (error abort):
                    // stop producing.
                    let send_failed = tx.send(result).is_err();
                    in_flight.fetch_sub(1, Ordering::Relaxed);
                    if send_failed {
                        break;
                    }
                }
            });
        }
        // Drop our sender so `rx` terminates once all workers finish.
        drop(tx);
        let mut count = 0u64;
        let mut skipped = 0u64;
        // Consumer loop: all sink writes happen here, on this thread.
        for result in rx {
            let tile = result?;
            let coord = tile.coord;
            if tile.blank {
                skipped += 1;
            }
            let tile_bytes = tile.raster.data().len() as u64;
            let sink_start = Instant::now();
            match sink.write_tile(&tile) {
                Ok(()) => {
                    ctx.stage_sink
                        .fetch_add(sink_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                    ctx.bytes_written.fetch_add(tile_bytes, Ordering::Relaxed);
                    if let Some(cp) = ctx.checkpoint_state {
                        cp.mark_tile_completed(coord).map_err(EngineError::from)?;
                    }
                }
                Err(e) => {
                    ctx.stage_sink
                        .fetch_add(sink_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                    // Same failure policy handling as the sequential path.
                    match &config.failure_policy {
                        FailurePolicy::RetryThenSkip(_) => {
                            sink.note_sink_skipped();
                            observer.on_event(EngineEvent::TileCompleted { coord });
                            continue;
                        }
                        _ => return Err(promote_sink_error(e)),
                    }
                }
            }
            observer.on_event(EngineEvent::TileCompleted { coord });
            #[cfg(feature = "tracing")]
            if tracing::enabled!(target: "libviprs::tile", tracing::Level::TRACE) {
                tracing::trace!(
                    target: "libviprs::tile",
                    x = coord.col,
                    y = coord.row,
                    level = coord.level,
                    "tile done"
                );
            }
            count += 1;
        }
        Ok((count, skipped))
    })
}
/// Builds a `ts` x `ts` pixel buffer filled with the background colour.
///
/// The RGB triple is adapted to the channel layout: grayscale uses the red
/// channel, RGBA gets an opaque alpha, and any other layout falls back to
/// repeating the red channel across all `bpp` bytes.
fn make_background_tile(ts: u32, bpp: usize, background_rgb: [u8; 3]) -> Vec<u8> {
    let bg_pixel: Vec<u8> = match bpp {
        1 => vec![background_rgb[0]],
        3 => background_rgb.to_vec(),
        4 => vec![background_rgb[0], background_rgb[1], background_rgb[2], 255],
        _ => vec![background_rgb[0]; bpp],
    };
    // repeat() sizes the buffer once and fills it directly, avoiding the
    // redundant zero-fill that `vec![0u8; ..]` + per-pixel overwrite did.
    bg_pixel.repeat(ts as usize * ts as usize)
}
/// Extracts the tile at `coord` from `raster`, padding with the background
/// colour where the tile extends past the raster edge.
///
/// Google layout always yields full `tile_size` x `tile_size` tiles (edge
/// tiles are background-padded, fully-outside tiles are pure background).
/// Other layouts pad edge tiles only when `overlap == 0`; with overlap the
/// partial content tile is returned as-is.
///
/// # Panics
/// Panics if `plan.tile_rect(coord)` returns `None` — callers must only pass
/// coordinates that belong to the plan.
fn extract_tile(
    raster: &Raster,
    plan: &PyramidPlan,
    coord: TileCoord,
    background_rgb: [u8; 3],
) -> Result<Raster, RasterError> {
    let rect = plan
        .tile_rect(coord)
        .expect("tile_rect returned None for valid coord");
    let ts = plan.tile_size;
    let bpp = raster.format().bytes_per_pixel();
    if plan.layout == crate::planner::Layout::Google {
        let rw = raster.width();
        let rh = raster.height();
        // Clamp the tile rect to the raster bounds.
        let inter_right = (rect.x + rect.width).min(rw);
        let inter_bottom = (rect.y + rect.height).min(rh);
        // Tile entirely outside the raster: pure background.
        if rect.x >= rw || rect.y >= rh {
            let padded = make_background_tile(ts, bpp, background_rgb);
            return Raster::new(ts, ts, raster.format(), padded);
        }
        let inter_w = inter_right - rect.x;
        let inter_h = inter_bottom - rect.y;
        // Fully interior tile: extract directly, no padding needed.
        if inter_w == ts && inter_h == ts {
            return raster.extract(rect.x, rect.y, ts, ts);
        }
        // Edge tile: copy the intersecting content into a background canvas.
        let content = raster.extract(rect.x, rect.y, inter_w, inter_h)?;
        let mut padded = make_background_tile(ts, bpp, background_rgb);
        let src_stride = inter_w as usize * bpp;
        let dst_stride = ts as usize * bpp;
        for row in 0..inter_h as usize {
            let src_start = row * src_stride;
            let dst_start = row * dst_stride;
            padded[dst_start..dst_start + src_stride]
                .copy_from_slice(&content.data()[src_start..src_start + src_stride]);
        }
        return Raster::new(ts, ts, raster.format(), padded);
    }
    let content = raster.extract(rect.x, rect.y, rect.width, rect.height)?;
    // Without overlap, pad undersized edge tiles up to the full tile size;
    // with overlap, the plan's rect already encodes the intended dimensions.
    if plan.overlap == 0 && (content.width() < ts || content.height() < ts) {
        let mut padded = make_background_tile(ts, bpp, background_rgb);
        let src_stride = content.width() as usize * bpp;
        let dst_stride = ts as usize * bpp;
        for row in 0..content.height() as usize {
            let src_start = row * src_stride;
            let dst_start = row * dst_stride;
            padded[dst_start..dst_start + src_stride]
                .copy_from_slice(&content.data()[src_start..src_start + src_stride]);
        }
        Raster::new(ts, ts, content.format(), padded)
    } else {
        Ok(content)
    }
}
/// Returns `true` when every pixel of the raster equals the first pixel
/// (an exactly uniform tile). Empty or single-pixel rasters count as blank.
pub fn is_blank_tile(raster: &Raster) -> bool {
    let bytes = raster.data();
    let stride = raster.format().bytes_per_pixel();
    // Zero or one pixel is trivially uniform.
    if bytes.len() <= stride {
        return true;
    }
    let reference = &bytes[..stride];
    let mut pixels = bytes.chunks(stride);
    pixels.next(); // the reference pixel trivially matches itself
    pixels.all(|pixel| pixel == reference)
}
/// Returns `true` when every channel of every pixel is within
/// `max_channel_delta` of the corresponding channel of the first pixel.
/// A tolerance of 0 degenerates to the exact `is_blank_tile` check.
pub fn is_blank_tile_with_tolerance(raster: &Raster, max_channel_delta: u8) -> bool {
    if max_channel_delta == 0 {
        return is_blank_tile(raster);
    }
    let bytes = raster.data();
    let stride = raster.format().bytes_per_pixel();
    // Zero or one pixel is trivially uniform.
    if bytes.len() <= stride {
        return true;
    }
    let reference = &bytes[..stride];
    bytes.chunks(stride).all(|pixel| {
        pixel
            .iter()
            .zip(reference)
            .all(|(&channel, &base)| channel.abs_diff(base) <= max_channel_delta)
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::observe::CollectingObserver;
use crate::pixel::PixelFormat;
use crate::planner::{Layout, PyramidPlanner};
use crate::sink::MemorySink;
fn gradient_raster(w: u32, h: u32) -> Raster {
let bpp = PixelFormat::Rgb8.bytes_per_pixel();
let mut data = vec![0u8; w as usize * h as usize * bpp];
for y in 0..h {
for x in 0..w {
let off = (y as usize * w as usize + x as usize) * bpp;
data[off] = (x % 256) as u8;
data[off + 1] = (y % 256) as u8;
data[off + 2] = ((x + y) % 256) as u8;
}
}
Raster::new(w, h, PixelFormat::Rgb8, data).unwrap()
}
fn solid_raster(w: u32, h: u32, val: u8) -> Raster {
let data = vec![val; w as usize * h as usize * 3];
Raster::new(w, h, PixelFormat::Rgb8, data).unwrap()
}
#[test]
fn single_threaded_produces_all_tiles() {
let src = gradient_raster(512, 512);
let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let config = EngineConfig::default();
let result = generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
assert_eq!(result.tiles_produced, plan.total_tile_count());
assert_eq!(sink.tile_count() as u64, plan.total_tile_count());
}
#[test]
fn parallel_produces_all_tiles() {
let src = gradient_raster(512, 512);
let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let config = EngineConfig::default().with_concurrency(4);
let result = generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
assert_eq!(result.tiles_produced, plan.total_tile_count());
assert_eq!(sink.tile_count() as u64, plan.total_tile_count());
}
#[test]
fn all_tile_coords_present() {
let src = gradient_raster(600, 400);
let planner = PyramidPlanner::new(600, 400, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let config = EngineConfig::default().with_concurrency(2);
generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
let tiles = sink.tiles();
let mut coords: Vec<_> = tiles.iter().map(|t| t.coord).collect();
coords.sort_by_key(|c| (c.level, c.row, c.col));
let mut expected: Vec<_> = plan.tile_coords().collect();
expected.sort_by_key(|c| (c.level, c.row, c.col));
assert_eq!(coords, expected);
}
#[test]
fn tile_dimensions_match_plan() {
let src = gradient_raster(500, 300);
let planner = PyramidPlanner::new(500, 300, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let config = EngineConfig::default();
generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
for tile in sink.tiles() {
let rect = plan.tile_rect(tile.coord).unwrap();
let expected_w = if rect.width < 256 { 256 } else { rect.width };
let expected_h = if rect.height < 256 { 256 } else { rect.height };
assert_eq!(tile.width, expected_w, "Width mismatch at {:?}", tile.coord);
assert_eq!(
tile.height, expected_h,
"Height mismatch at {:?}",
tile.coord
);
}
}
#[test]
fn deterministic_across_concurrency_levels() {
let src = gradient_raster(256, 256);
let planner = PyramidPlanner::new(256, 256, 64, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let ref_sink = MemorySink::new();
generate_pyramid_observed(
&src,
&plan,
&ref_sink,
&EngineConfig::default(),
&NoopObserver,
)
.unwrap();
let mut ref_tiles = ref_sink.tiles();
ref_tiles.sort_by_key(|t| (t.coord.level, t.coord.row, t.coord.col));
for concurrency in [1, 2, 4, 8, 16] {
let sink = MemorySink::new();
let config = EngineConfig::default().with_concurrency(concurrency);
generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
let mut tiles = sink.tiles();
tiles.sort_by_key(|t| (t.coord.level, t.coord.row, t.coord.col));
assert_eq!(
tiles.len(),
ref_tiles.len(),
"Tile count mismatch at concurrency={concurrency}"
);
for (ref_t, t) in ref_tiles.iter().zip(tiles.iter()) {
assert_eq!(ref_t.coord, t.coord);
assert_eq!(
ref_t.data, t.data,
"Tile data diverged at {:?} with concurrency={concurrency}",
t.coord
);
}
}
}
#[test]
fn levels_processed_matches_plan() {
let src = gradient_raster(64, 64);
let planner = PyramidPlanner::new(64, 64, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let result =
generate_pyramid_observed(&src, &plan, &sink, &EngineConfig::default(), &NoopObserver)
.unwrap();
assert_eq!(result.levels_processed, plan.level_count() as u32);
}
#[test]
fn small_image_single_tile() {
let src = gradient_raster(10, 10);
let planner = PyramidPlanner::new(10, 10, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let result =
generate_pyramid_observed(&src, &plan, &sink, &EngineConfig::default(), &NoopObserver)
.unwrap();
assert_eq!(result.tiles_produced, plan.level_count() as u64);
}
#[test]
fn backpressure_small_buffer() {
let src = gradient_raster(512, 512);
let planner = PyramidPlanner::new(512, 512, 128, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let sink = MemorySink::new();
let config = EngineConfig::default()
.with_concurrency(4)
.with_buffer_size(1);
let result = generate_pyramid_observed(&src, &plan, &sink, &config, &NoopObserver).unwrap();
assert_eq!(result.tiles_produced, plan.total_tile_count());
}
#[test]
fn is_blank_tile_solid() {
let r = solid_raster(8, 8, 128);
assert!(is_blank_tile(&r));
}
#[test]
fn is_blank_tile_not_blank() {
let mut data = vec![128u8; 8 * 8 * 3];
data[0] = 0;
let r = Raster::new(8, 8, PixelFormat::Rgb8, data).unwrap();
assert!(!is_blank_tile(&r));
}
#[test]
fn is_blank_tile_single_pixel() {
    // With exactly one pixel there is nothing to differ from, so the tile
    // counts as blank.
    let raster = Raster::new(1, 1, PixelFormat::Rgb8, vec![1, 2, 3]).unwrap();
    assert!(is_blank_tile(&raster));
}
#[test]
fn overlap_tiles_have_correct_size() {
    // With a 2px overlap, each emitted tile must match the exact rect the plan
    // computed for its coordinate.
    let source = gradient_raster(600, 400);
    let plan = PyramidPlanner::new(600, 400, 256, 2, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    generate_pyramid_observed(&source, &plan, &sink, &EngineConfig::default(), &NoopObserver)
        .unwrap();
    for tile in sink.tiles() {
        let expected = plan.tile_rect(tile.coord).unwrap();
        assert_eq!(tile.width, expected.width);
        assert_eq!(tile.height, expected.height);
    }
}
#[test]
fn concurrent_with_slow_sink() {
    use crate::sink::SlowSink;
    use std::time::Duration;
    // A sink that stalls 1ms per write must not cause tiles to be dropped when
    // several workers share a tiny buffer.
    let source = gradient_raster(128, 128);
    let plan = PyramidPlanner::new(128, 128, 64, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = SlowSink::new(Duration::from_millis(1));
    let config = EngineConfig::default()
        .with_buffer_size(2)
        .with_concurrency(4);
    let result = generate_pyramid_observed(&source, &plan, &sink, &config, &NoopObserver).unwrap();
    assert_eq!(result.tiles_produced, plan.total_tile_count());
    assert_eq!(sink.tile_count() as u64, plan.total_tile_count());
}
#[test]
fn observer_receives_all_tile_events() {
    // Exactly one TileCompleted event must be observed per planned tile.
    let source = gradient_raster(128, 128);
    let plan = PyramidPlanner::new(128, 128, 64, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let observer = CollectingObserver::new();
    generate_pyramid_observed(&source, &plan, &sink, &EngineConfig::default(), &observer).unwrap();
    let completed = observer
        .events()
        .iter()
        .filter(|event| matches!(event, EngineEvent::TileCompleted { .. }))
        .count();
    assert_eq!(completed as u64, plan.total_tile_count());
}
#[test]
fn observer_receives_level_events_in_order() {
    // LevelStarted events must run from the deepest level down to level 0, and
    // the very last event must be Finished.
    let source = gradient_raster(64, 64);
    let plan = PyramidPlanner::new(64, 64, 256, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let observer = CollectingObserver::new();
    generate_pyramid_observed(&source, &plan, &sink, &EngineConfig::default(), &observer).unwrap();
    let events = observer.events();
    let mut level_starts: Vec<u32> = Vec::new();
    for event in events.iter() {
        if let EngineEvent::LevelStarted { level, .. } = event {
            level_starts.push(*level);
        }
    }
    let expected: Vec<u32> = (0..plan.level_count() as u32).rev().collect();
    assert_eq!(level_starts, expected);
    assert!(matches!(events.last(), Some(EngineEvent::Finished { .. })));
}
#[test]
fn observer_finished_event_has_correct_totals() {
    // The terminal Finished event must carry the plan-wide tile and level counts.
    let source = gradient_raster(256, 256);
    let plan = PyramidPlanner::new(256, 256, 128, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let observer = CollectingObserver::new();
    generate_pyramid_observed(&source, &plan, &sink, &EngineConfig::default(), &observer).unwrap();
    match observer.events().last().unwrap() {
        EngineEvent::Finished {
            total_tiles,
            levels,
        } => {
            assert_eq!(*total_tiles, plan.total_tile_count());
            assert_eq!(*levels, plan.level_count() as u32);
        }
        _ => panic!("Last event should be Finished"),
    }
}
#[test]
fn observer_works_with_concurrency() {
    // Event delivery must be complete even when four workers run in parallel.
    let source = gradient_raster(256, 256);
    let plan = PyramidPlanner::new(256, 256, 64, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let observer = CollectingObserver::new();
    let config = EngineConfig::default().with_concurrency(4);
    generate_pyramid_observed(&source, &plan, &sink, &config, &observer).unwrap();
    let completed = observer
        .events()
        .iter()
        .filter(|event| matches!(event, EngineEvent::TileCompleted { .. }))
        .count() as u64;
    assert_eq!(completed, plan.total_tile_count());
}
#[test]
fn peak_memory_is_reported() {
    // The reported peak must at least cover the source raster itself
    // (512 * 512 pixels, 3 bytes per RGB pixel).
    let src = gradient_raster(512, 512);
    let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::DeepZoom).unwrap();
    let plan = planner.plan();
    let sink = MemorySink::new();
    let result =
        generate_pyramid_observed(&src, &plan, &sink, &EngineConfig::default(), &NoopObserver)
            .unwrap();
    // Explicit u64 literal for consistency with `peak_memory_is_bounded`, which
    // already compares the same field against a `u64` product.
    let source_bytes = 512u64 * 512 * 3;
    assert!(
        result.peak_memory_bytes >= source_bytes,
        "Peak {} < source {source_bytes}",
        result.peak_memory_bytes
    );
}
#[test]
fn peak_memory_is_bounded() {
    // Streaming generation should never need to hold anywhere near two full
    // copies of the source raster in memory at once.
    let source = gradient_raster(1024, 1024);
    let plan = PyramidPlanner::new(1024, 1024, 256, 0, Layout::DeepZoom)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let result = generate_pyramid_observed(
        &source,
        &plan,
        &sink,
        &EngineConfig::default(),
        &NoopObserver,
    )
    .unwrap();
    let source_bytes = 1024u64 * 1024 * 3;
    assert!(
        result.peak_memory_bytes < source_bytes * 2,
        "Peak {} >= 2x source {source_bytes}",
        result.peak_memory_bytes
    );
}
#[test]
fn google_centre_produces_all_tiles() {
    // Centred Google layout pads the image into full tiles; the engine must
    // emit every tile the plan promises.
    let source = gradient_raster(500, 300);
    let plan = PyramidPlanner::new(500, 300, 256, 0, Layout::Google)
        .unwrap()
        .with_centre(true)
        .plan();
    let sink = MemorySink::new();
    let result = generate_pyramid_observed(
        &source,
        &plan,
        &sink,
        &EngineConfig::default(),
        &NoopObserver,
    )
    .unwrap();
    assert_eq!(result.tiles_produced, plan.total_tile_count());
    assert_eq!(sink.tile_count() as u64, plan.total_tile_count());
}
#[test]
fn google_centre_all_tiles_full_size() {
    // In centred Google layout every tile, including edge tiles, is padded to
    // the full 256x256 tile size.
    let source = gradient_raster(500, 300);
    let plan = PyramidPlanner::new(500, 300, 256, 0, Layout::Google)
        .unwrap()
        .with_centre(true)
        .plan();
    let sink = MemorySink::new();
    generate_pyramid_observed(&source, &plan, &sink, &EngineConfig::default(), &NoopObserver)
        .unwrap();
    for tile in sink.tiles() {
        assert_eq!(tile.width, 256, "Width mismatch at {:?}", tile.coord);
        assert_eq!(tile.height, 256, "Height mismatch at {:?}", tile.coord);
    }
}
#[test]
fn google_centre_edge_tiles_have_background() {
// A tiny 10x10 solid-grey (value 200) source centred on a 256x256 Google tile:
// the padding around the image should be filled with background colour.
let src = solid_raster(10, 10, 200);
let planner = PyramidPlanner::new(10, 10, 256, 0, Layout::Google)
.unwrap()
.with_centre(true);
let plan = planner.plan();
let sink = MemorySink::new();
generate_pyramid_observed(&src, &plan, &sink, &EngineConfig::default(), &NoopObserver)
.unwrap();
let tiles = sink.tiles();
// Inspect only the single tile at level 0.
let level0: Vec<_> = tiles.iter().filter(|t| t.coord.level == 0).collect();
assert_eq!(level0.len(), 1);
let tile = &level0[0];
// Even though the source is 10x10, the centred tile must be padded to 256x256.
assert_eq!(tile.width, 256);
assert_eq!(tile.height, 256);
// Accept either outcome: the tile mixes image and background pixels (so it is
// not blank), or it is entirely white — presumably the default background.
// NOTE(review): this disjunction is a weak assertion; it relies on the default
// background being [255, 255, 255] — confirm against EngineConfig::default()
// and consider asserting a corner pixel's value directly instead.
assert!(
!is_blank_tile(
&Raster::new(
tile.width,
tile.height,
PixelFormat::Rgb8,
tile.data.clone()
)
.unwrap()
) || tile.data.chunks(3).all(|px| px == [255, 255, 255])
);
}
#[test]
fn google_centre_deterministic_across_concurrency() {
    // A default-config run serves as the reference; runs at concurrency 1, 2
    // and 4 must produce byte-identical tiles at identical coordinates.
    let source = gradient_raster(400, 300);
    let plan = PyramidPlanner::new(400, 300, 128, 0, Layout::Google)
        .unwrap()
        .with_centre(true)
        .plan();
    let reference_sink = MemorySink::new();
    generate_pyramid_observed(
        &source,
        &plan,
        &reference_sink,
        &EngineConfig::default(),
        &NoopObserver,
    )
    .unwrap();
    let mut reference = reference_sink.tiles();
    reference.sort_by_key(|t| (t.coord.level, t.coord.row, t.coord.col));
    for concurrency in [1, 2, 4] {
        let sink = MemorySink::new();
        let config = EngineConfig::default().with_concurrency(concurrency);
        generate_pyramid_observed(&source, &plan, &sink, &config, &NoopObserver).unwrap();
        let mut tiles = sink.tiles();
        tiles.sort_by_key(|t| (t.coord.level, t.coord.row, t.coord.col));
        assert_eq!(tiles.len(), reference.len());
        for (expected, actual) in reference.iter().zip(tiles.iter()) {
            assert_eq!(expected.coord, actual.coord);
            assert_eq!(
                expected.data, actual.data,
                "Tile {:?} diverged at concurrency={concurrency}",
                actual.coord
            );
        }
    }
}
#[test]
fn google_no_centre_produces_all_tiles() {
    // Without centring, the Google layout must still produce every planned tile.
    let source = gradient_raster(500, 300);
    let plan = PyramidPlanner::new(500, 300, 256, 0, Layout::Google)
        .unwrap()
        .plan();
    let sink = MemorySink::new();
    let result = generate_pyramid_observed(
        &source,
        &plan,
        &sink,
        &EngineConfig::default(),
        &NoopObserver,
    )
    .unwrap();
    assert_eq!(result.tiles_produced, plan.total_tile_count());
}
#[test]
#[cfg_attr(miri, ignore)] // touches the real filesystem via tempdir/FsSink
fn resumable_emits_observer_events() {
    use std::sync::Arc;
    use crate::observe::CollectingObserver;
    use crate::resume::ResumePolicy;
    use crate::{EngineBuilder, EngineKind};
    use tempfile::tempdir;
    // Counts (TileCompleted, Finished) occurrences in a recorded event stream.
    // Extracted because the overwrite and verify phases perform the identical
    // pair of filter/count passes.
    fn count_tile_and_finished(events: &[EngineEvent]) -> (u64, usize) {
        let tiles = events
            .iter()
            .filter(|e| matches!(e, EngineEvent::TileCompleted { .. }))
            .count() as u64;
        let finished = events
            .iter()
            .filter(|e| matches!(e, EngineEvent::Finished { .. }))
            .count();
        (tiles, finished)
    }
    let src = gradient_raster(128, 96);
    let planner = PyramidPlanner::new(128, 96, 64, 0, Layout::DeepZoom).unwrap();
    let plan = planner.plan();
    let dir = tempdir().unwrap();
    let sink = crate::sink::FsSink::new(dir.path().join("tiles"), plan.clone())
        .with_format(crate::sink::TileFormat::Raw);
    // Phase 1: overwrite — a fresh run must emit one TileCompleted per planned
    // tile plus exactly one Finished event.
    let obs = Arc::new(CollectingObserver::new());
    EngineBuilder::new(&src, plan.clone(), &sink)
        .with_engine(EngineKind::Monolithic)
        .with_resume(ResumePolicy::overwrite())
        .with_observer_arc(obs.clone())
        .run()
        .unwrap();
    let (tile_events, finished) = count_tile_and_finished(&obs.events());
    assert_eq!(
        tile_events,
        plan.total_tile_count(),
        "Overwrite: tile events"
    );
    assert_eq!(finished, 1, "Overwrite: finished event");
    // Phase 2: resume over an already-complete job — tiles may be skipped, so
    // we only require that the observer is not silent.
    let obs = Arc::new(CollectingObserver::new());
    EngineBuilder::new(&src, plan.clone(), &sink)
        .with_engine(EngineKind::Monolithic)
        .with_resume(ResumePolicy::resume())
        .with_observer_arc(obs.clone())
        .run()
        .unwrap();
    assert!(
        !obs.events().is_empty(),
        "Resume mode produced no observer events"
    );
    // Phase 3: verify — the full set of tile events plus one Finished event
    // must be observed again.
    let obs = Arc::new(CollectingObserver::new());
    EngineBuilder::new(&src, plan.clone(), &sink)
        .with_engine(EngineKind::Monolithic)
        .with_resume(ResumePolicy::verify())
        .with_observer_arc(obs.clone())
        .run()
        .unwrap();
    let (tile_events, finished) = count_tile_and_finished(&obs.events());
    assert_eq!(tile_events, plan.total_tile_count(), "Verify: tile events");
    assert_eq!(finished, 1, "Verify: finished event");
}
}