pub mod full;
pub mod simple;
pub mod rgba;
use crate::meta::attributes::*;
use crate::compression::{Compression, ByteVec};
use crate::math::*;
use std::io::{Read, Seek, Write, Cursor};
use crate::error::{Result, Error, UnitResult, usize_to_i32};
use crate::meta::{MetaData, Header, TileIndices, Blocks};
use crate::chunks::{Chunk, Block, TileBlock, ScanLineBlock, TileCoordinates};
use crate::io::{PeekRead, Tracking};
use rayon::iter::{ParallelIterator, ParallelBridge};
use crate::io::Data;
use smallvec::SmallVec;
use std::ops::Range;
use std::convert::TryFrom;
use std::collections::BTreeMap;
/// Options that control how an image is written by `write_all_lines_to_buffered`.
///
/// `P` is the type of the progress callback, see `OnWriteProgress`.
#[derive(Debug)]
pub struct WriteOptions<P: OnWriteProgress> {
/// Whether blocks may be compressed on multiple threads.
/// Forwarded as the `parallel` flag of `for_compressed_blocks_in_image`,
/// which only goes parallel if at least one header actually uses compression.
pub parallel_compression: bool,
/// Forwarded to `MetaData::write_validating_to_buffered` when the meta data is written.
/// NOTE(review): presumably selects stricter validation; confirm against `MetaData`.
pub pedantic: bool,
/// Invoked each time a chunk has been written, with the progress fraction
/// and the current byte position of the writer.
pub on_progress: P,
}
/// Options that control how an image is read.
///
/// `P` is the type of the progress callback, see `OnReadProgress`.
#[derive(Debug)]
pub struct ReadOptions<P: OnReadProgress> {
/// Whether chunks may be decompressed on multiple threads.
/// Only takes effect if at least one header uses a compression
/// other than `Compression::Uncompressed`.
pub parallel_decompression: bool,
/// Invoked before each chunk is processed,
/// with the fraction of chunks that have been processed so far.
pub on_progress: P,
}
/// Predefined `WriteOptions` configurations without a progress callback.
pub mod write_options {
    use super::*;

    /// Builds write options from the two flags, using `()` as progress callback.
    fn preset(parallel_compression: bool, pedantic: bool) -> WriteOptions<()> {
        WriteOptions { parallel_compression, pedantic, on_progress: () }
    }

    /// The default configuration, identical to `high()`.
    pub fn default() -> WriteOptions<()> { high() }

    /// Parallel compression, with `pedantic` switched off.
    pub fn higher() -> WriteOptions<()> { preset(true, false) }

    /// Parallel compression, with `pedantic` switched on.
    pub fn high() -> WriteOptions<()> { preset(true, true) }

    /// Sequential compression, with `pedantic` switched on.
    pub fn low() -> WriteOptions<()> { preset(false, true) }
}
/// Predefined `ReadOptions` configurations without a progress callback.
pub mod read_options {
    use super::*;

    /// Builds read options from the flag, using `()` as progress callback.
    fn preset(parallel_decompression: bool) -> ReadOptions<()> {
        ReadOptions { parallel_decompression, on_progress: () }
    }

    /// The default configuration, identical to `high()`.
    pub fn default() -> ReadOptions<()> { high() }

    /// Allows decompressing chunks on multiple threads.
    pub fn high() -> ReadOptions<()> { preset(true) }

    /// Decompresses all chunks on the calling thread.
    pub fn low() -> ReadOptions<()> { preset(false) }
}
/// Identifies a rectangular section of pixels within one layer of an image.
#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)]
pub struct BlockIndex {
/// Index into `MetaData::headers` of the header this block belongs to.
pub layer: usize,
/// Position of the first pixel of the block.
/// The x component is the start of every line, the y component is the first line.
pub pixel_position: Vec2<usize>,
/// Width and height of the block.
/// The width is the sample count of each line, the height is the number of rows.
pub pixel_size: Vec2<usize>,
/// Level index of the block, taken from the tile coordinates.
/// NOTE(review): presumably the mip/rip map resolution level; confirm against `TileCoordinates`.
pub level: Vec2<usize>,
}
/// A block of pixel data that is not compressed,
/// either decompressed from a `Chunk` or about to be compressed into one.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct UncompressedBlock {
/// Which section of which layer the bytes belong to.
pub index: BlockIndex,
/// The raw bytes of the block.
/// `BlockIndex::line_indices` yields the byte range of every line in this buffer:
/// row after row, and within each row one line per channel.
pub data: ByteVec,
}
/// The samples of a single channel in a single row of a block,
/// paired with the location of that line in the image.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct LineSlice<T> {
/// Where this line is located in the image.
pub location: LineIndex,
/// The content of the line. See `LineRef` and `LineRefMut` for the byte slice variants.
pub value: T,
}
/// A line whose bytes can be read.
pub type LineRef<'s> = LineSlice<&'s [u8]>;
/// A line whose bytes can be written.
pub type LineRefMut<'s> = LineSlice<&'s mut [u8]>;
/// Identifies a single line of samples: one channel of one row of one block.
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)]
pub struct LineIndex {
/// Index of the layer, copied from the `BlockIndex` the line was derived from.
pub layer: usize,
/// Index of the channel within `header.channels.list`.
pub channel: usize,
/// Level index, copied from the `BlockIndex` the line was derived from.
pub level: Vec2<usize>,
/// Position of the first sample of the line: the x start of the block and the y of this row.
pub position: Vec2<usize>,
/// Number of samples in this line, which is the width of the block.
pub sample_count: usize,
}
/// Callback that is notified while an image is being written.
pub trait OnWriteProgress {
/// Called by `write_all_lines_to_buffered` each time a chunk has been written.
///
/// `progress` is the number of chunks written before this one, divided by the total chunk count.
/// `bytes_written` is the current byte position of the writer.
/// Returning an error aborts the write, because the error is propagated to the caller.
#[must_use]
fn on_write_progressed(&mut self, progress: f32, bytes_written: usize) -> UnitResult;
}
/// Callback that is notified while an image is being read.
pub trait OnReadProgress {
/// Called before each chunk is processed.
///
/// `progress` is the number of chunks processed so far, divided by the total chunk count.
/// Returning an error aborts the read, because the error is propagated to the caller.
#[must_use]
fn on_read_progressed(&mut self, progress: f32) -> UnitResult;
}
/// Any closure of the shape `FnMut(f32, usize) -> UnitResult` can be used as a write progress callback.
impl<F: FnMut(f32, usize) -> UnitResult> OnWriteProgress for F {
    #[inline]
    fn on_write_progressed(&mut self, progress: f32, bytes_written: usize) -> UnitResult {
        self(progress, bytes_written)
    }
}
/// Any closure of the shape `FnMut(f32) -> UnitResult` can be used as a read progress callback.
impl<F: FnMut(f32) -> UnitResult> OnReadProgress for F {
    #[inline]
    fn on_read_progressed(&mut self, progress: f32) -> UnitResult {
        self(progress)
    }
}
/// The unit type is a write progress callback that ignores every notification.
impl OnWriteProgress for () {
    #[inline]
    fn on_write_progressed(&mut self, _progress: f32, _bytes_written: usize) -> UnitResult {
        Ok(())
    }
}
/// The unit type is a read progress callback that ignores every notification.
impl OnReadProgress for () {
    #[inline]
    fn on_read_progressed(&mut self, _progress: f32) -> UnitResult {
        Ok(())
    }
}
impl<'s> LineRefMut<'s> {
#[inline]
#[must_use]
pub fn write_samples_from_slice<T: crate::io::Data>(self, slice: &[T]) -> UnitResult {
debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width");
debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size");
T::write_slice(&mut Cursor::new(self.value), slice)
}
#[inline]
#[must_use]
pub fn write_samples<T: crate::io::Data>(self, mut get_sample: impl FnMut(usize) -> T) -> UnitResult {
debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size");
let mut write = Cursor::new(self.value);
for index in 0..self.location.sample_count {
T::write(get_sample(index), &mut write)?;
}
Ok(())
}
}
impl LineRef<'_> {
    /// Reads all samples of this line into `slice`.
    ///
    /// In debug builds, asserts that `slice` has one element per sample of the line
    /// and that the byte size of the line matches the sample type `T`.
    pub fn read_samples_into_slice<T: crate::io::Data>(self, slice: &mut [T]) -> UnitResult {
        debug_assert_eq!(slice.len(), self.location.sample_count, "slice size does not match the line width");
        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size");

        T::read_slice(&mut Cursor::new(self.value), slice)
    }

    /// Returns an iterator that reads the samples of this line one after another.
    ///
    /// The iterator yields exactly `sample_count` items, each of which may be an error.
    /// In debug builds, asserts that the byte size of the line matches the sample type `T`.
    pub fn read_samples<T: crate::io::Data>(&self) -> impl Iterator<Item = Result<T>> + '_ {
        debug_assert_eq!(self.value.len(), self.location.sample_count * T::BYTE_SIZE, "sample type size does not match line byte size");

        // `&[u8]` is `Copy`, so the slice reference is copied without calling `clone`.
        // The copy is advanced by every read, while the line itself stays untouched.
        let mut read = self.value;
        (0..self.location.sample_count).map(move |_| T::read(&mut read))
    }
}
/// Reads every chunk of an image and passes each contained line to `insert`.
///
/// The meta data is read first, then `new` creates the value that is to be filled,
/// and finally `insert` is called once for every line of every chunk in the file.
/// `options` decides whether decompression may run in parallel and receives progress updates.
/// Returns the filled value, or the first error that occurred.
#[inline]
#[must_use]
pub fn read_all_lines_from_buffered<T>(
    read: impl Read + Send,
    new: impl Fn(&[Header]) -> Result<T>,
    mut insert: impl FnMut(&mut T, &[Header], LineRef<'_>) -> UnitResult,
    options: ReadOptions<impl OnReadProgress>,
) -> Result<T>
{
    let (meta_data, chunk_count, mut next_chunk) = self::read_all_compressed_chunks_from_buffered(read)?;

    // the chunk reader needs the meta data for every chunk it reads
    let meta_data_ref = &meta_data;
    let compressed_chunks = std::iter::from_fn(move || next_chunk(meta_data_ref));

    let mut image = new(meta_data.headers.as_slice())?;

    for_lines_in_chunks(
        compressed_chunks,
        &meta_data,
        |headers, line| insert(&mut image, headers, line),
        chunk_count,
        options,
    )?;

    Ok(image)
}
/// Reads only the chunks of an image that `filter` accepts, and passes each contained line to `insert`.
///
/// The value to be filled is created by `new` from the headers of the file.
/// `filter` is then asked for every block whether it should be read at all.
/// `options` decides whether decompression may run in parallel and receives progress updates.
/// Returns the filled value, or the first error that occurred.
#[inline]
#[must_use]
pub fn read_filtered_lines_from_buffered<T>(
    read: impl Read + Seek + Send,
    new: impl Fn(&[Header]) -> Result<T>,
    filter: impl Fn(&T, &Header, &TileIndices) -> bool,
    mut insert: impl FnMut(&mut T, &[Header], LineRef<'_>) -> UnitResult,
    options: ReadOptions<impl OnReadProgress>,
) -> Result<T>
{
    let (meta_data, mut image, chunk_count, mut next_chunk) =
        self::read_filtered_chunks_from_buffered(read, new, filter)?;

    let filtered_chunks = std::iter::from_fn(|| next_chunk(&meta_data));

    for_lines_in_chunks(
        filtered_chunks,
        &meta_data,
        |headers, line| insert(&mut image, headers, line),
        chunk_count,
        options,
    )?;

    Ok(image)
}
#[inline]
#[must_use]
fn for_lines_in_chunks(
chunks: impl Send + Iterator<Item = Result<Chunk>>,
meta_data: &MetaData,
mut for_each: impl FnMut(&[Header], LineRef<'_>) -> UnitResult,
total_chunk_count: usize,
mut options: ReadOptions<impl OnReadProgress>,
) -> UnitResult
{
let has_compression = meta_data.headers.iter()
.find(|header| header.compression != Compression::Uncompressed).is_some();
let mut processed_chunk_count = 0;
if options.parallel_decompression && has_compression {
let (sender, receiver) = std::sync::mpsc::channel();
chunks.par_bridge()
.map(|chunk| UncompressedBlock::decompress_chunk(chunk?, &meta_data))
.try_for_each_with(sender, |sender, result| {
result.map(|block: UncompressedBlock| sender.send(block).expect("threading error"))
})?;
for decompressed in receiver {
options.on_progress.on_read_progressed(processed_chunk_count as f32 / total_chunk_count as f32)?;
processed_chunk_count += 1;
let header = meta_data.headers.get(decompressed.index.layer)
.ok_or(Error::invalid("chunk index"))?;
for (bytes, line) in decompressed.index.line_indices(header) {
for_each(meta_data.headers.as_slice(), LineSlice { location: line, value: &decompressed.data[bytes] })?;
}
}
Ok(())
}
else {
for chunk in chunks {
options.on_progress.on_read_progressed(processed_chunk_count as f32 / total_chunk_count as f32)?;
processed_chunk_count += 1;
let decompressed = UncompressedBlock::decompress_chunk(chunk?, &meta_data)?;
let header = meta_data.headers.get(decompressed.index.layer)
.ok_or(Error::invalid("chunk index"))?;
for (bytes, line) in decompressed.index.line_indices(header) {
for_each(meta_data.headers.as_slice(), LineSlice { location: line, value: &decompressed.data[bytes] })?;
}
}
Ok(())
}
}
#[inline]
#[must_use]
pub fn read_all_compressed_chunks_from_buffered<'m>(
read: impl Read + Send,
) -> Result<(MetaData, usize, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
let mut read = PeekRead::new(read);
let meta_data = MetaData::read_from_buffered_peekable(&mut read)?;
let mut remaining_chunk_count = usize::try_from(MetaData::skip_offset_tables(&mut read, &meta_data.headers)?)
.expect("too large chunk count for this machine");
Ok((meta_data, remaining_chunk_count, move |meta_data| {
if remaining_chunk_count > 0 {
remaining_chunk_count -= 1;
Some(Chunk::read(&mut read, meta_data))
}
else {
None
}
}))
}
#[inline]
#[must_use]
pub fn read_filtered_chunks_from_buffered<'m, T>(
read: impl Read + Seek + Send,
new: impl Fn(&[Header]) -> Result<T>,
filter: impl Fn(&T, &Header, &TileIndices) -> bool,
) -> Result<(MetaData, T, usize, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
let skip_read = Tracking::new(read);
let mut read = PeekRead::new(skip_read);
let meta_data = MetaData::read_from_buffered_peekable(&mut read)?;
let value = new(meta_data.headers.as_slice())?;
let offset_tables = MetaData::read_offset_tables(&mut read, &meta_data.headers)?;
let mut offsets = Vec::with_capacity(meta_data.headers.len() * 32);
for (header_index, header) in meta_data.headers.iter().enumerate() {
for (block_index, block) in header.blocks_increasing_y_order().enumerate() {
if filter(&value, header, &block) {
offsets.push(offset_tables[header_index][block_index])
}
};
}
offsets.sort();
let mut offsets = offsets.into_iter();
let block_count = offsets.len();
Ok((meta_data, value, block_count, move |meta_data| {
offsets.next().map(|offset|{
read.skip_to(usize::try_from(offset).expect("too large chunk position for this machine"))?;
Chunk::read(&mut read, meta_data)
})
}))
}
/// Produces all uncompressed blocks of an image, one layer after another,
/// in the order defined by `Header::enumerate_ordered_blocks`.
///
/// The bytes of each block are obtained by calling `get_line` once for every line in the block.
/// Each item is the chunk index of the block together with the block itself,
/// or the error returned by `get_line`.
///
/// # Panics
/// Panics if the block coordinates produced by the header are inconsistent.
#[inline]
#[must_use]
pub fn uncompressed_image_blocks_ordered<'l>(
    meta_data: &'l MetaData,
    get_line: &'l (impl Sync + 'l + (Fn(&[Header], LineRefMut<'_>) -> UnitResult))
) -> impl Iterator<Item = Result<(usize, UncompressedBlock)>> + 'l + Send
{
    meta_data.headers.iter().enumerate().flat_map(move |(layer_index, header)| {
        header.enumerate_ordered_blocks().map(move |(chunk_index, tile)| {
            let absolute_indices = header.get_absolute_block_indices(tile.location)
                .expect("tile coordinate bug");

            let index = BlockIndex {
                layer: layer_index,
                level: tile.location.level_index,
                pixel_position: absolute_indices.position.to_usize("data indices start").expect("data index bug"),
                pixel_size: absolute_indices.size,
            };

            // allocate the largest possible block, it is shrunk to the used size below
            let mut data = vec![0_u8; header.max_block_byte_size()];
            let mut used_byte_count = 0;

            for (byte_range, location) in index.line_indices(header) {
                used_byte_count = byte_range.end;

                let line = LineRefMut { location, value: &mut data[byte_range] };
                get_line(meta_data.headers.as_slice(), line)?;
            }

            data.truncate(used_byte_count);

            Ok((chunk_index, UncompressedBlock { index, data }))
        })
    })
}
/// Compresses all blocks of an image and passes each resulting chunk to `write_chunk`,
/// together with the chunk index of that block.
///
/// The pixel data is obtained by calling `get_line` for every line of every block.
/// Blocks are compressed on multiple threads only if `parallel` is set
/// and at least one header uses a compression other than `Compression::Uncompressed`.
///
/// Returns the first error produced by `get_line`, the compression or `write_chunk`.
///
/// # Panics
/// Panics if, in the parallel and sorted case, the set of compressed blocks
/// does not match the blocks that the headers enumerate.
#[inline]
#[must_use]
pub fn for_compressed_blocks_in_image(
meta_data: &MetaData, get_line: impl Sync + Fn(&[Header], LineRefMut<'_>) -> UnitResult,
parallel: bool, mut write_chunk: impl FnMut(usize, Chunk) -> UnitResult
) -> UnitResult
{
let blocks = uncompressed_image_blocks_ordered(meta_data, &get_line);
// parallelism is only used if there is anything to compress
let parallel = parallel && meta_data.headers.iter()
.any(|header| header.compression != Compression::Uncompressed);
// as soon as any header declares a specific line order,
// chunks must be passed on in the order in which the headers enumerate them
let requires_sorting = meta_data.headers.iter()
.any(|header| header.line_order != LineOrder::Unspecified);
if parallel {
let (sender, receiver) = std::sync::mpsc::channel();
// compress all blocks first, collecting the chunks in the channel in arbitrary order
blocks.par_bridge()
.map(|result| Ok({
let (chunk_index, block) = result?;
let block = block.compress_to_chunk(meta_data)?;
(chunk_index, block)
}))
.try_for_each_with(sender, |sender, result: Result<(usize, Chunk)>| {
result.map(|block| sender.send(block).expect("threading error"))
})?;
if !requires_sorting {
// the order does not matter, pass the chunks on as they come out of the channel
for (chunk_index, compressed_chunk) in receiver {
write_chunk(chunk_index, compressed_chunk)?;
}
}
else {
// the sequence of (layer, chunk) ids in which the chunks must be passed on
let mut expected_id_order = meta_data.headers.iter().enumerate()
.flat_map(|(layer, header)| header.enumerate_ordered_blocks().map(move |(chunk, _)| (layer, chunk)));
let mut next_id = expected_id_order.next();
// chunks that arrived before their turn, keyed by their (layer, chunk) id
let mut pending_blocks = BTreeMap::new();
for (chunk_index, compressed_chunk) in receiver {
pending_blocks.insert((compressed_chunk.layer_index, chunk_index), compressed_chunk);
// pass on as many chunks as possible, until the next expected one has not arrived yet
while let Some(pending_chunk) = next_id.as_ref().and_then(|id| pending_blocks.remove(id)) {
let pending_chunk_index = next_id.unwrap().1;
write_chunk(pending_chunk_index, pending_chunk)?;
next_id = expected_id_order.next();
}
}
// every expected chunk must have arrived and must have been passed on
assert!(expected_id_order.next().is_none(), "expected more blocks bug");
assert_eq!(pending_blocks.len(), 0, "pending blocks left after processing bug");
}
}
else {
// sequential case: compress and pass on each block in the order of the block iterator
for result in blocks {
let (chunk_index, uncompressed_block) = result?;
let chunk = uncompressed_block.compress_to_chunk(meta_data)?;
write_chunk(chunk_index, chunk)?;
}
}
Ok(())
}
/// Writes a complete image: the meta data, the offset tables and all chunks.
///
/// The pixel data is obtained by calling `get_line` for every line of every block.
/// Room for the offset tables is skipped at first, and the tables are filled in
/// after all chunks have been written, which is why `write` must be seekable.
///
/// If the blocks are not compressed in parallel, every header with an unspecified
/// line order is changed to `LineOrder::Increasing` before the meta data is written.
///
/// `options.on_progress` is called each time a chunk has been written.
/// Returns the first error produced by validation, `get_line`, compression,
/// writing or the progress callback.
#[inline]
#[must_use]
pub fn write_all_lines_to_buffered(
    write: impl Write + Seek,
    mut meta_data: MetaData,
    get_line: impl Sync + Fn(&[Header], LineRefMut<'_>) -> UnitResult,
    mut options: WriteOptions<impl OnWriteProgress>,
) -> UnitResult
{
    let has_compression = meta_data.headers.iter()
        .any(|header| header.compression != Compression::Uncompressed);

    let compresses_in_parallel = options.parallel_compression && has_compression;

    if !compresses_in_parallel {
        let unordered_headers = meta_data.headers.iter_mut()
            .filter(|header| header.line_order == LineOrder::Unspecified);

        for header in unordered_headers {
            header.line_order = LineOrder::Increasing;
        }
    }

    let mut write = Tracking::new(write);
    meta_data.write_validating_to_buffered(&mut write, options.pedantic)?;

    let offset_table_start_byte = write.byte_position();

    let chunk_count: usize = meta_data.headers.iter()
        .map(|header| header.chunk_count)
        .sum();

    // skip the offset tables for now, they are written after all chunk positions are known
    let first_chunk_byte = write.byte_position() + chunk_count * std::mem::size_of::<u64>();
    write.seek_write_to(first_chunk_byte)?;

    let mut offset_tables: Vec<Vec<u64>> = meta_data.headers.iter()
        .map(|header| vec![0; header.chunk_count])
        .collect();

    let total_chunk_count = chunk_count as f32;
    let mut processed_chunk_count = 0;

    for_compressed_blocks_in_image(&meta_data, get_line, options.parallel_compression, |chunk_index, chunk| {
        // remember where this chunk starts, then write it
        offset_tables[chunk.layer_index][chunk_index] = write.byte_position() as u64;
        chunk.write(&mut write, meta_data.headers.as_slice())?;

        let progress = processed_chunk_count as f32 / total_chunk_count;
        options.on_progress.on_write_progressed(progress, write.byte_position())?;

        processed_chunk_count += 1;
        Ok(())
    })?;

    // go back and fill in the offset tables that were skipped earlier
    write.seek_write_to(offset_table_start_byte)?;

    for offset_table in &offset_tables {
        u64::write_slice(&mut write, offset_table.as_slice())?;
    }

    Ok(())
}
impl BlockIndex {
#[inline]
#[must_use]
pub fn line_indices(&self, header: &Header) -> impl Iterator<Item=(Range<usize>, LineIndex)> {
struct LineIter {
layer: usize, level: Vec2<usize>, width: usize,
end_y: usize, x: usize, channel_sizes: SmallVec<[usize; 8]>,
byte: usize, channel: usize, y: usize,
};
impl Iterator for LineIter {
type Item = (Range<usize>, LineIndex);
fn next(&mut self) -> Option<Self::Item> {
if self.y < self.end_y {
let byte_len = self.channel_sizes[self.channel];
let return_value = (
(self.byte .. self.byte + byte_len),
LineIndex {
channel: self.channel,
layer: self.layer,
level: self.level,
position: Vec2(self.x, self.y),
sample_count: self.width,
}
);
{
self.byte += byte_len;
self.channel += 1;
if self.channel == self.channel_sizes.len() {
self.channel = 0;
self.y += 1;
}
}
Some(return_value)
}
else {
None
}
}
}
let channel_line_sizes: SmallVec<[usize; 8]> = header.channels.list.iter()
.map(move |channel| self.pixel_size.0 * channel.pixel_type.bytes_per_sample())
.collect();
LineIter {
layer: self.layer,
level: self.level,
width: self.pixel_size.0,
x: self.pixel_position.0,
end_y: self.pixel_position.1 + self.pixel_size.1,
channel_sizes: channel_line_sizes,
byte: 0,
channel: 0,
y: self.pixel_position.1
}
}
}
impl UncompressedBlock {
#[inline]
#[must_use]
pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData) -> Result<Self> {
let header: &Header = meta_data.headers.get(chunk.layer_index)
.ok_or(Error::invalid("chunk layer index"))?;
let tile_data_indices = header.get_block_data_indices(&chunk.block)?;
let absolute_indices = header.get_absolute_block_indices(tile_data_indices)?;
absolute_indices.validate(header.data_size)?;
match chunk.block {
Block::Tile(TileBlock { compressed_pixels, .. }) |
Block::ScanLine(ScanLineBlock { compressed_pixels, .. }) => Ok(UncompressedBlock {
data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices)?,
index: BlockIndex {
layer: chunk.layer_index,
pixel_position: absolute_indices.position.to_usize("data indices start")?,
level: tile_data_indices.level_index,
pixel_size: absolute_indices.size,
}
}),
_ => return Err(Error::unsupported("deep data not supported yet"))
}
}
#[inline]
#[must_use]
pub fn compress_to_chunk(self, meta_data: &MetaData) -> Result<Chunk> {
let UncompressedBlock { data, index } = self;
let header: &Header = meta_data.headers.get(index.layer)
.expect("block layer index bug");
let expected_byte_size = header.channels.bytes_per_pixel * self.index.pixel_size.area();
if expected_byte_size != data.len() {
panic!("get_line byte size should be {} but was {}", expected_byte_size, data.len());
}
let compressed_data = header.compression.compress_image_section(data)?;
Ok(Chunk {
layer_index: index.layer,
block : match header.blocks {
Blocks::ScanLines => Block::ScanLine(ScanLineBlock {
compressed_pixels: compressed_data,
y_coordinate: usize_to_i32(index.pixel_position.1) + header.own_attributes.data_position.1,
}),
Blocks::Tiles(tiles) => Block::Tile(TileBlock {
compressed_pixels: compressed_data,
coordinates: TileCoordinates {
level_index: index.level,
tile_index: index.pixel_position / tiles.tile_size,
},
}),
}
})
}
}