use std::sync::atomic::AtomicU64;
use crate::{CompressionLevel, Error, Flush, Processor, Result};
use once_cell::sync::Lazy;
use xz2::stream::Status;
/// Initial value of the process-wide memory limit used by `XzDecompress::default()`.
///
/// The limit is handed to the xz stream decoder as its `memlimit` argument
/// (presumably bytes, i.e. roughly 1 GB; confirm against the xz2 documentation).
/// It can be overridden at runtime with `set_default_decompress_mem_size`.
pub const DEFAULT_XZ_MEM_SIZE: u64 = 1_000_000_000;
/// Process-wide decoder memory limit read by `XzDecompress::default()`.
///
/// Lazily initialised to `DEFAULT_XZ_MEM_SIZE` and overwritten by
/// `set_default_decompress_mem_size`.
static DECOMPRESS_XZ_MEM_SIZE: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(DEFAULT_XZ_MEM_SIZE));
/// Sets the memory limit used by xz decoders created via `XzDecompress::default()`.
///
/// The value is written to a process-wide atomic. Decoders that already exist
/// keep the limit they were constructed with, because each decoder stores its
/// own `memlimit`.
pub fn set_default_decompress_mem_size(mem_size: u64) {
    // `store` instead of `swap`: the previous value is never used.
    DECOMPRESS_XZ_MEM_SIZE.store(mem_size, std::sync::atomic::Ordering::Relaxed);
}
/// `crate::io::ProcessorWriter` specialised to the `XzCompress` processor.
pub type XzCompressWriter<W> = crate::io::ProcessorWriter<XzCompress, W>;
/// `crate::io::ProcessorReader` specialised to the `XzDecompress` processor.
pub type XzDecompressReader<R> = crate::io::ProcessorReader<XzDecompress, R>;
/// `crate::io::AsyncProcessorReader` specialised to the `XzDecompress` processor.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "tokio")))]
pub type AsyncXzDecompressReader<R> = crate::io::AsyncProcessorReader<XzDecompress, R>;
/// `crate::io::AsyncProcessorWriter` specialised to the `XzCompress` processor.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "tokio")))]
pub type AsyncXzCompressWriter<W> = crate::io::AsyncProcessorWriter<XzCompress, W>;
/// xz compression processor backed by an `xz2` encoder stream.
pub struct XzCompress {
// Underlying encoder stream; `reset` replaces it with a freshly built one.
inner: xz2::stream::Stream,
// Preset given to `new`, kept so `reset` can rebuild the encoder with the same setting.
preset: u32,
}
impl XzCompress {
pub fn new(preset: u32) -> Result<Self> {
let inner = xz2::stream::Stream::new_easy_encoder(preset, xz2::stream::Check::Crc64)
.map_err(|e| Error::CompressError(e.to_string()))?;
Ok(XzCompress { inner, preset })
}
}
impl Default for XzCompress {
    /// Creates an encoder using the xz preset of `CompressionLevel::default()`.
    ///
    /// # Panics
    ///
    /// Panics if the encoder stream cannot be created, since `Default` has no
    /// way to report the error.
    fn default() -> Self {
        let preset = CompressionLevel::default().xz();
        Self::new(preset).expect("failed to create xz encoder with the default compression level")
    }
}
impl Processor for XzCompress {
fn total_in(&self) -> u64 {
self.inner.total_in()
}
fn total_out(&self) -> u64 {
self.inner.total_out()
}
fn process(&mut self, input: &[u8], output: &mut [u8], flush: Flush) -> Result<crate::Status> {
match self.inner.process(
input,
output,
match flush {
Flush::Finish => xz2::stream::Action::Finish,
Flush::None => xz2::stream::Action::Run,
},
) {
Ok(Status::Ok) | Ok(Status::MemNeeded) => Ok(crate::Status::Ok),
Ok(Status::GetCheck) => Err(Error::DecompressError("GetCheck".to_string())),
Ok(Status::StreamEnd) => Ok(crate::Status::StreamEnd),
Err(e) => Err(Error::DecompressError(e.to_string())),
}
}
fn reset(&mut self) {
self.inner =
xz2::stream::Stream::new_easy_encoder(self.preset, xz2::stream::Check::Crc64).unwrap();
}
}
/// xz decompression processor backed by an `xz2` decoder stream.
pub struct XzDecompress {
// Memory limit given to `new`, kept so `reset` can rebuild the decoder with the same limit.
memlimit: u64,
// Underlying decoder stream; `reset` replaces it with a freshly built one.
inner: xz2::stream::Stream,
}
impl XzDecompress {
pub fn new(memlimit: u64) -> Result<Self> {
Ok(Self {
memlimit,
inner: xz2::stream::Stream::new_stream_decoder(memlimit, xz2::stream::TELL_NO_CHECK)
.map_err(|e| Error::CompressError(e.to_string()))?,
})
}
}
impl Default for XzDecompress {
    /// Creates a decoder using the process-wide default memory limit.
    ///
    /// The limit starts at `DEFAULT_XZ_MEM_SIZE` and can be changed with
    /// `set_default_decompress_mem_size`.
    ///
    /// # Panics
    ///
    /// Panics if the decoder stream cannot be created with that limit, since
    /// `Default` has no way to report the error.
    fn default() -> Self {
        let memlimit = DECOMPRESS_XZ_MEM_SIZE.load(std::sync::atomic::Ordering::Relaxed);
        Self::new(memlimit).expect("failed to create xz decoder with the default memory limit")
    }
}
impl Processor for XzDecompress {
fn total_in(&self) -> u64 {
self.inner.total_in()
}
fn total_out(&self) -> u64 {
self.inner.total_out()
}
fn process(&mut self, input: &[u8], output: &mut [u8], flush: Flush) -> Result<crate::Status> {
match self.inner.process(input, output, xz2::stream::Action::Run) {
Ok(Status::Ok) | Ok(Status::MemNeeded) => match flush {
Flush::Finish => Err(Error::MoreDataRequired),
Flush::None => Ok(crate::Status::Ok),
},
Ok(Status::GetCheck) => Err(Error::DecompressError("GetCheck".to_string())),
Ok(Status::StreamEnd) => Ok(crate::Status::StreamEnd),
Err(e) => Err(Error::DecompressError(e.to_string())),
}
}
fn reset(&mut self) {
self.inner =
xz2::stream::Stream::new_stream_decoder(self.memlimit, xz2::stream::TELL_NO_CHECK)
.unwrap();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::test_compress;
    use crate::tests::test_decompress;
    use anyhow::Context;

    /// Runs the shared decompression check on the xz fixture, passing it as a slice.
    #[test]
    fn test_xz_decompress() -> anyhow::Result<()> {
        let data = include_bytes!("../testfiles/pg2701.txt.xz");
        let decompress = XzDecompress::new(10_000_000)?;
        test_decompress(decompress, &data[..])?;
        Ok(())
    }

    /// Same fixture as `test_xz_decompress`, but returning the crate's own `Result` type.
    #[test]
    fn test_xz() -> crate::error::Result<()> {
        let data = include_bytes!("../testfiles/pg2701.txt.xz");
        test_decompress(XzDecompress::new(10_000_000)?, data)?;
        Ok(())
    }

    /// Runs the shared decompression check on the multistream xz fixture.
    #[test]
    fn test_xz_multistream() -> crate::error::Result<()> {
        let data = include_bytes!("../testfiles/pg2701.txt.multistream.xz");
        test_decompress(XzDecompress::new(10_000_000)?, data)?;
        Ok(())
    }

    /// Runs the shared compression check with an xz encoder/decoder pair.
    #[test]
    fn test_xz_compress() -> anyhow::Result<()> {
        // The context strings name xz so a failure is not mistaken for a zstd one.
        test_compress(
            || XzCompress::new(6).context("xz compress"),
            || XzDecompress::new(10_000_000).context("xz decompress"),
        )
    }
}