use std::io::{Read, Seek, SeekFrom};
use rayon::prelude::*;
use crate::byteslice::ChunksIrregularMut;
use crate::las::selective::DecompressionSelection;
use crate::laszip::chunk_table::{ChunkTable, ChunkTableEntry};
use crate::laszip::details::record_decompressor_from_laz_items;
use crate::laszip::CompressorType;
use crate::{LasZipError, LazVlr};
/// LAZ decompressor that decompresses several chunks in parallel.
///
/// Points are pulled from `source` one or more whole chunks at a time; the
/// decompressed points of a chunk that were not requested yet are kept in
/// `rest` and served first by the next call.
pub struct ParLasZipDecompressor<R> {
/// Compression description; provides the point size, chunk size and items.
vlr: LazVlr,
/// Chunk table read from `source` when the decompressor is created.
chunk_table: ChunkTable,
/// Index of the last chunk read from `source`; `-1` until a chunk is read.
last_chunk_read: isize,
/// Position of `source` right after the chunk table was read; chunk byte
/// offsets are added to it when seeking.
start_of_data: u64,
/// Decompressed point bytes left over from the last decompressed chunk.
rest: std::io::Cursor<Vec<u8>>,
/// Scratch buffer holding the compressed bytes read from `source`.
internal_buffer: Vec<u8>,
/// The underlying reader of compressed data.
source: R,
/// Selection forwarded to every record decompressor that gets created.
selection: DecompressionSelection,
}
impl<R: Read + Seek> ParLasZipDecompressor<R> {
    /// Creates a decompressor that decompresses every field of the points.
    ///
    /// Shorthand for [`Self::selective`] with `DecompressionSelection::all()`.
    ///
    /// # Errors
    ///
    /// Same as [`Self::selective`].
    pub fn new(source: R, vlr: LazVlr) -> crate::Result<Self> {
        Self::selective(source, vlr, DecompressionSelection::all())
    }

    /// Creates a decompressor that applies `selection` to every record
    /// decompressor it builds.
    ///
    /// The chunk table is read from `source` immediately, and the position of
    /// `source` right after that read is remembered as the start of the point
    /// data.
    ///
    /// # Errors
    ///
    /// Returns `LasZipError::UnsupportedCompressorType` when the compressor of
    /// `vlr` is neither `PointWiseChunked` nor `LayeredChunked`, and forwards
    /// any error from reading the chunk table or querying the position.
    pub fn selective(
        mut source: R,
        vlr: LazVlr,
        selection: DecompressionSelection,
    ) -> crate::Result<Self> {
        if vlr.compressor != CompressorType::PointWiseChunked
            && vlr.compressor != CompressorType::LayeredChunked
        {
            return Err(LasZipError::UnsupportedCompressorType(vlr.compressor));
        }

        let chunk_table = ChunkTable::read_from(&mut source, &vlr)?;
        let start_of_data = source.seek(SeekFrom::Current(0))?;

        // Only a capacity hint for the leftover buffer: it is sized with the
        // largest point count of the table, the buffer grows on demand.
        let biggest_chunk = chunk_table
            .as_ref()
            .iter()
            .map(|entry| entry.point_count)
            .max()
            .unwrap_or(0);
        let rest = std::io::Cursor::new(Vec::<u8>::with_capacity(biggest_chunk as usize));

        Ok(Self {
            source,
            vlr,
            chunk_table,
            rest,
            internal_buffer: vec![],
            last_chunk_read: -1,
            start_of_data,
            selection,
        })
    }

    /// Decompresses as many points as fit in `out`.
    ///
    /// Leftover points of the previously decompressed chunk are copied first.
    /// If more points are needed, the chunks required to fill `out` are read
    /// from the source in a single read and decompressed in parallel; what is
    /// left of the last of those chunks is kept for the next call.
    ///
    /// # Panics
    ///
    /// Panics if `out.len()` is not a multiple of the point size.
    ///
    /// # Errors
    ///
    /// Returns an error converted from an `UnexpectedEof` I/O error when more
    /// points are requested than the remaining chunk table entries describe,
    /// and forwards any read or decompression error.
    pub fn decompress_many(&mut self, out: &mut [u8]) -> crate::Result<()> {
        let point_size = self.vlr.items_size() as usize;
        assert_eq!(out.len() % point_size, 0);

        let num_bytes_in_rest = self
            .rest
            .get_ref()
            .len()
            .saturating_sub(self.rest.position() as usize);
        debug_assert!(num_bytes_in_rest % point_size == 0);

        let (out_rest, out_decompress) = if num_bytes_in_rest >= out.len() {
            (out, &mut [] as &mut [u8])
        } else {
            out.split_at_mut(num_bytes_in_rest)
        };

        // 1. Serve what is left of the previously decompressed chunk.
        if !out_rest.is_empty() {
            // `out_rest` never exceeds the bytes left in `rest`, so this
            // cannot come up short.
            self.rest.read_exact(out_rest)?;
        }
        if out_decompress.is_empty() {
            return Ok(());
        }

        debug_assert_eq!(
            self.rest.position() as usize,
            self.rest.get_ref().len(),
            "Rest buffer not completely consumed"
        );
        self.rest.get_mut().clear();
        self.rest.set_position(0);

        // 2. Find out how many chunks are needed to satisfy the request.
        let num_requested_points_left = out_decompress.len() / point_size;
        let start_index = (self.last_chunk_read + 1) as usize;
        let mut num_points = 0usize;
        let mut num_chunks_to_decompress = 0usize;
        let mut num_bytes_to_read = 0usize;
        for entry in &self.chunk_table[start_index..] {
            num_points += entry.point_count as usize;
            num_chunks_to_decompress += 1;
            num_bytes_to_read += entry.byte_count as usize;
            if num_points >= num_requested_points_left {
                break;
            }
        }

        // Asking for points past the end of the data used to underflow the
        // index arithmetic below; report it as an error instead.
        if num_chunks_to_decompress == 0 || num_points < num_requested_points_left {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "requested more points than the remaining chunks contain",
            )
            .into());
        }
        let end_index = start_index + num_chunks_to_decompress;

        // 3. Read all the needed compressed chunks at once.
        self.internal_buffer.resize(num_bytes_to_read, 0u8);
        self.source.read_exact(&mut self.internal_buffer)?;

        // 4. Split the work: every chunk but the last one is decompressed
        //    entirely into `out`; the last one may only be partially
        //    requested, its remaining points go into `rest`.
        let head_chunks_table = &self.chunk_table[start_index..end_index - 1];
        let tail_chunk_entry = self.chunk_table[end_index - 1];
        let num_bytes_in_head_chunks = num_bytes_to_read - tail_chunk_entry.byte_count as usize;
        let num_points_in_head_chunks = num_points - tail_chunk_entry.point_count as usize;
        let (head_chunks, tail_chunk) = self.internal_buffer.split_at(num_bytes_in_head_chunks);
        let (head_output, tail_output) =
            out_decompress.split_at_mut(num_points_in_head_chunks * point_size);

        let rest = &mut self.rest;
        let vlr = &self.vlr;
        let selection = self.selection;
        let chunk_table_len = self.chunk_table.len();

        let (res1, res2) = rayon::join(
            || -> crate::Result<()> {
                par_decompress_selective(
                    head_chunks,
                    head_output,
                    vlr,
                    head_chunks_table,
                    selection,
                )
            },
            || -> crate::Result<()> {
                let mut last_src = std::io::Cursor::new(tail_chunk);
                let mut decompressor =
                    record_decompressor_from_laz_items(vlr.items(), &mut last_src)?;
                decompressor.set_selection(selection);
                decompressor.decompress_many(tail_output)?;

                let num_bytes_left =
                    (tail_chunk_entry.point_count as usize * point_size) - tail_output.len();
                rest.get_mut().resize(num_bytes_left, 0u8);
                if !vlr.uses_variable_size_chunks() && end_index == chunk_table_len {
                    // The very last chunk of a fixed-size-chunk file may hold
                    // fewer points than its table entry says: decompress
                    // whatever is there and shrink `rest` accordingly.
                    let num_actually_decompressed =
                        decompressor.decompress_until_end_of_file(rest.get_mut())?;
                    rest.get_mut().resize(num_actually_decompressed, 0u8);
                } else {
                    decompressor.decompress_many(rest.get_mut())?;
                }
                rest.set_position(0);
                Ok(())
            },
        );
        res1?;
        res2?;

        self.last_chunk_read += num_chunks_to_decompress as isize;
        Ok(())
    }

    /// Moves the decompressor so that the next decompressed point is the one
    /// at `index`.
    ///
    /// The chunk containing the point is read and decompressed, and the
    /// leftover buffer is positioned on the point. When `index` is past the
    /// last point, the source is moved to its end and nothing is left to
    /// decompress.
    ///
    /// # Errors
    ///
    /// Forwards any seek, read or decompression error.
    pub fn seek(&mut self, index: u64) -> crate::Result<()> {
        self.rest.set_position(0);
        self.rest.get_mut().clear();

        let (chunk_of_point, byte_offset) = match self.chunk_table.chunk_of_point(index) {
            Some(found) => found,
            None => {
                self.source.seek(SeekFrom::End(0))?;
                // Every chunk is behind us now: the next decompression
                // request must report the end of the data.
                self.last_chunk_read = self.chunk_table.len() as isize - 1;
                return Ok(());
            }
        };
        let chunk_entry = self.chunk_table[chunk_of_point];

        // Position of the point relative to the start of its chunk.
        // NOTE(review): assumes `chunk_size()` is not a real per-chunk point
        // count for variable-size chunk files, so the points of the preceding
        // chunks are summed instead - confirm against `LazVlr`.
        let pos_in_chunk = if self.vlr.uses_variable_size_chunks() {
            let chunk_table = &self.chunk_table;
            let points_before_chunk: u64 = (0..chunk_of_point)
                .map(|i| chunk_table[i].point_count as u64)
                .sum();
            index.saturating_sub(points_before_chunk)
        } else {
            index % self.vlr.chunk_size() as u64
        };

        self.source
            .seek(SeekFrom::Start(self.start_of_data + byte_offset))?;
        self.internal_buffer
            .resize(chunk_entry.byte_count as usize, 0u8);
        // `read` may return fewer bytes than asked for; the whole chunk is
        // required before it can be decompressed.
        self.source.read_exact(&mut self.internal_buffer)?;

        let num_bytes = match self.vlr.num_bytes_in_decompressed_chunk() {
            crate::laszip::vlr::DecompressedChunkSize::Fixed { num_bytes } => num_bytes,
            crate::laszip::vlr::DecompressedChunkSize::Variable {
                num_bytes_per_point,
            } => num_bytes_per_point * chunk_entry.point_count as usize,
        };
        self.rest.get_mut().resize(num_bytes, 0u8);

        let mut decompressor = record_decompressor_from_laz_items(
            self.vlr.items(),
            std::io::Cursor::new(&self.internal_buffer),
        )?;

        let is_last_chunk = chunk_of_point + 1 == self.chunk_table.len();
        if is_last_chunk {
            // The last chunk may contain fewer points than a full chunk.
            let num_bytes_decompressed =
                decompressor.decompress_until_end_of_file(self.rest.get_mut())?;
            // Drop the zero padding so it is never served as points.
            self.rest.get_mut().truncate(num_bytes_decompressed);

            let num_points_in_last_chunk = num_bytes_decompressed / self.vlr.items_size() as usize;
            if pos_in_chunk as usize >= num_points_in_last_chunk {
                // The point is past the end of the data.
                self.rest.set_position(self.rest.get_ref().len() as u64);
                self.last_chunk_read = chunk_of_point as isize;
                return Ok(());
            }
        } else {
            decompressor.decompress_many(self.rest.get_mut())?;
        }

        self.rest.set_position(pos_in_chunk * self.vlr.items_size());
        self.last_chunk_read = chunk_of_point as isize;
        Ok(())
    }

    /// Consumes the decompressor and returns the underlying source.
    pub fn into_inner(self) -> R {
        self.source
    }

    /// Returns a mutable reference to the underlying source.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.source
    }

    /// Returns a shared reference to the underlying source.
    pub fn get(&self) -> &R {
        &self.source
    }
}
impl<R: Read + Seek> crate::laszip::LazDecompressor for ParLasZipDecompressor<R> {
fn decompress_one(&mut self, point: &mut [u8]) -> crate::Result<()> {
self.decompress_many(point)
}
fn decompress_many(&mut self, points: &mut [u8]) -> crate::Result<()> {
self.decompress_many(points)
}
fn seek(&mut self, index: u64) -> crate::Result<()> {
self.seek(index)
}
}
/// Decompresses in parallel the points of a LAZ point-data buffer.
///
/// `compressed_points_data` is read as: an `i64`-sized prefix, then the
/// compressed chunks, with the chunk table read from the buffer through
/// `ChunkTable::read_from`. `decompressed_points` receives the points; its
/// length is expected to be a multiple of the point size (checked in debug
/// builds only).
///
/// # Errors
///
/// Forwards chunk table and decompression errors, and returns an error
/// converted from an `UnexpectedEof` I/O error when the buffer is too short
/// to contain all the bytes announced by the chunk table.
pub fn par_decompress_buffer(
    compressed_points_data: &[u8],
    decompressed_points: &mut [u8],
    laz_vlr: &LazVlr,
) -> crate::Result<()> {
    debug_assert_eq!(decompressed_points.len() % laz_vlr.items_size() as usize, 0);

    let mut cursor = std::io::Cursor::new(compressed_points_data);
    let chunk_table = ChunkTable::read_from(&mut cursor, laz_vlr)?;

    let num_point_bytes = chunk_table
        .as_ref()
        .iter()
        .map(|entry| entry.byte_count as usize)
        .sum::<usize>();

    // The chunks start right after the i64 prefix, so they end at
    // `start + num_point_bytes`; ending at `num_point_bytes` would cut the
    // last chunk short by the size of the prefix.
    let start = std::mem::size_of::<i64>();
    let compressed_points = start
        .checked_add(num_point_bytes)
        .and_then(|end| compressed_points_data.get(start..end))
        .ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "buffer is too short for the chunks described by the chunk table",
            )
        })?;

    par_decompress(
        compressed_points,
        decompressed_points,
        laz_vlr,
        chunk_table.as_ref(),
    )
}
/// Decompresses the chunks described by `chunk_table` in parallel, applying
/// `selection` to each record decompressor.
///
/// `compressed_points` is cut into consecutive pieces following the
/// `byte_count` of each entry, `decompressed_points` into consecutive pieces
/// of `point_count * items_size` bytes; each pair is decompressed as an
/// independent job.
///
/// # Errors
///
/// Returns an error if creating a record decompressor or decompressing a
/// chunk fails.
pub fn par_decompress_selective(
    compressed_points: &[u8],
    decompressed_points: &mut [u8],
    laz_vlr: &LazVlr,
    chunk_table: &[ChunkTableEntry],
    selection: DecompressionSelection,
) -> crate::Result<()> {
    use crate::byteslice::ChunksIrregular;

    let compressed_sizes = chunk_table.iter().map(|entry| entry.byte_count as usize);
    let decompressed_sizes = chunk_table
        .iter()
        .map(|entry| (entry.point_count * laz_vlr.items_size()) as usize);

    // Pair every compressed chunk with the output region it fills.
    let jobs: Vec<(&[u8], &mut [u8])> = ChunksIrregular::new(compressed_points, compressed_sizes)
        .zip(ChunksIrregularMut::new(
            decompressed_points,
            decompressed_sizes,
        ))
        .collect();

    jobs.into_par_iter()
        .map(|(compressed_chunk, output_chunk)| -> crate::Result<()> {
            let mut decompressor = record_decompressor_from_laz_items(
                laz_vlr.items(),
                std::io::Cursor::new(compressed_chunk),
            )?;
            decompressor.set_selection(selection);
            decompressor.decompress_many(output_chunk)?;
            Ok(())
        })
        .collect::<crate::Result<()>>()
}
/// Decompresses the chunks described by `chunk_table` in parallel, with
/// every field of the points selected.
///
/// Same as [`par_decompress_selective`] called with
/// `DecompressionSelection::all()`.
///
/// # Errors
///
/// Forwards the error of [`par_decompress_selective`].
pub fn par_decompress(
    compressed_points: &[u8],
    decompressed_points: &mut [u8],
    laz_vlr: &LazVlr,
    chunk_table: &[ChunkTableEntry],
) -> crate::Result<()> {
    let everything = DecompressionSelection::all();
    par_decompress_selective(
        compressed_points,
        decompressed_points,
        laz_vlr,
        chunk_table,
        everything,
    )
}