use crate::{
error::TiffResult,
structs::{Offset, Tag, TagData},
util::fix_endianness,
ByteOrder,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::StreamExt, TryStreamExt};
use log::debug;
use std::{
collections::BTreeMap,
io::{self, BufRead, BufReader, Read, Take},
num::TryFromIntError,
ops::Range,
};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[allow(clippy::single_range_in_vec_init)]
/// Asynchronous source of byte ranges from which tag data and chunks are read.
///
/// Implementors supply [`CogReader::get_range`]; every other method has a
/// default implementation built on top of it.
pub trait CogReader: Sync {
    /// NOTE(review): not referenced in this part of the file; presumably the
    /// size in bytes of the request used to read an IFD — confirm against the
    /// implementors.
    const IFD_REQ_SIZE: u64;

    /// Fetches the bytes covered by `range`.
    async fn get_range(&self, range: Range<u64>) -> TiffResult<Bytes>;

    /// Fetches several ranges, returning one `Bytes` per entry of `ranges`
    /// in the same order.
    ///
    /// The default implementation routes the requests through
    /// [`coalesce_ranges`] with [`OBJECT_STORE_COALESCE_DEFAULT`] as the
    /// merge threshold.
    async fn get_ranges(&self, ranges: &[Range<u64>]) -> TiffResult<Vec<Bytes>> {
        coalesce_ranges(
            ranges,
            |range: Range<u64>| self.get_range(range),
            OBJECT_STORE_COALESCE_DEFAULT,
        )
        .await
    }

    /// Loads the out-of-line data of every tag in `tags`.
    ///
    /// For each entry the byte range `offset .. offset + count * size` is
    /// requested through [`CogReader::get_ranges`]; the bytes are copied into
    /// a fresh `TagData` and passed through `fix_endianness` with
    /// `byte_order`.
    ///
    /// # Errors
    ///
    /// Propagates fetch errors and integer-conversion failures.
    ///
    /// # Panics
    ///
    /// Panics if fewer responses than tags are returned, or if a response's
    /// length differs from the length of the `TagData` buffer it is copied
    /// into (`copy_from_slice` requires equal lengths).
    async fn get_tags(
        &self,
        tags: BTreeMap<Tag, Offset>,
        byte_order: ByteOrder,
    ) -> TiffResult<BTreeMap<Tag, TagData>> {
        // One byte range per tag, in the map's iteration order.
        let mut byte_ranges = Vec::with_capacity(tags.len());
        for entry in tags.values() {
            let elem_size = u64::try_from(entry.tag_type.size())?;
            byte_ranges.push(entry.offset..entry.offset + entry.count * elem_size);
        }

        let chunks = self.get_ranges(&byte_ranges).await?;
        debug!(
            "received data, lengths: {:?}",
            chunks.iter().map(|chunk| chunk.len()).collect::<Vec<_>>()
        );

        // `tags` is iterated in the same order as above, so position `idx`
        // of the response belongs to the `idx`-th tag.
        let mut decoded = BTreeMap::new();
        for (idx, (tag, entry)) in tags.iter().enumerate() {
            let mut data = TagData::new(entry.tag_type, usize::try_from(entry.count)?);
            data.buf_mut().copy_from_slice(&chunks[idx][..]);
            fix_endianness(
                data.buf_mut(),
                byte_order,
                entry.tag_type.primitive_size() * 8,
            );
            decoded.insert(*tag, data);
        }
        Ok(decoded)
    }

    /// Fetches the given chunk byte ranges; by default this simply forwards
    /// to [`CogReader::get_ranges`].
    async fn get_chunks(&self, chunks: &[Range<u64>]) -> TiffResult<Vec<Bytes>> {
        self.get_ranges(chunks).await
    }
}
/// Default merge threshold in bytes (1024 * 1024): two requested ranges whose
/// gap is at most this value are fetched as a single range. Passed as the
/// `coalesce` argument by `CogReader::get_ranges`.
pub const OBJECT_STORE_COALESCE_DEFAULT: u64 = 1024 * 1024;
/// Number of merged-range fetches `coalesce_ranges` buffers at once.
pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
/// Fetches `ranges` using as few calls to `fetch` as the `coalesce`
/// threshold allows, then cuts the results back into one `Bytes` per
/// requested range.
///
/// Ranges that overlap, touch, or are separated by at most `coalesce` bytes
/// are merged into a single fetch. Up to [`OBJECT_STORE_COALESCE_PARALLEL`]
/// fetches are buffered at a time. The returned vector has the same length
/// and order as `ranges`.
///
/// The end of each returned slice is clamped to the number of bytes actually
/// fetched, so a short fetch yields a short slice rather than an error.
///
/// # Errors
///
/// Returns the first error produced by `fetch`, or a converted
/// `TryFromIntError` if a relative offset does not fit in `usize`.
///
/// # Panics
///
/// `Bytes::slice` panics if a fetch returns fewer bytes than the start
/// offset of a requested range inside it.
pub async fn coalesce_ranges<F, E, Fut>(
    ranges: &[Range<u64>],
    fetch: F,
    coalesce: u64,
) -> Result<Vec<Bytes>, E>
where
    F: Send + FnMut(Range<u64>) -> Fut,
    E: Send + From<TryFromIntError>,
    Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
{
    let merged = merge_ranges(ranges, coalesce);

    // `buffered` keeps results in request order, so `payloads[i]` holds the
    // bytes of `merged[i]`.
    let payloads: Vec<Bytes> = futures::stream::iter(merged.iter().cloned())
        .map(fetch)
        .buffered(OBJECT_STORE_COALESCE_PARALLEL)
        .try_collect()
        .await?;

    let mut out = Vec::with_capacity(ranges.len());
    for wanted in ranges {
        // `merged` is sorted by start; the containing merged range is the
        // last one that starts at or before the requested start.
        let idx = merged.partition_point(|m| m.start <= wanted.start) - 1;
        let base = merged[idx].start;
        let payload = &payloads[idx];

        let rel_start = wanted.start - base;
        let rel_end = wanted.end - base;
        let lo = usize::try_from(rel_start)?;
        let hi = usize::try_from(rel_end)?.min(payload.len());
        out.push(payload.slice(lo..hi));
    }
    Ok(out)
}
/// Combines `ranges` into a sorted list of fetch ranges.
///
/// The input is sorted by start offset; a range is folded into the previous
/// merged range when it starts before that range's end, or no more than
/// `coalesce` bytes after it. The merged range's end is the largest end seen.
/// An empty input produces an empty output.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        match merged.last_mut() {
            // `checked_sub` is `None` when `next` starts before the current
            // end (overlap), which always merges.
            Some(current)
                if next
                    .start
                    .checked_sub(current.end)
                    .map_or(true, |gap| gap <= coalesce) =>
            {
                current.end = current.end.max(next.end);
            }
            _ => merged.push(next),
        }
    }
    merged
}
/// Pairs an inner reader with the byte order used by the `read_*` helpers
/// to decode multi-byte values.
pub struct EndianReader<R> {
/// The wrapped reader; `Read` and `Seek` calls are forwarded to it.
pub(super) reader: R,
/// Byte order applied when decoding values read through this wrapper.
pub byte_order: ByteOrder,
}
/// Raw reads go straight to the wrapped reader; the byte order plays no part.
impl<R: io::Read> io::Read for EndianReader<R> {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        io::Read::read(&mut self.reader, buf)
    }
}
/// Seeking is delegated unchanged to the wrapped reader.
impl<R: io::Seek> io::Seek for EndianReader<R> {
    #[inline]
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        io::Seek::seek(&mut self.reader, pos)
    }
}
// Generates a `pub fn $name(&mut self) -> Result<$type, io::Error>` that
// reads exactly `size_of::<$type>()` bytes and decodes them according to
// `self.byte_order()`. The surrounding type must provide `read_exact` and
// `byte_order`.
macro_rules! read_fn {
    ($name:ident, $type:ty) => {
        #[inline(always)]
        pub fn $name(&mut self) -> Result<$type, io::Error> {
            let mut raw = [0u8; std::mem::size_of::<$type>()];
            self.read_exact(&mut raw)?;
            let value = match self.byte_order() {
                ByteOrder::BigEndian => <$type>::from_be_bytes(raw),
                ByteOrder::LittleEndian => <$type>::from_le_bytes(raw),
            };
            Ok(value)
        }
    };
}
impl<R: io::Read> EndianReader<R> {
pub fn wrap(reader: R, byte_order: ByteOrder) -> Self {
EndianReader { reader, byte_order }
}
fn byte_order(&self) -> ByteOrder {
self.byte_order
}
read_fn!(read_u8, u8);
read_fn!(read_i8, i8);
read_fn!(read_u16, u16);
read_fn!(read_i16, i16);
read_fn!(read_u32, u32);
read_fn!(read_i32, i32);
read_fn!(read_u64, u64);
read_fn!(read_i64, i64);
read_fn!(read_f32, f32);
read_fn!(read_f64, f64);
}
/// Reader for Deflate-compressed data; an alias for `flate2`'s `ZlibDecoder`.
pub type DeflateReader<R> = flate2::read::ZlibDecoder<R>;
/// Streaming LZW decoder that exposes the decompressed bytes through `Read`.
pub struct LZWReader<R: Read> {
/// Buffered view of the compressed input, capped at the compressed length
/// given to `new`.
reader: BufReader<Take<R>>,
/// `weezl` decoder state carried across `read` calls.
decoder: weezl::decode::Decoder,
}
impl<R: Read> LZWReader<R> {
    /// Creates a decoder that consumes at most `compressed_length` bytes
    /// from `reader`.
    ///
    /// The input buffer is sized to the smaller of 32 KiB and
    /// `compressed_length`. The decoder is configured MSB-first with 8-bit
    /// symbols and `weezl`'s TIFF size-switch behaviour.
    ///
    /// # Panics
    ///
    /// Panics if `compressed_length` does not fit in a `u64`.
    pub fn new(reader: R, compressed_length: usize) -> LZWReader<R> {
        let capacity = compressed_length.min(32 * 1024);
        let limited = reader.take(u64::try_from(compressed_length).unwrap());
        let reader = BufReader::with_capacity(capacity, limited);
        let decoder = weezl::decode::Decoder::with_tiff_size_switch(weezl::BitOrder::Msb, 8);
        Self { reader, decoder }
    }
}
impl<R: Read> Read for LZWReader<R> {
    /// Decodes buffered input into `buf`, looping until the decoder produces
    /// output, reports completion, or fails.
    ///
    /// # Errors
    ///
    /// * `UnexpectedEof` ("no lzw end code found") when the decoder makes no
    ///   progress.
    /// * `InvalidData` when the decoder reports an error.
    /// * Any I/O error from refilling the input buffer.
    ///
    /// # Panics
    ///
    /// The no-progress branch asserts that nothing was consumed or produced
    /// and that the input buffer is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let input = self.reader.fill_buf()?;
            let step = self.decoder.decode_bytes(input, buf);
            self.reader.consume(step.consumed_in);

            match step.status {
                Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidData, err)),
                Ok(weezl::LzwStatus::Done) => return Ok(step.consumed_out),
                Ok(weezl::LzwStatus::Ok) if step.consumed_out != 0 => {
                    return Ok(step.consumed_out);
                }
                // Input was consumed without producing output yet; feed more.
                Ok(weezl::LzwStatus::Ok) => continue,
                Ok(weezl::LzwStatus::NoProgress) => {
                    assert_eq!(step.consumed_in, 0);
                    assert_eq!(step.consumed_out, 0);
                    assert!(self.reader.buffer().is_empty());
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "no lzw end code found",
                    ));
                }
            }
        }
    }
}
/// Position of the decoder within the PackBits stream.
enum PackBitsReaderState {
    /// The next input byte is a run header.
    Header,
    /// Inside a literal run: bytes are copied straight from the input.
    Literal,
    /// Inside a repeat run: `value` is emitted for every remaining byte.
    Repeat { value: u8 },
}

/// Streaming PackBits decoder that exposes the decompressed bytes through
/// `Read`.
pub struct PackBitsReader<R: Read> {
    /// Compressed input, capped at the length given to `new`.
    reader: Take<R>,
    /// Which part of a run the decoder is currently in.
    state: PackBitsReaderState,
    /// Bytes still to be produced for the current run.
    count: usize,
}

impl<R: Read> PackBitsReader<R> {
    /// Creates a decoder that consumes at most `length` compressed bytes
    /// from `reader`.
    pub fn new(reader: R, length: u64) -> Self {
        let reader = reader.take(length);
        Self {
            reader,
            state: PackBitsReaderState::Header,
            count: 0,
        }
    }
}

impl<R: Read> Read for PackBitsReader<R> {
    /// Produces decoded bytes from the current run, reading a new run header
    /// first when needed.
    ///
    /// At most one run (or the part of it that fits in `buf`) is produced per
    /// call. Returns `Ok(0)` once the compressed length has been consumed
    /// and a new header would be required.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner reader, including `UnexpectedEof`
    /// when a header or a repeat value cannot be read.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        while matches!(self.state, PackBitsReaderState::Header) {
            if self.reader.limit() == 0 {
                return Ok(0);
            }

            let mut header = [0u8; 1];
            self.reader.read_exact(&mut header)?;

            match header[0] as i8 {
                // Negative header: the next byte is repeated `1 - n` times.
                n @ -127..=-1 => {
                    let mut value = [0u8; 1];
                    self.reader.read_exact(&mut value)?;
                    self.state = PackBitsReaderState::Repeat { value: value[0] };
                    self.count = (1 - n as isize) as usize;
                }
                // Non-negative header: `n + 1` literal bytes follow.
                n @ 0..=127 => {
                    self.state = PackBitsReaderState::Literal;
                    self.count = n as usize + 1;
                }
                // -128 is a no-op; go round again for the next header.
                -128 => {}
            }
        }

        let wanted = self.count.min(buf.len());
        let produced = match self.state {
            PackBitsReaderState::Repeat { value } => {
                buf[..wanted].fill(value);
                wanted
            }
            PackBitsReaderState::Literal => self.reader.read(&mut buf[..wanted])?,
            PackBitsReaderState::Header => unreachable!(),
        };

        self.count -= produced;
        if self.count == 0 {
            self.state = PackBitsReaderState::Header;
        }
        Ok(produced)
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Decodes a hand-built PackBits stream mixing repeat and literal runs
    /// and checks the expanded bytes.
    #[test]
    fn test_packbits() {
        // Runs, in order: 3 x AA, literal 80 00 2A, 4 x AA,
        // literal 80 00 2A 22, 10 x AA.
        const ENCODED: [u8; 15] = [
            0xFE, 0xAA, 0x02, 0x80, 0x00, 0x2A, 0xFD, 0xAA, 0x03, 0x80, 0x00, 0x2A, 0x22, 0xF7,
            0xAA,
        ];
        const EXPECTED: [u8; 24] = [
            0xAA, 0xAA, 0xAA, 0x80, 0x00, 0x2A, 0xAA, 0xAA, 0xAA, 0xAA, 0x80, 0x00, 0x2A, 0x22,
            0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
        ];

        let source = io::Cursor::new(ENCODED.to_vec());
        let mut decoder = PackBitsReader::new(source, ENCODED.len() as u64);

        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();

        assert_eq!(decoded, EXPECTED.to_vec());
    }
}