use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use crate::planner::{PyramidPlan, TileCoord};
use crate::raster::Raster;
use thiserror::Error;
pub use crate::dedupe::DedupeStrategy;
pub use crate::retry::{FailurePolicy, RetryPolicy, RetryingSink};
#[cfg(feature = "s3")]
pub use crate::sink_object_store::{ObjectStore, ObjectStoreConfig, ObjectStoreSink};
#[cfg(feature = "packfile")]
pub use crate::sink_packfile::{PackfileFormat, PackfileSink};
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SinkError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("image encode error: {0}")]
EncodeMsg(String),
#[error("encoding tile to {format:?} failed: {source}")]
Encode {
format: String,
#[source]
source: image::ImageError,
},
#[error("sink error: {0}")]
Other(String),
#[error("tile coord {coord:?} is outside level bounds")]
InvalidCoord { coord: TileCoord },
#[error("engine config not available on sink (construct via with_engine_config)")]
MissingEngineConfig,
#[error("checksum mismatch for {tile_rel_path}: expected {expected}, got {got}")]
ChecksumMismatch {
tile_rel_path: String,
expected: String,
got: String,
},
#[error("required builder field not set: {0}")]
MissingField(&'static str),
}
pub const BLANK_TILE_MARKER: u8 = 0x00;
#[derive(Debug)]
pub struct Tile {
pub coord: TileCoord,
pub raster: Raster,
pub blank: bool,
}
pub trait TileSink: Send + Sync {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError>;
fn finish(&self) -> Result<(), SinkError> {
Ok(())
}
fn record_engine_config(&self, _config: &crate::engine::EngineConfig) {}
fn sink_retry_count(&self) -> u64 {
0
}
fn sink_skipped_due_to_failure(&self) -> u64 {
0
}
fn note_sink_skipped(&self) {}
fn checkpoint_root(&self) -> Option<&Path> {
None
}
fn init_level_count(&self, _levels: usize) {}
}
impl<T: TileSink + ?Sized> TileSink for Box<T> {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
(**self).write_tile(tile)
}
fn finish(&self) -> Result<(), SinkError> {
(**self).finish()
}
fn record_engine_config(&self, config: &crate::engine::EngineConfig) {
(**self).record_engine_config(config)
}
fn sink_retry_count(&self) -> u64 {
(**self).sink_retry_count()
}
fn sink_skipped_due_to_failure(&self) -> u64 {
(**self).sink_skipped_due_to_failure()
}
fn note_sink_skipped(&self) {
(**self).note_sink_skipped()
}
fn checkpoint_root(&self) -> Option<&Path> {
(**self).checkpoint_root()
}
fn init_level_count(&self, levels: usize) {
(**self).init_level_count(levels)
}
}
impl<T: TileSink + ?Sized> TileSink for &T {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
(*self).write_tile(tile)
}
fn finish(&self) -> Result<(), SinkError> {
(*self).finish()
}
fn record_engine_config(&self, config: &crate::engine::EngineConfig) {
(*self).record_engine_config(config)
}
fn sink_retry_count(&self) -> u64 {
(*self).sink_retry_count()
}
fn sink_skipped_due_to_failure(&self) -> u64 {
(*self).sink_skipped_due_to_failure()
}
fn note_sink_skipped(&self) {
(*self).note_sink_skipped()
}
fn checkpoint_root(&self) -> Option<&Path> {
(*self).checkpoint_root()
}
fn init_level_count(&self, levels: usize) {
(*self).init_level_count(levels)
}
}
#[derive(Debug)]
pub struct MemorySink {
tiles: std::sync::Mutex<Vec<CollectedTile>>,
}
#[derive(Debug, Clone)]
pub struct CollectedTile {
pub coord: TileCoord,
pub width: u32,
pub height: u32,
pub data: Vec<u8>,
pub raster: Raster,
}
impl MemorySink {
pub fn new() -> Self {
Self {
tiles: std::sync::Mutex::new(Vec::new()),
}
}
pub fn tiles(&self) -> Vec<CollectedTile> {
self.tiles.lock().unwrap().clone()
}
pub fn tile_count(&self) -> usize {
self.tiles.lock().unwrap().len()
}
}
impl Default for MemorySink {
fn default() -> Self {
Self::new()
}
}
impl TileSink for MemorySink {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
self.tiles.lock().unwrap().push(CollectedTile {
coord: tile.coord,
width: tile.raster.width(),
height: tile.raster.height(),
data: tile.raster.data().to_vec(),
raster: tile.raster.clone(),
});
Ok(())
}
}
#[derive(Debug)]
pub struct SlowSink {
inner: MemorySink,
delay: std::time::Duration,
}
impl SlowSink {
pub fn new(delay: std::time::Duration) -> Self {
Self {
inner: MemorySink::new(),
delay,
}
}
pub fn tile_count(&self) -> usize {
self.inner.tile_count()
}
pub fn tiles(&self) -> Vec<CollectedTile> {
self.inner.tiles()
}
}
impl TileSink for SlowSink {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
std::thread::sleep(self.delay);
self.inner.write_tile(tile)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TileFormat {
Png,
Jpeg {
quality: u8,
},
Raw,
}
impl TileFormat {
pub fn extension(&self) -> &'static str {
match self {
Self::Png => "png",
Self::Jpeg { .. } => "jpeg",
Self::Raw => "raw",
}
}
}
pub struct FsSink {
base_dir: PathBuf,
plan: PyramidPlan,
format: TileFormat,
manifest_builder: Option<crate::manifest::ManifestBuilder>,
checksums: crate::checksum::ChecksumMode,
checksum_algo: Option<crate::manifest::ChecksumAlgo>,
dedupe: Option<crate::dedupe::DedupeStrategy>,
dedupe_index: Option<crate::dedupe::DedupeIndex>,
resume_enabled: bool,
tile_digests: Mutex<BTreeMap<String, [u8; 32]>>,
manifest_refs: Mutex<HashMap<String, String>>,
pending_first: Mutex<HashMap<String, PendingFirst>>,
per_level_counts: Vec<[AtomicU64; 2]>,
pixel_format: OnceLock<crate::pixel::PixelFormat>,
completed_tiles: Mutex<Vec<TileCoord>>,
saw_blank: AtomicBool,
engine_config: Mutex<Option<crate::engine::EngineConfig>>,
}
#[derive(Debug, Clone)]
struct PendingFirst {
tile_abs_path: PathBuf,
tile_rel_path: String,
#[allow(dead_code)]
shared_abs_path: PathBuf,
#[allow(dead_code)]
shared_rel_path: String,
bytes: Vec<u8>,
}
impl std::fmt::Debug for FsSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FsSink")
.field("base_dir", &self.base_dir)
.field("format", &self.format)
.field("checksums", &self.checksums)
.field("checksum_algo", &self.checksum_algo)
.field("dedupe", &self.dedupe)
.field("resume_enabled", &self.resume_enabled)
.finish()
}
}
impl FsSink {
pub fn new(base_dir: impl Into<PathBuf>, plan: PyramidPlan) -> Self {
let format = TileFormat::Png;
let base_dir = base_dir.into();
let level_slots = plan.levels.len().max(1);
let mut per_level_counts: Vec<[AtomicU64; 2]> = Vec::with_capacity(level_slots);
for _ in 0..level_slots {
per_level_counts.push([AtomicU64::new(0), AtomicU64::new(0)]);
}
let total_tiles: u64 = plan
.levels
.iter()
.map(|lp| (lp.cols as u64) * (lp.rows as u64))
.sum();
let completed_cap = usize::try_from(total_tiles).unwrap_or(0);
Self {
base_dir,
plan,
format,
manifest_builder: None,
checksums: crate::checksum::ChecksumMode::None,
checksum_algo: None,
dedupe: None,
dedupe_index: None,
resume_enabled: false,
tile_digests: Mutex::new(BTreeMap::new()),
manifest_refs: Mutex::new(HashMap::new()),
pending_first: Mutex::new(HashMap::new()),
per_level_counts,
pixel_format: OnceLock::new(),
completed_tiles: Mutex::new(Vec::with_capacity(completed_cap)),
saw_blank: AtomicBool::new(false),
engine_config: Mutex::new(None),
}
}
pub fn with_manifest(mut self, builder: crate::manifest::ManifestBuilder) -> Self {
if let Some(algo) = builder.checksum_algo() {
self.checksum_algo = Some(algo);
if self.checksums == crate::checksum::ChecksumMode::None {
self.checksums = crate::checksum::ChecksumMode::EmitOnly;
}
}
self.manifest_builder = Some(builder);
self
}
pub fn with_checksums(
mut self,
mode: crate::checksum::ChecksumMode,
algo: crate::manifest::ChecksumAlgo,
) -> Self {
self.checksum_algo = Some(algo);
self.checksums = mode;
self
}
pub fn with_checksum_mode(mut self, mode: crate::checksum::ChecksumMode) -> Self {
self.checksums = mode;
self
}
pub fn with_dedupe(mut self, strategy: crate::dedupe::DedupeStrategy) -> Self {
if strategy != crate::dedupe::DedupeStrategy::None {
self.dedupe_index = Some(crate::dedupe::DedupeIndex::new(strategy));
} else {
self.dedupe_index = None;
}
self.dedupe = Some(strategy);
self
}
pub fn with_resume(mut self, enabled: bool) -> Self {
self.resume_enabled = enabled;
self
}
pub fn with_format(mut self, format: TileFormat) -> Self {
self.format = format;
self
}
pub fn base_dir(&self) -> &Path {
&self.base_dir
}
#[allow(dead_code)]
fn tile_path(&self, coord: TileCoord) -> Option<PathBuf> {
let rel = self.plan.tile_path(coord, self.format.extension())?;
Some(self.base_dir.join(rel))
}
fn encode_tile(&self, raster: &Raster) -> Result<Vec<u8>, SinkError> {
match self.format {
TileFormat::Raw => Ok(raster.data().to_vec()),
TileFormat::Png => encode_png(raster),
TileFormat::Jpeg { quality } => encode_jpeg(raster, quality),
}
}
fn dedupe_active(&self) -> bool {
matches!(
self.dedupe,
Some(crate::dedupe::DedupeStrategy::Blanks)
| Some(crate::dedupe::DedupeStrategy::All { .. })
)
}
fn should_dedupe_tile(&self, tile: &Tile) -> bool {
match self.dedupe {
None | Some(crate::dedupe::DedupeStrategy::None) => false,
Some(crate::dedupe::DedupeStrategy::Blanks) => {
tile.blank || crate::engine::is_blank_tile(&tile.raster)
}
Some(crate::dedupe::DedupeStrategy::All { .. }) => {
tile.blank || crate::engine::is_blank_tile(&tile.raster)
}
}
}
}
impl TileSink for FsSink {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
let rel_string = self
.plan
.tile_path(tile.coord, self.format.extension())
.ok_or(SinkError::InvalidCoord { coord: tile.coord })?;
let abs_path = self.base_dir.join(&rel_string);
if let Some(parent) = abs_path.parent() {
std::fs::create_dir_all(parent)?;
}
let bytes: Vec<u8> = if tile.blank {
vec![BLANK_TILE_MARKER]
} else {
self.encode_tile(&tile.raster)?
};
let _ = self.pixel_format.set(tile.raster.format());
if tile.blank {
self.saw_blank.store(true, Ordering::Relaxed);
}
let dedup_used = if self.should_dedupe_tile(tile) {
self.saw_blank.store(true, Ordering::Relaxed);
self.dedupe_write(&rel_string, &abs_path, &bytes)?;
true
} else {
std::fs::write(&abs_path, &bytes)?;
false
};
if let Some(slot) = self.per_level_counts.get(tile.coord.level as usize) {
slot[0].fetch_add(1, Ordering::Relaxed);
if tile.blank || dedup_used {
slot[1].fetch_add(1, Ordering::Relaxed);
}
}
if self.checksums != crate::checksum::ChecksumMode::None {
if let Some(algo) = self.checksum_algo {
let digest = hash_tile_raw(&bytes, algo);
let mut map = self.tile_digests.lock().unwrap();
map.insert(rel_string.clone(), digest);
}
}
if self.resume_enabled {
self.completed_tiles.lock().unwrap().push(tile.coord);
}
Ok(())
}
fn finish(&self) -> Result<(), SinkError> {
if let Some(manifest) = self.plan.dzi_manifest(self.format.extension()) {
let dzi_path = self.base_dir.with_extension("dzi");
std::fs::write(&dzi_path, manifest)?;
}
if self.checksums == crate::checksum::ChecksumMode::Verify {
self.verify_digests_on_disk()?;
}
if self.manifest_builder.is_some() || self.dedupe_active() {
self.write_manifest_json()?;
}
if self.resume_enabled {
self.write_resume_checkpoint()?;
}
Ok(())
}
fn record_engine_config(&self, config: &crate::engine::EngineConfig) {
match config.blank_tile_strategy {
crate::engine::BlankTileStrategy::Placeholder
| crate::engine::BlankTileStrategy::PlaceholderWithTolerance { .. } => {
self.saw_blank.store(true, Ordering::Relaxed);
}
crate::engine::BlankTileStrategy::Emit => {}
}
*self.engine_config.lock().unwrap() = Some(config.clone());
}
fn checkpoint_root(&self) -> Option<&Path> {
Some(&self.base_dir)
}
}
impl FsSink {
fn dedupe_write(
&self,
rel_string: &str,
abs_path: &Path,
bytes: &[u8],
) -> Result<(), SinkError> {
use crate::dedupe::DedupeDecision;
let idx = self
.dedupe_index
.as_ref()
.expect("dedupe_write called without a dedupe index");
let decision = idx.record(rel_string, bytes);
match decision {
DedupeDecision::WriteNew {
shared_key,
shared_path,
} => {
std::fs::write(abs_path, bytes)?;
let shared_rel_string = shared_path.to_string_lossy().replace('\\', "/");
let shared_abs_path = self.base_dir.join(&shared_path);
idx.forget_reference(rel_string);
self.pending_first.lock().unwrap().insert(
shared_key,
PendingFirst {
tile_abs_path: abs_path.to_path_buf(),
tile_rel_path: rel_string.to_string(),
shared_abs_path,
shared_rel_path: shared_rel_string,
bytes: bytes.to_vec(),
},
);
}
DedupeDecision::Reference {
shared_key,
shared_path,
} => {
let shared_abs_path = self.base_dir.join(&shared_path);
let shared_rel_string = shared_path.to_string_lossy().replace('\\', "/");
let pending = self.pending_first.lock().unwrap().remove(&shared_key);
if let Some(p) = pending {
if let Some(parent) = shared_abs_path.parent() {
std::fs::create_dir_all(parent)?;
}
if !shared_abs_path.exists() {
if std::fs::rename(&p.tile_abs_path, &shared_abs_path).is_err() {
std::fs::write(&shared_abs_path, &p.bytes)?;
}
} else if p.tile_abs_path.exists() {
let _ = std::fs::remove_file(&p.tile_abs_path);
}
if p.tile_abs_path.exists() || p.tile_abs_path.is_symlink() {
let _ = std::fs::remove_file(&p.tile_abs_path);
}
match std::fs::hard_link(&shared_abs_path, &p.tile_abs_path) {
Ok(()) => {
}
Err(_) => {
let _ = std::fs::write(&p.tile_abs_path, [0u8]);
self.manifest_refs
.lock()
.unwrap()
.insert(p.tile_rel_path, shared_rel_string.clone());
}
}
}
if !shared_abs_path.exists() {
if let Some(parent) = shared_abs_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&shared_abs_path, bytes)?;
}
if let Some(parent) = abs_path.parent() {
std::fs::create_dir_all(parent)?;
}
if abs_path.exists() || abs_path.is_symlink() {
let _ = std::fs::remove_file(abs_path);
}
std::fs::write(abs_path, [0u8])?;
self.manifest_refs
.lock()
.unwrap()
.insert(rel_string.to_string(), shared_rel_string);
}
}
Ok(())
}
fn verify_digests_on_disk(&self) -> Result<(), SinkError> {
let snapshot = self.tile_digests.lock().unwrap().clone();
let Some(algo) = self.checksum_algo else {
return Ok(());
};
for (rel, expected_bytes) in &snapshot {
let abs = self.base_dir.join(rel);
let bytes = match std::fs::read(&abs) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
Err(e) => return Err(SinkError::Io(e)),
};
let got_bytes = hash_tile_raw(&bytes, algo);
if got_bytes != *expected_bytes {
return Err(SinkError::ChecksumMismatch {
tile_rel_path: rel.clone(),
expected: hex_encode_32(expected_bytes),
got: hex_encode_32(&got_bytes),
});
}
}
Ok(())
}
fn write_manifest_json(&self) -> Result<(), SinkError> {
use crate::manifest::{
Checksums, GenerationSettings, LevelMetadata, ManifestV1, SourceMetadata, SparsePolicy,
};
let builder = self.manifest_builder.clone();
let eng_cfg = self.engine_config.lock().unwrap().clone();
let generation = GenerationSettings {
tile_size: self.plan.tile_size,
overlap: self.plan.overlap,
layout: self.plan.layout,
format: self.format,
concurrency: eng_cfg.as_ref().map(|c| c.concurrency).unwrap_or(0),
background_rgb: eng_cfg
.as_ref()
.map(|c| c.background_rgb)
.unwrap_or([255, 255, 255]),
blank_strategy: eng_cfg
.as_ref()
.map(|c| c.blank_tile_strategy)
.unwrap_or(crate::engine::BlankTileStrategy::Emit),
};
let pixel_format = self
.pixel_format
.get()
.copied()
.unwrap_or(crate::pixel::PixelFormat::Rgb8);
let source = SourceMetadata {
width: self.plan.image_width,
height: self.plan.image_height,
pixel_format,
bytes_hash: None,
};
let levels: Vec<LevelMetadata> = self
.plan
.levels
.iter()
.map(|lp| {
let (produced_raw, skipped_raw) = self
.per_level_counts
.get(lp.level as usize)
.map(|slot| {
(
slot[0].load(Ordering::Relaxed),
slot[1].load(Ordering::Relaxed),
)
})
.unwrap_or((0, 0));
let level_total = (lp.cols as u64) * (lp.rows as u64);
let skipped = skipped_raw.min(produced_raw);
let produced = produced_raw.saturating_sub(skipped);
let accounted = produced + skipped;
let skipped = if accounted < level_total {
skipped + (level_total - accounted)
} else {
skipped
};
LevelMetadata {
level_index: lp.level,
width: lp.width,
height: lp.height,
tiles_produced: produced,
tiles_skipped: skipped,
}
})
.collect();
let sparse_dedupe = builder
.as_ref()
.and_then(|b| b.dedupe_override())
.unwrap_or_else(|| self.saw_blank.load(Ordering::Relaxed));
let tolerance = builder
.as_ref()
.and_then(|b| b.tolerance_override())
.unwrap_or(0);
let sparse_policy = SparsePolicy {
tolerance,
dedupe: sparse_dedupe,
};
let emit_checksums =
self.checksum_algo.is_some() && self.checksums != crate::checksum::ChecksumMode::None;
let checksums = if emit_checksums {
let raw = self.tile_digests.lock().unwrap();
let per_tile: BTreeMap<String, String> = raw
.iter()
.map(|(k, v)| (k.clone(), hex_encode_32(v)))
.collect();
self.checksum_algo.map(|algo| Checksums { algo, per_tile })
} else {
None
};
let blank_references: std::collections::BTreeMap<String, String> = self
.manifest_refs
.lock()
.unwrap()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let manifest_v1 = ManifestV1 {
generation,
source,
levels,
sparse_policy,
checksums,
created_at: now_rfc3339(),
blank_references,
};
let json = serde_json::to_vec(&manifest_v1.into_manifest())
.expect("Manifest serialization must not fail");
if let (Some(parent), Some(stem)) = (self.base_dir.parent(), self.base_dir.file_name()) {
std::fs::create_dir_all(parent)?;
let mut sibling_name = stem.to_os_string();
sibling_name.push(".manifest.json");
let sibling_path = parent.join(sibling_name);
std::fs::write(&sibling_path, &json)?;
}
let inside_path = self.base_dir.join("manifest.json");
if let Some(parent) = inside_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&inside_path, &json)?;
Ok(())
}
fn write_resume_checkpoint(&self) -> Result<(), SinkError> {
use crate::resume::{JobCheckpoint, JobMetadata, SCHEMA_VERSION, compute_plan_hash};
let completed = self.completed_tiles.lock().unwrap().clone();
let plan_hash = compute_plan_hash(&self.plan);
let timestamp = now_rfc3339();
let meta = JobMetadata {
schema_version: SCHEMA_VERSION.to_string(),
plan_hash,
completed_tiles: completed,
levels_completed: self
.plan
.levels
.iter()
.filter_map(|lp| {
let (produced, skipped) = self
.per_level_counts
.get(lp.level as usize)
.map(|slot| {
(
slot[0].load(Ordering::Relaxed),
slot[1].load(Ordering::Relaxed),
)
})
.unwrap_or((0, 0));
let level_total = (lp.cols as u64) * (lp.rows as u64);
if produced + skipped >= level_total {
Some(lp.level)
} else {
None
}
})
.collect(),
started_at: timestamp.clone(),
last_checkpoint_at: timestamp,
};
JobCheckpoint::save(&self.base_dir, &meta).map_err(SinkError::Io)?;
Ok(())
}
}
fn now_rfc3339() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let (year, month, day, hour, minute, second) = secs_to_ymd_hms(secs as i64);
format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
fn secs_to_ymd_hms(secs: i64) -> (i32, u32, u32, u32, u32, u32) {
let mut z = secs.div_euclid(86_400);
let time_of_day = secs.rem_euclid(86_400);
let second = (time_of_day % 60) as u32;
let minute = ((time_of_day / 60) % 60) as u32;
let hour = (time_of_day / 3600) as u32;
z += 719_468;
let era = if z >= 0 {
z / 146_097
} else {
(z - 146_096) / 146_097
};
let doe = (z - era * 146_097) as u64; let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); let mp = (5 * doy + 2) / 153; let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
let year = (y + if month <= 2 { 1 } else { 0 }) as i32;
(year, month, day, hour, minute, second)
}
fn hash_tile_raw(bytes: &[u8], algo: crate::manifest::ChecksumAlgo) -> [u8; 32] {
use crate::manifest::ChecksumAlgo;
match algo {
ChecksumAlgo::Blake3 => *blake3::hash(bytes).as_bytes(),
ChecksumAlgo::Sha256 => {
use sha2::Digest;
let mut hasher = sha2::Sha256::new();
hasher.update(bytes);
let out = hasher.finalize();
let mut buf = [0u8; 32];
buf.copy_from_slice(&out);
buf
}
}
}
fn hex_encode_32(bytes: &[u8; 32]) -> String {
let mut s = String::with_capacity(64);
use std::fmt::Write;
for b in bytes {
let _ = write!(s, "{:02x}", b);
}
s
}
fn color_type_for_format(fmt: crate::pixel::PixelFormat) -> Result<image::ColorType, SinkError> {
use crate::pixel::PixelFormat;
match fmt {
PixelFormat::Gray8 => Ok(image::ColorType::L8),
PixelFormat::Gray16 => Ok(image::ColorType::L16),
PixelFormat::Rgb8 => Ok(image::ColorType::Rgb8),
PixelFormat::Rgba8 => Ok(image::ColorType::Rgba8),
PixelFormat::Rgb16 => Ok(image::ColorType::Rgb16),
PixelFormat::Rgba16 => Ok(image::ColorType::Rgba16),
}
}
pub fn encode_png(raster: &Raster) -> Result<Vec<u8>, SinkError> {
let mut buf = Vec::new();
let encoder = image::codecs::png::PngEncoder::new(std::io::Cursor::new(&mut buf));
let ct = color_type_for_format(raster.format())?;
image::ImageEncoder::write_image(
encoder,
raster.data(),
raster.width(),
raster.height(),
ct.into(),
)
.map_err(|e| SinkError::Encode {
format: "png".to_string(),
source: e,
})?;
Ok(buf)
}
fn encode_jpeg(raster: &Raster, quality: u8) -> Result<Vec<u8>, SinkError> {
let mut buf = Vec::new();
let encoder =
image::codecs::jpeg::JpegEncoder::new_with_quality(std::io::Cursor::new(&mut buf), quality);
let ct = color_type_for_format(raster.format())?;
image::ImageEncoder::write_image(
encoder,
raster.data(),
raster.width(),
raster.height(),
ct.into(),
)
.map_err(|e| SinkError::Encode {
format: "jpeg".to_string(),
source: e,
})?;
Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pixel::PixelFormat;
use crate::planner::{Layout, PyramidPlanner};
fn make_tile(level: u32, col: u32, row: u32) -> Tile {
Tile {
coord: TileCoord::new(level, col, row),
raster: Raster::zeroed(8, 8, PixelFormat::Rgb8).unwrap(),
blank: false,
}
}
#[test]
fn memory_sink_collects_tiles() {
let sink = MemorySink::new();
sink.write_tile(&make_tile(0, 0, 0)).unwrap();
sink.write_tile(&make_tile(1, 0, 0)).unwrap();
sink.write_tile(&make_tile(1, 1, 0)).unwrap();
assert_eq!(sink.tile_count(), 3);
}
#[test]
fn memory_sink_preserves_coords() {
let sink = MemorySink::new();
sink.write_tile(&make_tile(3, 2, 5)).unwrap();
let tiles = sink.tiles();
assert_eq!(tiles[0].coord, TileCoord::new(3, 2, 5));
}
#[test]
fn memory_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<MemorySink>();
}
#[test]
fn fs_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<FsSink>();
}
#[test]
fn fs_sink_writes_tile_to_disk() {
let planner = PyramidPlanner::new(8, 8, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let top = plan.levels.last().unwrap();
let rel = plan
.tile_path(TileCoord::new(top.level, 0, 0), "raw")
.unwrap();
assert!(rel.ends_with("0_0.raw"), "unexpected path: {rel}");
let raster = Raster::zeroed(8, 8, PixelFormat::Rgb8).unwrap();
assert_eq!(raster.data().len(), 8 * 8 * 3);
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let sink = FsSink::new(dir.path().join("output_files"), plan.clone())
.with_format(TileFormat::Raw);
let tile = Tile {
coord: TileCoord::new(top.level, 0, 0),
raster,
blank: false,
};
sink.write_tile(&tile).unwrap();
let expected_path = dir.path().join("output_files").join(&rel);
assert!(
expected_path.exists(),
"Tile file not found at {expected_path:?}"
);
let contents = std::fs::read(&expected_path).unwrap();
assert_eq!(contents.len(), 8 * 8 * 3);
}
}
#[test]
fn fs_sink_creates_directory_structure() {
let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let top = plan.levels.last().unwrap();
for col in 0..top.cols {
for row in 0..top.rows {
let path = plan.tile_path(TileCoord::new(top.level, col, row), "raw");
assert!(path.is_some(), "tile_path returned None for ({col}, {row})");
}
}
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let sink =
FsSink::new(dir.path().join("tiles"), plan.clone()).with_format(TileFormat::Raw);
for col in 0..top.cols {
for row in 0..top.rows {
let rect = plan.tile_rect(TileCoord::new(top.level, col, row)).unwrap();
let tile = Tile {
coord: TileCoord::new(top.level, col, row),
raster: Raster::zeroed(rect.width, rect.height, PixelFormat::Rgb8).unwrap(),
blank: false,
};
sink.write_tile(&tile).unwrap();
}
}
assert!(dir.path().join(format!("tiles/{}", top.level)).is_dir());
}
}
#[test]
fn fs_sink_writes_dzi_manifest() {
let planner = PyramidPlanner::new(1024, 768, 256, 1, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let manifest = plan
.dzi_manifest("png")
.expect("DeepZoom should produce a DZI manifest");
assert!(manifest.contains("Format=\"png\""));
assert!(manifest.contains("TileSize=\"256\""));
assert!(manifest.contains("Overlap=\"1\""));
assert!(manifest.contains("Width=\"1024\""));
assert!(manifest.contains("Height=\"768\""));
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let sink = FsSink::new(dir.path().join("output_files"), plan);
sink.finish().unwrap();
let dzi_path = dir.path().join("output_files.dzi");
assert!(dzi_path.exists(), "DZI manifest not found");
let on_disk = std::fs::read_to_string(&dzi_path).unwrap();
assert_eq!(on_disk, manifest);
}
}
#[test]
fn fs_sink_no_dzi_for_xyz() {
let planner = PyramidPlanner::new(256, 256, 256, 0, Layout::Xyz).unwrap();
let plan = planner.plan();
assert!(
plan.dzi_manifest("raw").is_none(),
"DZI should not exist for XYZ layout"
);
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let sink = FsSink::new(dir.path().join("tiles"), plan).with_format(TileFormat::Raw);
sink.finish().unwrap();
let dzi_path = dir.path().join("tiles.dzi");
assert!(
!dzi_path.exists(),
"DZI should not be written for XYZ layout"
);
}
}
#[test]
fn fs_sink_xyz_path_structure() {
let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::Xyz).unwrap();
let plan = planner.plan();
let top = plan.levels.last().unwrap();
let rel = plan
.tile_path(TileCoord::new(top.level, 1, 0), "raw")
.unwrap();
let expected_suffix = format!("{}/1/0.raw", top.level);
assert!(
rel.ends_with(&expected_suffix),
"expected XYZ path ending with {expected_suffix}, got {rel}"
);
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let sink =
FsSink::new(dir.path().join("tiles"), plan.clone()).with_format(TileFormat::Raw);
let rect = plan.tile_rect(TileCoord::new(top.level, 1, 0)).unwrap();
let tile = Tile {
coord: TileCoord::new(top.level, 1, 0),
raster: Raster::zeroed(rect.width, rect.height, PixelFormat::Rgb8).unwrap(),
blank: false,
};
sink.write_tile(&tile).unwrap();
let expected = dir.path().join("tiles").join(&rel);
assert!(expected.exists(), "XYZ tile not found at {expected:?}");
}
}
#[test]
fn fs_sink_encodes_png() {
let raster = Raster::zeroed(8, 8, PixelFormat::Rgb8).unwrap();
let bytes = encode_png(&raster).unwrap();
assert_eq!(&bytes[..4], &[0x89, b'P', b'N', b'G']);
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let planner = PyramidPlanner::new(8, 8, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let top_level = plan.levels.last().unwrap().level;
let sink = FsSink::new(dir.path().join("out"), plan);
let tile = Tile {
coord: TileCoord::new(top_level, 0, 0),
raster,
blank: false,
};
sink.write_tile(&tile).unwrap();
let path = dir.path().join(format!("out/{top_level}/0_0.png"));
let on_disk = std::fs::read(&path).unwrap();
assert_eq!(&on_disk[..4], &[0x89, b'P', b'N', b'G']);
}
}
#[test]
fn fs_sink_encodes_jpeg() {
let raster = Raster::zeroed(8, 8, PixelFormat::Rgb8).unwrap();
let bytes = encode_jpeg(&raster, 85).unwrap();
assert_eq!(&bytes[..2], &[0xFF, 0xD8]);
#[cfg(not(miri))]
{
let dir = tempfile::tempdir().unwrap();
let planner = PyramidPlanner::new(8, 8, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let top_level = plan.levels.last().unwrap().level;
let sink = FsSink::new(dir.path().join("out"), plan)
.with_format(TileFormat::Jpeg { quality: 85 });
let tile = Tile {
coord: TileCoord::new(top_level, 0, 0),
raster,
blank: false,
};
sink.write_tile(&tile).unwrap();
let path = dir.path().join(format!("out/{top_level}/0_0.jpeg"));
let on_disk = std::fs::read(&path).unwrap();
assert_eq!(&on_disk[..2], &[0xFF, 0xD8]);
}
}
#[test]
fn fs_sink_deterministic_paths() {
let data = vec![42u8; 256 * 256 * 3];
let raster = Raster::new(256, 256, PixelFormat::Rgb8, data).unwrap();
let enc1 = encode_png(&raster).unwrap();
let enc2 = encode_png(&raster).unwrap();
assert_eq!(enc1, enc2);
#[cfg(not(miri))]
{
let planner = PyramidPlanner::new(512, 512, 256, 0, Layout::DeepZoom).unwrap();
let plan = planner.plan();
let top = plan.levels.last().unwrap();
let dir1 = tempfile::tempdir().unwrap();
let dir2 = tempfile::tempdir().unwrap();
let sink1 =
FsSink::new(dir1.path().join("out"), plan.clone()).with_format(TileFormat::Raw);
let sink2 =
FsSink::new(dir2.path().join("out"), plan.clone()).with_format(TileFormat::Raw);
let tile = Tile {
coord: TileCoord::new(top.level, 0, 0),
raster,
blank: false,
};
sink1.write_tile(&tile).unwrap();
sink2.write_tile(&tile).unwrap();
let bytes1 =
std::fs::read(dir1.path().join(format!("out/{}/0_0.raw", top.level))).unwrap();
let bytes2 =
std::fs::read(dir2.path().join(format!("out/{}/0_0.raw", top.level))).unwrap();
assert_eq!(bytes1, bytes2);
}
}
#[test]
fn encode_png_gray8() {
let raster = Raster::zeroed(4, 4, PixelFormat::Gray8).unwrap();
let bytes = encode_png(&raster).unwrap();
assert_eq!(&bytes[..4], &[0x89, b'P', b'N', b'G']);
}
#[test]
fn encode_png_rgba8() {
let raster = Raster::zeroed(4, 4, PixelFormat::Rgba8).unwrap();
let bytes = encode_png(&raster).unwrap();
assert_eq!(&bytes[..4], &[0x89, b'P', b'N', b'G']);
}
#[test]
fn encode_jpeg_rgb8() {
let raster = Raster::zeroed(4, 4, PixelFormat::Rgb8).unwrap();
let bytes = encode_jpeg(&raster, 90).unwrap();
assert_eq!(&bytes[..2], &[0xFF, 0xD8]);
}
}