use std::cmp::min;
use std::io::Write;
use crate::chunk_config::ChunkConfig;
use crate::data_types::{Number, NumberType};
use crate::errors::PcoResult;
use crate::progress::Progress;
use crate::standalone::compressor::FileCompressor;
use crate::standalone::decompressor::{DecompressorItem, FileDecompressor};
use crate::{PagingSpec, FULL_BATCH_N};
pub fn simple_compress_into<T: Number, W: Write>(
nums: &[T],
config: &ChunkConfig,
mut dst: W,
) -> PcoResult<W> {
let file_compressor = FileCompressor::default()
.with_n_hint(nums.len())
.with_uniform_type(Some(NumberType::new::<T>()));
dst = file_compressor.write_header(dst)?;
let n_per_page = config.paging_spec.n_per_page(nums.len())?;
let mut start = 0;
let mut this_chunk_config = config.clone();
for &page_n in &n_per_page {
let end = start + page_n;
this_chunk_config.paging_spec = PagingSpec::Exact(vec![page_n]);
let mut chunk_compressor =
file_compressor.chunk_compressor(&nums[start..end], &this_chunk_config)?;
dst = chunk_compressor.write_chunk(dst)?;
start = end;
}
dst = file_compressor.write_footer(dst)?;
Ok(dst)
}
pub fn simple_compress<T: Number>(nums: &[T], config: &ChunkConfig) -> PcoResult<Vec<u8>> {
let mut dst = Vec::new();
let file_compressor = FileCompressor::default().with_n_hint(nums.len());
file_compressor.write_header(&mut dst)?;
let n_per_page = config.paging_spec.n_per_page(nums.len())?;
let mut start = 0;
let mut this_chunk_config = config.clone();
let mut hinted_size = false;
for &page_n in &n_per_page {
let end = start + page_n;
this_chunk_config.paging_spec = PagingSpec::Exact(vec![page_n]);
let mut chunk_compressor =
file_compressor.chunk_compressor(&nums[start..end], &this_chunk_config)?;
if !hinted_size {
let file_size_hint =
chunk_compressor.chunk_size_hint() as f64 * nums.len() as f64 / page_n as f64;
dst.reserve_exact(file_size_hint as usize + 10);
hinted_size = true;
}
chunk_compressor.write_chunk(&mut dst)?;
start = end;
}
file_compressor.write_footer(&mut dst)?;
Ok(dst)
}
pub fn simple_decompress_into<T: Number>(src: &[u8], mut dst: &mut [T]) -> PcoResult<Progress> {
let (file_decompressor, mut src) = FileDecompressor::new(src)?;
let mut incomplete_batch_buffer = vec![T::default(); FULL_BATCH_N];
let mut progress = Progress::default();
loop {
let maybe_cd = file_decompressor.chunk_decompressor(src)?;
let mut chunk_decompressor;
match maybe_cd {
DecompressorItem::Chunk(cd) => chunk_decompressor = cd,
DecompressorItem::EndOfData(_) => {
progress.finished = true;
break;
}
}
let (limit, is_limited) = if dst.len() < chunk_decompressor.n() {
(dst.len() / FULL_BATCH_N * FULL_BATCH_N, true)
} else {
(dst.len(), false)
};
let new_progress = chunk_decompressor.read(&mut dst[..limit])?;
dst = &mut dst[new_progress.n_processed..];
progress.n_processed += new_progress.n_processed;
if !dst.is_empty() {
let new_progress = chunk_decompressor.read(&mut incomplete_batch_buffer)?;
let n_processed = min(dst.len(), new_progress.n_processed);
dst[..n_processed].copy_from_slice(&incomplete_batch_buffer[..n_processed]);
dst = &mut dst[n_processed..];
progress.n_processed += n_processed;
}
if dst.is_empty() && is_limited {
break;
}
src = chunk_decompressor.into_src();
}
Ok(progress)
}
pub fn simple_decompress<T: Number>(src: &[u8]) -> PcoResult<Vec<T>> {
let (file_decompressor, src) = FileDecompressor::new(src)?;
file_decompressor.simple_decompress(src)
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::chunk_config::DeltaSpec;
#[test]
fn test_simple_compress_into() -> PcoResult<()> {
let nums = (0..100).map(|x| x as i32).collect::<Vec<_>>();
let config = &ChunkConfig {
delta_spec: DeltaSpec::NoOp,
..Default::default()
};
let mut buffer = [77_u8];
assert!(simple_compress_into(&nums, config, buffer.as_mut_slice()).is_err());
let mut buffer = vec![0; 1000];
let cursor = simple_compress_into(&nums, config, Cursor::new(&mut buffer))?;
let bytes_written = cursor.position() as usize;
assert!(bytes_written >= 10);
for i in bytes_written..buffer.len() {
assert_eq!(buffer[i], 0);
}
let decompressed = simple_decompress::<i32>(&buffer[..bytes_written])?;
assert_eq!(decompressed, nums);
Ok(())
}
#[test]
fn test_simple_decompress_into() -> PcoResult<()> {
let max_n = 600;
let nums = (0..max_n).map(|x| x as i32).collect::<Vec<i32>>();
let src = simple_compress(
&nums,
&ChunkConfig {
compression_level: 0,
delta_spec: DeltaSpec::NoOp,
paging_spec: PagingSpec::Exact(vec![300, 300]),
..Default::default()
},
)?;
for possibly_overshooting_n in [0, 1, 256, 299, 300, 301, 556, 600, 601] {
let mut dst = vec![0; possibly_overshooting_n];
let progress = simple_decompress_into(&src, &mut dst)?;
let n = min(possibly_overshooting_n, max_n);
assert_eq!(progress.n_processed, n);
assert_eq!(progress.finished, n >= nums.len());
assert_eq!(
&dst[..n],
&(0..n).map(|x| x as i32).collect::<Vec<i32>>(),
"n={}",
n
);
}
Ok(())
}
}