use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;
use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};
// Footer tag (ASCII "STOR"); written last so readers can validate the file end.
const STORE_MAGIC: u32 = 0x53544F52; const STORE_VERSION: u32 = 2;
/// Uncompressed buffer size at which a block is sealed and sent to compression.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;
/// Default compression-dictionary size budget in bytes.
/// NOTE(review): unused in this file — presumably consumed by dictionary
/// training elsewhere; confirm against callers.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;
#[cfg(feature = "native")]
/// Compression level passed to `crate::compression` when callers don't pick one.
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
/// Serializes `doc` into its on-disk byte representation (JSON).
///
/// The schema parameter is currently unused; documents are encoded as plain
/// JSON. Serialization failures are reported as `InvalidData` I/O errors.
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    match serde_json::to_vec(doc) {
        Ok(bytes) => Ok(bytes),
        Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
    }
}
#[cfg(feature = "native")]
/// Result of compressing one sealed store block on a background thread.
struct CompressedBlock {
    // Monotonic sequence number used to restore write order after
    // out-of-order thread completion.
    seq: usize,
    // Doc id of the first document in this block.
    first_doc_id: DocId,
    // Number of documents packed into this block.
    num_docs: u32,
    // The compressed block payload.
    compressed: Vec<u8>,
}
#[cfg(feature = "native")]
/// Streaming store writer that compresses sealed blocks on background
/// threads while documents continue to be appended.
pub struct EagerParallelStoreWriter<'a> {
    // Destination for data blocks, optional dictionary, index, and footer.
    writer: &'a mut dyn Write,
    // Uncompressed, length-prefixed documents of the current (unsealed) block.
    block_buffer: Vec<u8>,
    // Finished blocks waiting to be written out in `finish`.
    compressed_blocks: Vec<CompressedBlock>,
    // One join handle per in-flight compression thread.
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    // Next block sequence number (restores write order in `finish`).
    next_seq: usize,
    // Doc id that the next stored document will receive.
    next_doc_id: DocId,
    // Doc id of the first document in the current block buffer.
    block_first_doc: DocId,
    // Optional shared compression dictionary; appended to the file on finish.
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Creates a writer with the default compression level and no dictionary.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer with an explicit compression level and no dictionary.
    ///
    /// `_num_threads` is currently unused: one OS thread is spawned per
    /// sealed block regardless of this value.
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Creates a writer that compresses every block with `dict`, using the
    /// default compression level.
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer that compresses every block with `dict` at
    /// `compression_level`. The dictionary is written into the file by
    /// `finish` so readers can decompress.
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        // Delegate to the dict-less constructor and patch in the dictionary,
        // so field initialization lives in exactly one place.
        let mut this = Self::with_compression_level(writer, _num_threads, compression_level);
        this.dict = Some(Arc::new(dict));
        this
    }

    /// Appends `doc` to the current block and returns its assigned doc id.
    ///
    /// Documents are length-prefixed (u32 LE) inside the uncompressed block
    /// buffer; once the buffer reaches `STORE_BLOCK_SIZE` it is sealed and
    /// handed to a background compression thread.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        let doc_bytes = serialize_document(doc, schema)?;
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
            // Reap already-finished threads eagerly so join handles (and
            // their compressed buffers) don't accumulate until `finish`.
            self.collect_completed()?;
        }
        Ok(doc_id)
    }

    /// Seals the current block buffer and spawns a thread to compress it.
    /// No-op when the buffer is empty.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }
        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();
        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };
            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });
        self.pending_handles.push(handle);
    }

    /// Joins every compression thread that has already finished, keeping the
    /// rest pending.
    ///
    /// A panicked worker is surfaced as an `io::Error` rather than silently
    /// dropped — dropping it would lose a whole block and corrupt the store.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut still_running = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "store compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                still_running.push(handle);
            }
        }
        self.pending_handles = still_running;
        Ok(())
    }

    /// Seals the final partial block, joins all compression threads, and
    /// writes data blocks, optional dictionary, block index, and footer.
    /// Returns the total number of documents written.
    ///
    /// Footer layout (all LE, fixed 32 bytes at end of file):
    /// data_end_offset u64, dict_offset u64, num_docs u32, has_dict u32,
    /// version u32, magic u32.
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();
        // Join everything; a panic in any worker is a hard error instead of
        // a silently missing block.
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "store compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }
        if self.compressed_blocks.is_empty() {
            // Empty store: a zero-entry index followed by the standard footer.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }
        // Threads finish out of order; restore write order by sequence.
        self.compressed_blocks.sort_by_key(|b| b.seq);
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;
        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });
            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }
        let data_end_offset = current_offset;
        // Optional dictionary section: u32 LE length prefix + raw bytes,
        // placed directly after the data region.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };
        // Block index: entry count, then fixed-width rows
        // (first_doc_id u32, offset u64, length u32, num_docs u32).
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }
        // Fixed 32-byte footer.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
        Ok(self.next_doc_id)
    }
}
#[derive(Debug, Clone)]
/// One entry of the on-disk block index: locates a compressed block within
/// the data region and maps it to its doc-id range.
struct StoreBlockIndex {
    // Doc id of the first document in the block.
    first_doc_id: DocId,
    // Byte offset of the compressed block within the data region.
    offset: u64,
    // Compressed length of the block in bytes.
    length: u32,
    // Number of documents the block contains.
    num_docs: u32,
}
/// Async reader over a store file: resolves doc ids through the in-memory
/// block index and caches decompressed blocks in a small LRU.
pub struct AsyncStoreReader {
    // Lazy view over the compressed data region (offsets 0..data_end).
    data_slice: LazyFileSlice,
    // Parsed block index, sorted by first_doc_id.
    index: Vec<StoreBlockIndex>,
    // Total document count, from the footer.
    num_docs: u32,
    // Decompression dictionary, present when the store was written with one.
    dict: Option<CompressionDict>,
    // LRU cache of decompressed blocks, keyed by block first_doc_id.
    cache: RwLock<StoreBlockCache>,
}
/// Bounded LRU cache of decompressed store blocks, keyed by each block's
/// first doc id.
struct StoreBlockCache {
    // Cached decompressed blocks.
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    // Recency list: front = least recently used, back = most recently used.
    access_order: Vec<DocId>,
    // Maximum number of blocks to retain.
    max_blocks: usize,
}
impl StoreBlockCache {
    /// Creates an empty cache bounded to `max_blocks` decompressed blocks.
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    /// Looks up a cached block by its first doc id, marking it most recently
    /// used on a hit.
    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        let block = self.blocks.get(&first_doc_id)?;
        // Move the key to the back of the recency list (most recent).
        if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
            self.access_order.remove(pos);
            self.access_order.push(first_doc_id);
        }
        Some(Arc::clone(block))
    }

    /// Inserts (or refreshes) a block, evicting least-recently-used entries
    /// to stay within `max_blocks`.
    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        if self.blocks.contains_key(&first_doc_id) {
            // Refresh path (e.g. two tasks decompressed the same block
            // concurrently): replace the value and move the key to the back
            // of the recency list. Previously this pushed a duplicate key,
            // so a later eviction could drop the live mapping while a
            // dangling key lingered in the list.
            self.blocks.insert(first_doc_id, block);
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
            }
            self.access_order.push(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}
impl AsyncStoreReader {
    /// Opens a store file: validates the footer, loads the optional
    /// compression dictionary, and parses the block index into memory.
    ///
    /// `cache_blocks` bounds the number of decompressed blocks kept in the
    /// LRU cache.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // The fixed footer is 32 bytes; anything smaller cannot be a store.
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }
        // Footer layout (LE): data_end u64, dict_offset u64, num_docs u32,
        // has_dict u32, version u32, magic u32.
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;
        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }
        // Optional dictionary section: u32 LE length prefix + raw bytes,
        // sitting between the data region and the block index. Read the
        // length prefix once and derive both the dictionary and the index
        // start from it, instead of issuing a second read for the same bytes.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_offset..dict_offset + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_offset + 4..dict_offset + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_offset + 4 + dict_len,
            )
        } else {
            (None, data_end_offset)
        };
        // Block index: entry count, then fixed-width rows
        // (first_doc_id u32, offset u64, length u32, num_docs u32).
        let index_end = file_len - 32;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();
        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);
        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }
        let data_slice = file_handle.slice(0..data_end_offset);
        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Total number of documents in the store.
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Fetches and deserializes document `doc_id`; returns `Ok(None)` when
    /// the id is out of range.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }
        // Binary-search the block whose [first_doc_id, first_doc_id+num_docs)
        // range contains doc_id.
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        // Documents are length-prefixed (u32 LE); skip the preceding ones.
        let mut reader = &block_data[..];
        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }
        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        // Validate the target document's length too; slicing without this
        // check would panic on a corrupt block instead of returning an error.
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Returns the decompressed block for `entry`, from cache when possible.
    ///
    /// NOTE: the cache lock is released between the miss check and the
    /// insert, so two concurrent tasks may both decompress the same block;
    /// the second insert simply refreshes the cache entry.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            // `get` needs &mut to update LRU recency, hence a write lock
            // even on the read path.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };
        let block = Arc::new(decompressed);
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }
        Ok(block)
    }
}
/// Deserializes a document from its on-disk byte representation (JSON).
///
/// The schema parameter is currently unused. Malformed input is reported as
/// an `InvalidData` I/O error.
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    match serde_json::from_slice(data) {
        Ok(doc) => Ok(doc),
        Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
    }
}
#[derive(Debug, Clone)]
/// Public descriptor of one compressed block, used to copy blocks verbatim
/// between stores during merging (see `StoreMerger::append_store`).
pub struct RawStoreBlock {
    /// Doc id of the first document in the block (in the source store).
    pub first_doc_id: DocId,
    /// Number of documents the block contains.
    pub num_docs: u32,
    /// Byte offset of the block within the source data region.
    pub offset: u64,
    /// Compressed length of the block in bytes.
    pub length: u32,
}
/// Merges existing stores by copying their compressed blocks verbatim into a
/// new store file, renumbering doc ids contiguously from zero.
pub struct StoreMerger<'a, W: Write> {
    // Destination for copied blocks, index, and footer.
    writer: &'a mut W,
    // Index entries accumulated for the merged output.
    index: Vec<StoreBlockIndex>,
    // Current byte offset within the merged data region.
    current_offset: u64,
    // Next doc id to assign; becomes the merged doc count in `finish`.
    next_doc_id: DocId,
}
impl<'a, W: Write> StoreMerger<'a, W> {
    /// Creates a merger that writes the merged store into `writer`.
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Copies `blocks` verbatim (no recompression) from `data_slice` into the
    /// merged output, assigning them fresh contiguous doc ids.
    ///
    /// NOTE(review): `finish` always records "no dictionary", so appending
    /// blocks from a store that was written with a compression dictionary
    /// would presumably yield blocks the reader cannot decompress — confirm
    /// callers only merge dict-less stores.
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for blk in blocks {
            let byte_range = blk.offset..blk.offset + blk.length as u64;
            let compressed = data_slice.read_bytes_range(byte_range).await?;
            self.writer.write_all(compressed.as_slice())?;
            let entry = StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: blk.length,
                num_docs: blk.num_docs,
            };
            self.index.push(entry);
            self.current_offset += u64::from(blk.length);
            self.next_doc_id += blk.num_docs;
        }
        Ok(())
    }

    /// Writes the block index and footer; returns the merged document count.
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;
        // Index section: entry count, then one fixed-width row per block
        // (first_doc_id u32, offset u64, length u32, num_docs u32), all LE.
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for row in &self.index {
            self.writer.write_u32::<LittleEndian>(row.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(row.offset)?;
            self.writer.write_u32::<LittleEndian>(row.length)?;
            self.writer.write_u32::<LittleEndian>(row.num_docs)?;
        }
        // Fixed 32-byte footer; merged stores never carry a dictionary, so
        // dict_offset and has_dict are both written as zero.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(0u64)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
        Ok(self.next_doc_id)
    }
}
impl AsyncStoreReader {
    /// Exports the in-memory block index as public descriptors, for feeding
    /// this store's blocks into a `StoreMerger`.
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        let mut blocks = Vec::with_capacity(self.index.len());
        for entry in &self.index {
            blocks.push(RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            });
        }
        blocks
    }
    /// Borrow of the compressed data region backing this store.
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }
    /// Whether this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}