use std::io;
use common::BinarySerializable;
use super::compressors::Compressor;
use super::StoreReader;
use crate::directory::WritePtr;
use crate::schema::document::{BinaryDocumentSerializer, Document};
use crate::schema::Schema;
use crate::store::store_compressor::BlockCompressor;
use crate::DocId;
/// Writes documents into a doc store made of independently compressed blocks.
///
/// Serialized documents are buffered in `current_block`. When the buffered
/// bytes plus their offset index exceed `block_size`, the block is handed to
/// `block_compressor`.
pub struct StoreWriter {
    /// Serialized bytes of the documents in the block currently being built.
    current_block: Vec<u8>,
    /// Start offset, within `current_block`, of each buffered document.
    doc_pos: Vec<u32>,
    /// Number of documents buffered in `current_block`.
    num_docs_in_current_block: DocId,
    /// Flush threshold, in bytes, for the block being built.
    block_size: usize,
    /// Compression codec this writer was configured with.
    compressor: Compressor,
    /// Receives finished blocks via `compress_block_and_write`.
    block_compressor: BlockCompressor,
}
impl StoreWriter {
    /// Creates a store writer that emits compressed blocks into `writer`.
    ///
    /// * `compressor` - codec handed to the block compressor and reported back
    ///   by [`StoreWriter::compressor`].
    /// * `block_size` - once the buffered document bytes plus their offset
    ///   index exceed this many bytes, the block is sent to the compressor.
    /// * `dedicated_thread` - forwarded to `BlockCompressor::new`; presumably
    ///   selects whether compression runs on its own thread (TODO confirm).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `BlockCompressor::new`.
    pub fn new(
        writer: WritePtr,
        compressor: Compressor,
        block_size: usize,
        dedicated_thread: bool,
    ) -> io::Result<StoreWriter> {
        let block_compressor = BlockCompressor::new(compressor, writer, dedicated_thread)?;
        Ok(StoreWriter {
            compressor,
            block_size,
            num_docs_in_current_block: 0,
            doc_pos: Vec::new(),
            current_block: Vec::new(),
            block_compressor,
        })
    }

    /// Returns the compression codec this writer was created with.
    pub(crate) fn compressor(&self) -> Compressor {
        self.compressor
    }

    /// Approximate heap memory held by the in-progress block.
    ///
    /// This is the capacity of the byte buffer plus the capacity of the offset
    /// index. It does not include whatever the block compressor holds.
    pub fn mem_usage(&self) -> usize {
        self.current_block.capacity() + self.doc_pos.capacity() * std::mem::size_of::<u32>()
    }

    /// Converts a block offset or document count to the `u32` stored in the
    /// block's offset index, failing instead of silently truncating.
    fn index_u32(value: usize) -> io::Result<u32> {
        if value > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "doc store block offset index value does not fit in u32",
            ));
        }
        Ok(value as u32)
    }

    /// Number of bytes the offset index written by
    /// `send_current_block_to_compressor` occupies: one `u32` per buffered
    /// document plus a trailing `u32` document count.
    fn index_len(&self) -> usize {
        (self.doc_pos.len() + 1) * std::mem::size_of::<u32>()
    }

    /// Sends the current block to the compressor if it has grown past
    /// `block_size` (document bytes plus offset index).
    fn check_flush_block(&mut self) -> io::Result<()> {
        if self.current_block.len() + self.index_len() > self.block_size {
            self.send_current_block_to_compressor()?;
        }
        Ok(())
    }

    /// Appends the offset index (each doc start offset, then the doc count,
    /// all as `u32`) to the current block, hands it to the block compressor,
    /// and resets the buffers for the next block.
    ///
    /// Does nothing when no documents are buffered.
    fn send_current_block_to_compressor(&mut self) -> io::Result<()> {
        // Test the document list rather than the byte buffer: documents whose
        // serialized form is empty still have to be written out.
        if self.doc_pos.is_empty() {
            return Ok(());
        }
        let num_docs = Self::index_u32(self.doc_pos.len())?;
        let index_len = self.index_len();
        self.current_block.reserve(index_len);
        for pos in &self.doc_pos {
            pos.serialize(&mut self.current_block)?;
        }
        num_docs.serialize(&mut self.current_block)?;
        self.block_compressor
            .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
        self.doc_pos.clear();
        self.current_block.clear();
        self.num_docs_in_current_block = 0;
        Ok(())
    }

    /// Serializes `document` against `schema` and appends it to the current
    /// block, flushing the block if it grows past `block_size`.
    ///
    /// # Errors
    ///
    /// Returns an error in three cases: serialization fails, the document's
    /// offset within the block does not fit in a `u32`, or flushing fails.
    /// On a serialization error the partially written bytes are discarded,
    /// so the block stays consistent.
    pub fn store<D: Document>(&mut self, document: &D, schema: &Schema) -> io::Result<()> {
        let doc_start = self.current_block.len();
        let doc_pos = Self::index_u32(doc_start)?;
        let serialize_result = {
            let mut serializer = BinaryDocumentSerializer::new(&mut self.current_block, schema);
            serializer.serialize_doc(document)
        };
        if serialize_result.is_err() {
            // Roll back partial output; left in place, it would be read back
            // as trailing bytes of the previous document in this block.
            self.current_block.truncate(doc_start);
        }
        serialize_result?;
        // Record the offset only once the document is fully in the buffer,
        // so `doc_pos` and `num_docs_in_current_block` never disagree.
        self.doc_pos.push(doc_pos);
        self.num_docs_in_current_block += 1;
        self.check_flush_block()
    }

    /// Appends an already-serialized document to the current block, flushing
    /// the block if it grows past `block_size`.
    ///
    /// # Errors
    ///
    /// Returns an error if the document's offset within the block does not fit
    /// in a `u32`, or if flushing the block fails.
    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
        let doc_pos = Self::index_u32(self.current_block.len())?;
        self.doc_pos.push(doc_pos);
        self.current_block.extend_from_slice(serialized_document);
        self.num_docs_in_current_block += 1;
        self.check_flush_block()
    }

    /// Flushes any buffered documents, then passes `store_reader` to the block
    /// compressor's `stack_reader`.
    ///
    /// This presumably appends the reader's existing blocks after the ones
    /// written so far (TODO confirm against `BlockCompressor::stack_reader`).
    pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
        self.send_current_block_to_compressor()?;
        self.block_compressor.stack_reader(store_reader)?;
        Ok(())
    }

    /// Flushes any buffered documents and closes the block compressor,
    /// consuming the writer.
    pub fn close(mut self) -> io::Result<()> {
        self.send_current_block_to_compressor()?;
        self.block_compressor.close()?;
        Ok(())
    }
}