use std::io;
use zstd::zstd_safe;
use crate::{
Result, SequencingRecord,
cbq::core::{BlockHeader, ColumnarBlock, FileHeader, Index, IndexFooter, IndexHeader},
};
/// Writer that collects sequencing records into a [`ColumnarBlock`] and emits
/// finished blocks to an underlying [`io::Write`] sink.
///
/// The header of every block written through the writer is kept so that an
/// index can be built from them when the output is finished.
pub struct ColumnarBlockWriter<W>
where
    W: io::Write,
{
    /// Sink that receives every byte produced by this writer.
    inner: W,
    /// Block currently being filled with records.
    block: ColumnarBlock,
    /// Headers of the blocks emitted (or ingested) so far; input to the index.
    headers: Vec<BlockHeader>,
    /// zstd compression context handed to the block each time it is flushed.
    cctx: zstd_safe::CCtx<'static>,
}
/// Cloning duplicates the sink, the in-progress block and the recorded block
/// headers. The zstd context is not copied: the clone receives a freshly
/// created context that is configured again through `init_compressor`.
impl<W> Clone for ColumnarBlockWriter<W>
where
    W: io::Write + Clone,
{
    /// Returns an independent copy of this writer.
    ///
    /// # Panics
    ///
    /// Panics if the new compression context rejects the parameters applied
    /// by `init_compressor`.
    fn clone(&self) -> Self {
        let mut duplicate = Self {
            inner: self.inner.clone(),
            block: self.block.clone(),
            headers: self.headers.clone(),
            cctx: zstd_safe::CCtx::create(),
        };

        // `Clone::clone` has no way to report an error, so a configuration
        // failure is turned into a panic.
        let configured = duplicate.init_compressor();
        configured.expect("Failed to set compression level in writer clone");

        duplicate
    }
}
impl<W: io::Write> ColumnarBlockWriter<W> {
    /// Creates a writer and immediately writes `header` to `inner`.
    ///
    /// # Errors
    ///
    /// Returns an error if the compressor cannot be configured or if writing
    /// the header bytes to `inner` fails.
    pub fn new(inner: W, header: FileHeader) -> Result<Self> {
        let mut writer = Self::new_headless(inner, header)?;
        writer.inner.write_all(header.as_bytes())?;
        Ok(writer)
    }

    /// Creates a writer without writing the file header to `inner`.
    ///
    /// `header` is still used to build the internal block and to configure
    /// the compressor.
    ///
    /// # Errors
    ///
    /// Returns an error if the compressor cannot be configured.
    pub fn new_headless(inner: W, header: FileHeader) -> Result<Self> {
        let mut writer = Self {
            inner,
            block: ColumnarBlock::new(header),
            headers: Vec::default(),
            cctx: zstd_safe::CCtx::create(),
        };
        writer.init_compressor()?;
        Ok(writer)
    }

    /// Applies the compression settings to the zstd context: the compression
    /// level stored in the block's file header, plus long-distance matching.
    fn init_compressor(&mut self) -> Result<()> {
        // NOTE(review): `as` truncates silently if the header field is wider
        // than `i32` -- confirm the field's type and valid range.
        let level = self.block.header.compression_level as i32;

        self.cctx
            .set_parameter(zstd_safe::CParameter::CompressionLevel(level))
            .map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
        self.cctx
            .set_parameter(zstd_safe::CParameter::EnableLongDistanceMatching(true))
            .map_err(|e| io::Error::other(zstd_safe::get_error_name(e)))?;
        Ok(())
    }

    /// Returns a copy of the file header this writer was created with.
    pub fn header(&self) -> FileHeader {
        self.block.header
    }

    /// Returns the usage value reported by the in-progress block.
    // NOTE(review): presumably a fill fraction -- confirm against
    // `ColumnarBlock::usage`.
    pub fn usage(&self) -> f64 {
        self.block.usage()
    }

    /// Adds `record` to the in-progress block, flushing the block first when
    /// the record does not fit.
    ///
    /// On success the return value is always `Ok(true)`.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing the block or pushing the record fails.
    pub fn push(&mut self, record: SequencingRecord) -> Result<bool> {
        if !self.block.can_fit(&record) {
            self.flush()?;
        }
        self.block.push(record)?;
        Ok(true)
    }

    /// Writes the in-progress block to the sink and records the block header
    /// when one is produced.
    ///
    /// This does not flush the underlying writer itself; [`Self::finish`]
    /// does that once the whole file has been written.
    ///
    /// # Errors
    ///
    /// Returns an error if the block cannot be written.
    pub fn flush(&mut self) -> Result<()> {
        if let Some(header) = self.block.flush_to(&mut self.inner, &mut self.cctx)? {
            self.headers.push(header);
        }
        Ok(())
    }

    /// Completes the output: flushes the in-progress block, appends the index
    /// and flushes the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the three steps fails.
    pub fn finish(&mut self) -> Result<()> {
        self.flush()?;
        self.write_index()?;
        // Buffered sinks may still be holding the tail of the file. Flushing
        // here reports a late I/O failure to the caller instead of leaving it
        // to the sink's destructor, which cannot return an error.
        self.inner.flush()?;
        Ok(())
    }

    /// Builds the index from the recorded block headers and writes it to the
    /// sink as index header, encoded index bytes, then index footer.
    fn write_index(&mut self) -> Result<()> {
        let index = Index::from_block_headers(&self.headers);
        let z_index = index.encoded()?;
        let z_len = z_index.len() as u64;

        let header = IndexHeader::new(index.size(), z_len);
        let footer = IndexFooter::new(z_len);

        self.inner.write_all(header.as_bytes())?;
        self.inner.write_all(&z_index)?;
        self.inner.write_all(footer.as_bytes())?;
        Ok(())
    }

    /// Merges the output of an in-memory writer into this one.
    ///
    /// The bytes `other` has already produced are appended to this writer's
    /// sink and its block headers are adopted. The incomplete block of
    /// `other` is then taken over; when it cannot be combined with this
    /// writer's in-progress block, that block is flushed first. Finally
    /// `other` is reset so it can be reused.
    ///
    /// # Errors
    ///
    /// Returns an error if writing, flushing or taking the incomplete block
    /// fails. In that case `other` is not reset.
    pub fn ingest(&mut self, other: &mut ColumnarBlockWriter<Vec<u8>>) -> Result<()> {
        self.inner.write_all(other.inner_data())?;
        self.headers.extend_from_slice(&other.headers);

        // Make room for the incoming records if they cannot be merged into
        // the block that is currently being filled.
        if !self.block.can_ingest(&other.block) {
            self.flush()?;
        }
        self.block.take_incomplete(&other.block)?;

        other.clear_inner_data();
        Ok(())
    }
}
impl ColumnarBlockWriter<Vec<u8>> {
    /// Borrows the bytes held in the in-memory sink.
    #[must_use]
    pub fn inner_data(&self) -> &[u8] {
        self.inner.as_slice()
    }

    /// Resets the writer for reuse: empties the byte buffer, drops the
    /// recorded block headers and clears the in-progress block.
    pub fn clear_inner_data(&mut self) {
        self.inner.clear();
        self.headers.clear();
        self.block.clear();
    }

    /// Returns the number of bytes currently held in the in-memory sink.
    #[must_use]
    pub fn bytes_written(&self) -> usize {
        self.inner_data().len()
    }
}