/// Extract lines of pixel bytes from a decompressed block.
pub mod lines;
/// Extract typed pixel samples from lines of pixel bytes.
pub mod samples;
/// The raw, possibly compressed, data chunks of an exr file.
pub mod chunk;
use crate::compression::{ByteVec, Compression};
use crate::math::*;
use crate::error::{Result, Error, usize_to_i32, UnitResult, u64_to_usize};
use crate::meta::{MetaData, Blocks, TileIndices, OffsetTables};
use crate::block::chunk::{Chunk, Block, TileBlock, ScanLineBlock, TileCoordinates};
use crate::meta::attribute::LineOrder;
use rayon::prelude::ParallelBridge;
use rayon::iter::ParallelIterator;
use smallvec::alloc::collections::BTreeMap;
use std::convert::TryFrom;
use crate::io::{Tracking, PeekRead};
use std::io::{Seek, Read};
use crate::image::{ReadOptions, OnReadProgress};
use crate::meta::header::Header;
/// Specifies where a block of pixel data belongs in the actual image.
/// This may be a tile of a tiled image or a horizontal slice of a scan line image.
#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)]
pub struct BlockIndex {
    /// The index of the layer that this block belongs to.
    pub layer: usize,
    /// The pixel position of the first pixel of this block inside the layer.
    pub pixel_position: Vec2<usize>,
    /// The width and height of this block, in pixels.
    pub pixel_size: Vec2<usize>,
    /// The mip map or rip map level that this block belongs to.
    pub level: Vec2<usize>,
}
/// A decompressed block of pixel bytes, along with the position where it belongs in the image.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct UncompressedBlock {
    /// Where this block belongs inside the image.
    pub index: BlockIndex,
    /// The uncompressed pixel bytes of the whole block.
    pub data: ByteVec,
}
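/// Reads and decompresses all chunks of the file sequentially, without seeking.
/// The `new` closure creates the result value from the headers,
/// and `insert` is called once for every decompressed block.
/// The reader is not buffered internally, so you will usually want to pass a `BufReader`.
///
/// A minimal usage sketch that collects every decompressed block into a vector.
/// The file name, the helper function name and the externally provided `options`
/// value are assumptions of this example, not requirements of this function:
///
/// ```no_run
/// use exr::block::{read_all_blocks_from_buffered, UncompressedBlock};
/// use exr::error::Result;
/// use exr::image::{OnReadProgress, ReadOptions};
///
/// fn collect_blocks(options: ReadOptions<impl OnReadProgress>) -> Result<Vec<UncompressedBlock>> {
///     let file = std::io::BufReader::new(std::fs::File::open("image.exr")?);
///
///     read_all_blocks_from_buffered(
///         file,
///         |_headers| Ok(Vec::new()),                                 // create the empty accumulator
///         |blocks, _headers, block| { blocks.push(block); Ok(()) },  // store each decompressed block
///         options,
///     )
/// }
/// ```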
#[inline]
#[must_use]
pub fn read_all_blocks_from_buffered<T>(
read: impl Read + Send, new: impl Fn(&[Header]) -> Result<T>,
mut insert: impl FnMut(&mut T, &[Header], UncompressedBlock) -> UnitResult,
options: ReadOptions<impl OnReadProgress>,
) -> Result<T>
{
let (meta_data, chunk_count, mut read_chunk) = self::read_all_compressed_chunks_from_buffered(read, options.max_pixel_bytes, options.pedantic)?;
let meta_data_ref = &meta_data;
let read_chunks = std::iter::from_fn(move || read_chunk(meta_data_ref));
let mut result = new(meta_data.headers.as_slice())?;
for_decompressed_blocks_in_chunks(
read_chunks, &meta_data,
|meta, block| insert(&mut result, meta, block),
chunk_count, options
)?;
Ok(result)
}
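/// Reads and decompresses only the desired chunks of the file, seeking past the others.
/// The `filter` decides, per header and per block, which chunks are read;
/// `insert` is then called once for every decompressed block that passed the filter.
/// The reader is not buffered internally, so you will usually want to pass a `BufReader`.
///
/// A minimal usage sketch that only counts the blocks of the first layer.
/// The file name, the helper function name and the externally provided `options`
/// value are assumptions of this example, not requirements of this function:
///
/// ```no_run
/// use exr::block::read_filtered_blocks_from_buffered;
/// use exr::error::Result;
/// use exr::image::{OnReadProgress, ReadOptions};
///
/// fn count_first_layer_blocks(options: ReadOptions<impl OnReadProgress>) -> Result<usize> {
///     let file = std::io::BufReader::new(std::fs::File::open("image.exr")?);
///
///     read_filtered_blocks_from_buffered(
///         file,
///         |_headers| Ok(0_usize),                                                      // start counting at zero
///         |_count, (header_index, _header), (_block_index, _block)| header_index == 0, // keep only the first layer
///         |count, _headers, _block| { *count += 1; Ok(()) },                           // count each decompressed block
///         options,
///     )
/// }
/// ```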
#[inline]
#[must_use]
pub fn read_filtered_blocks_from_buffered<T>(
read: impl Read + Seek + Send, new: impl FnOnce(&[Header]) -> Result<T>, filter: impl Fn(&T, (usize, &Header), (usize, &TileIndices)) -> bool,
mut insert: impl FnMut(&mut T, &[Header], UncompressedBlock) -> UnitResult,
options: ReadOptions<impl OnReadProgress>,
) -> Result<T>
{
let (meta_data, mut value, chunk_count, mut read_chunk) = {
self::read_filtered_chunks_from_buffered(read, new, filter, options.max_pixel_bytes, options.pedantic)?
};
for_decompressed_blocks_in_chunks(
std::iter::from_fn(|| read_chunk(&meta_data)), &meta_data,
|meta, block| insert(&mut value, meta, block),
chunk_count, options
)?;
Ok(value)
}
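/// Decompresses all chunks from the given iterator and calls `for_each` for every decompressed block.
/// Decompression runs on multiple threads if requested by the options and if any header is actually compressed.
/// Reports progress after each processed chunk.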
#[inline]
#[must_use]
fn for_decompressed_blocks_in_chunks(
chunks: impl Send + Iterator<Item = Result<Chunk>>,
meta_data: &MetaData,
mut for_each: impl FnMut(&[Header], UncompressedBlock) -> UnitResult,
total_chunk_count: usize,
mut options: ReadOptions<impl OnReadProgress>,
) -> UnitResult
{
let pedantic = options.pedantic;
let has_compression = meta_data.headers.iter() .any(|header| header.compression != Compression::Uncompressed);
let mut processed_chunk_count = 0;
if options.parallel_decompression && has_compression {
let (sender, receiver) = std::sync::mpsc::channel();
chunks.par_bridge()
.map(|chunk| UncompressedBlock::decompress_chunk(chunk?, &meta_data, pedantic))
.try_for_each_with(sender, |sender, result| {
result.map(|block: UncompressedBlock| sender.send(block).expect("threading error"))
})?;
for decompressed in receiver {
options.on_progress.on_read_progressed(processed_chunk_count as f32 / total_chunk_count as f32)?;
processed_chunk_count += 1;
for_each(meta_data.headers.as_slice(), decompressed)?;
}
}
else {
for chunk in chunks {
options.on_progress.on_read_progressed(processed_chunk_count as f32 / total_chunk_count as f32)?;
processed_chunk_count += 1;
let decompressed = UncompressedBlock::decompress_chunk(chunk?, &meta_data, options.pedantic)?;
for_each(meta_data.headers.as_slice(), decompressed)?;
}
}
debug_assert_eq!(processed_chunk_count, total_chunk_count, "some chunks were not read");
Ok(())
}
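/// Reads the meta data and prepares reading all compressed chunks of the file sequentially, without seeking.
/// Returns the meta data, the total number of chunks, and a reader closure that yields
/// one compressed `Chunk` per call until all chunks have been read.
/// The reader is not buffered internally, so you will usually want to pass a `BufReader`.
///
/// A sketch of how the returned closure might be driven, mirroring the pattern used by
/// `read_all_blocks_from_buffered` above. The file name and the helper function name are
/// assumptions of this example:
///
/// ```no_run
/// use exr::block::read_all_compressed_chunks_from_buffered;
/// use exr::block::chunk::Chunk;
/// use exr::error::Result;
///
/// fn collect_compressed_chunks() -> Result<Vec<Chunk>> {
///     let file = std::io::BufReader::new(std::fs::File::open("image.exr")?);
///
///     let (meta_data, chunk_count, mut read_chunk) =
///         read_all_compressed_chunks_from_buffered(file, None, true)?;
///
///     // drive the reader closure once per chunk, collecting the results
///     let meta_data_ref = &meta_data;
///     std::iter::from_fn(move || read_chunk(meta_data_ref))
///         .take(chunk_count)
///         .collect()
/// }
/// ```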
#[inline]
#[must_use]
pub fn read_all_compressed_chunks_from_buffered<'m>(
read: impl Read + Send, max_pixel_bytes: Option<usize>,
pedantic: bool
) -> Result<(MetaData, usize, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
let mut read = PeekRead::new(Tracking::new(read));
let meta_data = MetaData::read_validated_from_buffered_peekable(&mut read, max_pixel_bytes, pedantic)?;
let mut remaining_chunk_count = {
if pedantic {
let offset_tables = MetaData::read_offset_tables(&mut read, &meta_data.headers)?;
validate_offset_tables(meta_data.headers.as_slice(), &offset_tables, read.byte_position())?;
offset_tables.iter().map(|table| table.len()).sum()
}
else {
usize::try_from(MetaData::skip_offset_tables(&mut read, &meta_data.headers)?)
.expect("too large chunk count for this machine")
}
};
Ok((meta_data, remaining_chunk_count, move |meta_data| {
if remaining_chunk_count > 0 {
remaining_chunk_count -= 1;
Some(Chunk::read(&mut read, meta_data))
}
else {
if pedantic && read.peek_u8().is_ok() {
return Some(Err(Error::invalid("end of file expected")));
}
None
}
}))
}
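/// Reads the meta data and prepares reading only the desired compressed chunks of the file, possibly seeking.
/// The `filter` decides, based on each header and each block's tile indices, which chunks will be read.
/// Returns the meta data, the value produced by `new`, the number of accepted chunks,
/// and a reader closure that yields one compressed `Chunk` per accepted chunk.
/// The reader is not buffered internally, so you will usually want to pass a `BufReader`.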
#[inline]
#[must_use]
pub fn read_filtered_chunks_from_buffered<'m, T>(
read: impl Read + Seek + Send, new: impl FnOnce(&[Header]) -> Result<T>,
filter: impl Fn(&T, (usize, &Header), (usize, &TileIndices)) -> bool,
max_pixel_bytes: Option<usize>,
pedantic: bool
) -> Result<(MetaData, T, usize, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
let skip_read = Tracking::new(read);
let mut read = PeekRead::new(skip_read);
let meta_data = MetaData::read_validated_from_buffered_peekable(&mut read, max_pixel_bytes, pedantic)?;
let value = new(meta_data.headers.as_slice())?;
let offset_tables = MetaData::read_offset_tables(&mut read, &meta_data.headers)?;
if pedantic {
validate_offset_tables(meta_data.headers.as_slice(), &offset_tables, read.byte_position())?;
}
let mut filtered_offsets = Vec::with_capacity((meta_data.headers.len() * 32).min(2*2048));
// collect the file offsets of all blocks that the filter accepts
for (header_index, header) in meta_data.headers.iter().enumerate() {
    for (block_index, block) in header.blocks_increasing_y_order().enumerate() {
        if filter(&value, (header_index, header), (block_index, &block)) {
            filtered_offsets.push(offset_tables[header_index][block_index])
        }
    }
}

// sort the offsets, so that the file is read sequentially without seeking backwards
filtered_offsets.sort();
let mut filtered_offsets = filtered_offsets.into_iter();
let block_count = filtered_offsets.len();
Ok((meta_data, value, block_count, move |meta_data| {
filtered_offsets.next().map(|offset|{
read.skip_to(usize::try_from(offset).expect("too large chunk position for this machine"))?;
Chunk::read(&mut read, meta_data)
})
}))
}
/// Check that all chunk offsets lie behind the meta data and within the maximum possible file size.
fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter()
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // chunks must start after the offset tables and before the end of the largest possible file
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten()
        .map(|&offset| u64_to_usize(offset))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);
if is_invalid { Err(Error::invalid("offset table")) }
else { Ok(()) }
}
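/// Iterates over the uncompressed blocks of an image in the order in which they
/// should appear in the file. The pixel bytes of each block are produced by `get_block`.
/// Yields `(chunk_index, UncompressedBlock)` pairs that can then be compressed and written.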
#[inline]
#[must_use]
pub fn uncompressed_image_blocks_ordered<'l>(
    headers: &'l [Header],
    get_block: &'l (impl 'l + Sync + (Fn(&[Header], BlockIndex) -> Vec<u8>))
) -> impl 'l + Iterator<Item = Result<(usize, UncompressedBlock)>> + Send
{
headers.iter().enumerate()
.flat_map(move |(layer_index, header)|{
header.enumerate_ordered_blocks().map(move |(chunk_index, tile)|{
let data_indices = header.get_absolute_block_pixel_coordinates(tile.location).expect("tile coordinate bug");
let block_indices = BlockIndex {
layer: layer_index, level: tile.location.level_index,
pixel_position: data_indices.position.to_usize("data indices start").expect("data index bug"),
pixel_size: data_indices.size,
};
let block_bytes = get_block(headers, block_indices);
Ok((chunk_index, UncompressedBlock {
index: block_indices,
data: block_bytes
}))
})
})
}
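/// Compresses all blocks of an image and passes each compressed chunk to `write_chunk`,
/// together with its chunk index. Compression runs on multiple threads if `parallel` is true
/// and any header is actually compressed. If any header requires a specific line order,
/// the chunks are handed to `write_chunk` in exactly that order;
/// otherwise they are passed on as soon as they are compressed.
///
/// A minimal sketch that compresses an image into an in-memory map of chunks.
/// The helper function name and the externally provided `headers` and `pixel_bytes_for`
/// callback are assumptions of this example, not requirements of this function:
///
/// ```no_run
/// use std::collections::BTreeMap;
/// use exr::block::{for_compressed_blocks_in_image, BlockIndex};
/// use exr::block::chunk::Chunk;
/// use exr::error::Result;
/// use exr::meta::header::Header;
///
/// fn compress_into_memory(
///     headers: &[Header],
///     pixel_bytes_for: impl Sync + Fn(&[Header], BlockIndex) -> Vec<u8>
/// ) -> Result<BTreeMap<usize, Chunk>> {
///     let mut chunks = BTreeMap::new();
///
///     for_compressed_blocks_in_image(headers, pixel_bytes_for, true, |chunk_index, chunk| {
///         chunks.insert(chunk_index, chunk);  // remember every compressed chunk by its index
///         Ok(())
///     })?;
///
///     Ok(chunks)
/// }
/// ```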
#[inline]
#[must_use]
pub fn for_compressed_blocks_in_image(
headers: &[Header], get_tile: impl Sync + Fn(&[Header], BlockIndex) -> Vec<u8>,
parallel: bool, mut write_chunk: impl FnMut(usize, Chunk) -> UnitResult
) -> UnitResult
{
let blocks = uncompressed_image_blocks_ordered(headers, &get_tile);
let parallel = parallel && headers.iter() .any(|header| header.compression != Compression::Uncompressed);
let requires_sorting = headers.iter()
.any(|header| header.line_order != LineOrder::Unspecified);
if parallel {
let (sender, receiver) = std::sync::mpsc::channel();
blocks.par_bridge()
.map(|result| Ok({
let (chunk_index, block) = result?;
let block = block.compress_to_chunk(headers)?;
(chunk_index, block)
}))
.try_for_each_with(sender, |sender, result: Result<(usize, Chunk)>| {
result.map(|block| sender.send(block).expect("threading error"))
})?;
if !requires_sorting {
for (chunk_index, compressed_chunk) in receiver {
write_chunk(chunk_index, compressed_chunk)?;
}
}
else {
    // compressed chunks may finish in any order, but must be written in the order
    // required by the line order attribute: buffer chunks that arrive too early
    // and flush them as soon as the next expected chunk id has been compressed
    let mut expected_id_order = headers.iter().enumerate()
        .flat_map(|(layer, header)| header.enumerate_ordered_blocks().map(move |(chunk, _)| (layer, chunk)));
    let mut next_id = expected_id_order.next();
    let mut pending_blocks = BTreeMap::new();
for (chunk_index, compressed_chunk) in receiver {
pending_blocks.insert((compressed_chunk.layer_index, chunk_index), compressed_chunk);
while let Some(pending_chunk) = next_id.as_ref().and_then(|id| pending_blocks.remove(id)) {
let pending_chunk_index = next_id.unwrap().1;
write_chunk(pending_chunk_index, pending_chunk)?;
next_id = expected_id_order.next();
}
}
assert!(expected_id_order.next().is_none(), "expected more blocks bug");
assert_eq!(pending_blocks.len(), 0, "pending blocks left after processing bug");
}
}
else {
for result in blocks {
let (chunk_index, uncompressed_block) = result?;
let chunk = uncompressed_block.compress_to_chunk(headers)?;
write_chunk(chunk_index, chunk)?;
}
}
Ok(())
}
impl UncompressedBlock {
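    /// Decompresses the pixel bytes of a single chunk, using the compression method of the header
    /// that the chunk belongs to, and returns the uncompressed block together with its position
    /// in the image. Deep data is not supported yet.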
#[inline]
#[must_use]
pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData, pedantic: bool) -> Result<Self> {
let header: &Header = meta_data.headers.get(chunk.layer_index)
.ok_or(Error::invalid("chunk layer index"))?;
let tile_data_indices = header.get_block_data_indices(&chunk.block)?;
let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_data_indices)?;
absolute_indices.validate(Some(header.layer_size))?;
match chunk.block {
Block::Tile(TileBlock { compressed_pixels, .. }) |
Block::ScanLine(ScanLineBlock { compressed_pixels, .. }) => Ok(UncompressedBlock {
data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices, pedantic)?,
index: BlockIndex {
layer: chunk.layer_index,
pixel_position: absolute_indices.position.to_usize("data indices start")?,
level: tile_data_indices.level_index,
pixel_size: absolute_indices.size,
}
}),
_ => return Err(Error::unsupported("deep data not supported yet"))
}
}
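    /// Consumes this block, compressing its pixel bytes with the compression method of the header
    /// it belongs to, and returns a `Chunk` ready to be written to a file.
    /// Panics if the byte count of the block does not match the header's channel description.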
#[inline]
#[must_use]
pub fn compress_to_chunk(self, headers: &[Header]) -> Result<Chunk> {
let UncompressedBlock { data, index } = self;
let header: &Header = headers.get(index.layer)
.expect("block layer index bug");
// the pixel byte count of the block must match the channel description of the header
let expected_byte_size = header.channels.bytes_per_pixel * index.pixel_size.area();
if expected_byte_size != data.len() {
    panic!("pixel block byte size should be {} but was {}", expected_byte_size, data.len());
}
let tile_coordinates = TileCoordinates {
tile_index: index.pixel_position / header.max_block_pixel_size(),
level_index: index.level,
};
let absolute_indices = header.get_absolute_block_pixel_coordinates(tile_coordinates)?;
absolute_indices.validate(Some(header.layer_size))?;
// in debug builds, verify that compressing and then decompressing reproduces the original bytes
debug_assert_eq!(
    &header.compression.decompress_image_section(
        header,
        header.compression.compress_image_section(header, data.clone(), absolute_indices)?,
        absolute_indices,
        true
    ).unwrap(),
    &data, "compression method does not round trip"
);
let compressed_data = header.compression.compress_image_section(header, data, absolute_indices)?;
Ok(Chunk {
layer_index: index.layer,
block : match header.blocks {
Blocks::ScanLines => Block::ScanLine(ScanLineBlock {
compressed_pixels: compressed_data,
y_coordinate: usize_to_i32(index.pixel_position.y()) + header.own_attributes.layer_position.y(),
}),
Blocks::Tiles(_) => Block::Tile(TileBlock {
compressed_pixels: compressed_data,
coordinates: tile_coordinates,
}),
}
})
}
}