use std::path::PathBuf;
use std::time::Instant;
use crate::engine::{EngineConfig, EngineError, EngineResult, StageDurations};
use crate::observe::{EngineEvent, EngineObserver};
use crate::planner::{PyramidPlan, TileCoord};
use crate::raster::Raster;
use crate::resize;
use crate::sink::{SinkError, TileSink};
use crate::streaming::{StripSource, obtain_canvas_strip};
const CANDIDATE_EXTS: [&str; 4] = ["raw", "png", "jpeg", "jpg"];
pub(crate) fn verify_from_strip_source(
source: &dyn StripSource,
plan: &PyramidPlan,
sink: &dyn TileSink,
config: &EngineConfig,
observer: &dyn EngineObserver,
) -> Result<EngineResult, EngineError> {
let started = Instant::now();
let root_buf = resolve_root(config, sink).ok_or(EngineError::VerifyRequiresOnDiskSink)?;
let root = root_buf.as_path();
let bg = config.background_rgb;
if let Some(meta) = crate::resume::JobCheckpoint::load(root)? {
let expected = crate::resume::compute_plan_hash(plan);
if meta.plan_hash != expected {
return Err(EngineError::PlanHashMismatch {
expected: meta.plan_hash,
got: expected,
});
}
}
for coord in plan.tile_coords() {
if find_tile_on_disk(root, plan, coord).is_none() {
return Err(EngineError::Sink(SinkError::Other(format!(
"Verify: missing tile for coord {coord:?}"
))));
}
}
if let Some(manifest) = read_manifest(root) {
if let Some(checksums) = manifest.get("checksums") {
let algo_str = checksums.get("algo").and_then(|v| v.as_str());
let per_tile = checksums.get("per_tile").and_then(|v| v.as_object());
if let (Some(algo_str), Some(per_tile)) = (algo_str, per_tile) {
let algo = match algo_str {
"blake3" => Some(crate::manifest::ChecksumAlgo::Blake3),
"sha256" => Some(crate::manifest::ChecksumAlgo::Sha256),
_ => None,
};
if let Some(algo) = algo {
for (rel, expected) in per_tile {
let Some(expected_s) = expected.as_str() else {
continue;
};
let abs = root.join(rel);
let bytes = match std::fs::read(&abs) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
Err(e) => return Err(EngineError::Sink(SinkError::Io(e))),
};
let got = crate::checksum::hash_tile(&bytes, algo);
if !got.eq_ignore_ascii_case(expected_s) {
return Err(EngineError::ChecksumMismatch {
tile: parse_tile_rel_path(rel)
.unwrap_or_else(|| TileCoord::new(0, 0, 0)),
expected: expected_s.to_string(),
got,
});
}
}
}
}
}
}
let top_level_idx = plan.levels.len() - 1;
let top = &plan.levels[top_level_idx];
let format = source.format();
let bpp = format.bytes_per_pixel();
let cw = plan.canvas_width;
let ch = plan.canvas_height;
let dst_stride = cw as usize * bpp;
let mut canvas = vec![0u8; dst_stride * ch as usize];
let strip_h = (2 * plan.tile_size).min(ch).max(1);
let mut y: u32 = 0;
while y < ch {
let sh = strip_h.min(ch - y);
let strip = obtain_canvas_strip(source, plan, y, sh, config)?;
debug_assert_eq!(strip.width(), cw);
debug_assert_eq!(strip.format(), format);
let strip_rows = strip.height() as usize;
let src_row_bytes = strip.width() as usize * bpp;
let src_stride = strip.stride();
let data = strip.data();
for row in 0..strip_rows {
let src_start = row * src_stride;
let dst_start = (y as usize + row) * dst_stride;
canvas[dst_start..dst_start + src_row_bytes]
.copy_from_slice(&data[src_start..src_start + src_row_bytes]);
}
y += sh;
}
let mut current = Raster::new(cw, ch, format, canvas)?;
debug_assert_eq!(current.width(), top.width);
debug_assert_eq!(current.height(), top.height);
for level_idx in (0..plan.levels.len()).rev() {
let level = &plan.levels[level_idx];
if level_idx < top_level_idx {
current = resize::downscale_half(¤t)?;
}
observer.on_event(EngineEvent::LevelStarted {
level: level.level,
width: level.width,
height: level.height,
tile_count: level.tile_count(),
});
for row in 0..level.rows {
for col in 0..level.cols {
let coord = TileCoord::new(level_idx as u32, col, row);
observer.on_event(EngineEvent::TileCompleted { coord });
let expected =
crate::streaming::extract_tile_from_strip(¤t, plan, coord, 0, bg)?;
let expected_bytes = expected.data();
let (abs, ext) = match find_tile_on_disk(root, plan, coord) {
Some(found) => found,
None => {
return Err(EngineError::Sink(SinkError::Other(format!(
"Verify: missing tile for coord {coord:?}"
))));
}
};
let on_disk =
std::fs::read(&abs).map_err(|e| EngineError::Sink(SinkError::Io(e)))?;
if ext == "raw" && on_disk != expected_bytes {
return Err(EngineError::ChecksumMismatch {
tile: coord,
expected: format!("{} bytes (raw)", expected_bytes.len()),
got: format!(
"{} bytes on disk differ from regenerated tile",
on_disk.len()
),
});
}
}
}
observer.on_event(EngineEvent::LevelCompleted {
level: level.level,
tiles_produced: level.tile_count(),
});
}
observer.on_event(EngineEvent::Finished {
total_tiles: plan.total_tile_count(),
levels: plan.levels.len() as u32,
});
Ok(EngineResult {
tiles_produced: 0,
tiles_skipped: 0,
levels_processed: plan.levels.len() as u32,
peak_memory_bytes: 0,
bytes_read: 0,
bytes_written: 0,
retry_count: 0,
queue_pressure_peak: 0,
duration: started.elapsed(),
stage_durations: StageDurations::default(),
skipped_due_to_failure: 0,
})
}
fn find_tile_on_disk(
root: &std::path::Path,
plan: &PyramidPlan,
coord: TileCoord,
) -> Option<(PathBuf, &'static str)> {
for ext in &CANDIDATE_EXTS {
if let Some(rel) = plan.tile_path(coord, ext) {
let abs = root.join(&rel);
if abs.is_file() {
return Some((abs, *ext));
}
}
}
None
}
fn resolve_root(cfg: &EngineConfig, sink: &dyn TileSink) -> Option<PathBuf> {
crate::engine::resolve_checkpoint_root(cfg, sink)
}
fn parse_tile_rel_path(rel: &str) -> Option<TileCoord> {
let normalized = rel.replace('\\', "/");
let no_ext = normalized
.rsplit_once('.')
.map(|(s, _)| s)
.unwrap_or(&normalized);
let parts: Vec<&str> = no_ext.split('/').collect();
match parts.as_slice() {
[level, last] => {
let level: u32 = level.parse().ok()?;
let (col, row) = last.split_once('_')?;
let col: u32 = col.parse().ok()?;
let row: u32 = row.parse().ok()?;
Some(TileCoord::new(level, col, row))
}
[level, col, row] => {
let level: u32 = level.parse().ok()?;
let col: u32 = col.parse().ok()?;
let row: u32 = row.parse().ok()?;
Some(TileCoord::new(level, col, row))
}
_ => None,
}
}
fn read_manifest(root: &std::path::Path) -> Option<serde_json::Value> {
if let (Some(parent), Some(stem)) = (root.parent(), root.file_name()) {
let mut name = stem.to_os_string();
name.push(".manifest.json");
let sibling = parent.join(name);
if let Ok(bytes) = std::fs::read(&sibling) {
if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&bytes) {
return Some(v);
}
}
}
let inside = root.join("manifest.json");
if let Ok(bytes) = std::fs::read(&inside) {
return serde_json::from_slice::<serde_json::Value>(&bytes).ok();
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use crate::observe::NoopObserver;
use crate::pixel::PixelFormat;
use crate::planner::{Layout, PyramidPlanner};
use crate::sink::{FsSink, TileFormat};
use crate::streaming::RasterStripSource;
fn gradient(w: u32, h: u32) -> Raster {
let bpp = PixelFormat::Rgb8.bytes_per_pixel();
let mut data = vec![0u8; w as usize * h as usize * bpp];
for y in 0..h {
for x in 0..w {
let off = (y as usize * w as usize + x as usize) * bpp;
data[off] = (x % 256) as u8;
data[off + 1] = (y % 256) as u8;
data[off + 2] = ((x.wrapping_add(y)) % 256) as u8;
}
}
Raster::new(w, h, PixelFormat::Rgb8, data).unwrap()
}
fn build_raw_pyramid(
dir: &std::path::Path,
w: u32,
h: u32,
tile_size: u32,
) -> (FsSink, PyramidPlan, Raster) {
let src = gradient(w, h);
let plan = PyramidPlanner::new(w, h, tile_size, 0, Layout::DeepZoom)
.unwrap()
.plan();
let sink = FsSink::new(dir, plan.clone()).with_format(TileFormat::Raw);
crate::engine::generate_pyramid_observed(
&src,
&plan,
&sink,
&EngineConfig::default(),
&NoopObserver,
)
.unwrap();
(sink, plan, src)
}
#[test]
#[cfg_attr(miri, ignore)] fn stream_verify_happy_path_raw() {
let tmp = tempfile::tempdir().unwrap();
let out = tmp.path().join("tiles");
let (sink, plan, src) = build_raw_pyramid(&out, 256, 256, 128);
let strip_src = RasterStripSource::new(&src);
let res = verify_from_strip_source(
&strip_src,
&plan,
&sink,
&EngineConfig::default(),
&NoopObserver,
)
.expect("verify should succeed on an untouched pyramid");
assert_eq!(res.tiles_produced, 0, "verify must not write tiles");
assert_eq!(res.levels_processed, plan.levels.len() as u32);
}
#[test]
#[cfg_attr(miri, ignore)] fn stream_verify_reports_missing_tile() {
let tmp = tempfile::tempdir().unwrap();
let out = tmp.path().join("tiles");
let (sink, plan, src) = build_raw_pyramid(&out, 256, 256, 128);
let victim = plan
.tile_coords()
.next()
.expect("plan has at least one tile");
for ext in &CANDIDATE_EXTS {
if let Some(rel) = plan.tile_path(victim, ext) {
let abs = out.join(&rel);
let _ = std::fs::remove_file(abs);
}
}
let strip_src = RasterStripSource::new(&src);
let err = verify_from_strip_source(
&strip_src,
&plan,
&sink,
&EngineConfig::default(),
&NoopObserver,
)
.expect_err("verify should fail when a tile is missing");
match err {
EngineError::Sink(SinkError::Other(msg)) => {
assert!(
msg.starts_with("Verify: missing tile"),
"unexpected missing-tile message: {msg}"
);
}
other => panic!("expected SinkError::Other for missing tile, got {other:?}"),
}
}
#[test]
#[cfg_attr(miri, ignore)] fn stream_verify_detects_raw_corruption() {
let tmp = tempfile::tempdir().unwrap();
let out = tmp.path().join("tiles");
let (sink, plan, src) = build_raw_pyramid(&out, 256, 256, 128);
let mut corrupted: Option<TileCoord> = None;
'outer: for coord in plan.tile_coords() {
if let Some(rel) = plan.tile_path(coord, "raw") {
let abs = out.join(&rel);
if let Ok(mut bytes) = std::fs::read(&abs) {
if !bytes.is_empty() {
bytes[0] ^= 0xFF;
std::fs::write(&abs, &bytes).unwrap();
corrupted = Some(coord);
break 'outer;
}
}
}
}
let corrupted = corrupted.expect("pyramid should contain at least one raw tile");
let strip_src = RasterStripSource::new(&src);
let err = verify_from_strip_source(
&strip_src,
&plan,
&sink,
&EngineConfig::default(),
&NoopObserver,
)
.expect_err("verify should fail on byte-corrupted raw tile");
match err {
EngineError::ChecksumMismatch { tile, .. } => {
assert_eq!(tile, corrupted, "mismatch reported on wrong tile");
}
other => panic!("expected ChecksumMismatch, got {other:?}"),
}
}
}