/// Uncompressed size (16 KiB) at which the writer seals the current block; a block is
/// compressed as soon as appending a document brings it to at least this many bytes.
const BLOCK_SIZE: usize = 1 << 14;
/// Accumulates documents into LZ4-compressed blocks and serializes them with `finish`.
pub struct DocStoreWriter {
    /// One metadata record per added document, indexed by document id.
    doc_entries: Vec<DocEntry>,
    /// Raw bytes of the block currently being filled (not yet compressed).
    current_block: Vec<u8>,
    /// Blocks that have already been sealed and compressed, in block-index order.
    compressed_blocks: Vec<CompressedBlock>,
}
/// A sealed block: its compressed bytes plus the size it decompresses to.
struct CompressedBlock {
    /// Byte length of the block before compression.
    uncompressed_len: u32,
    /// Compressed payload as produced by the writer's compressor.
    data: Vec<u8>,
}
/// Location of one document: which block holds it, where it starts inside the
/// decompressed block, and how many bytes long it is.
#[derive(Clone, Copy)]
struct DocEntry {
    /// Index of the block the document was appended to.
    block_idx: u32,
    /// Byte offset of the document within the uncompressed block.
    within_block_offset: u32,
    /// Document length in bytes.
    len: u32,
}
impl DocStoreWriter {
pub fn new() -> Self {
Self {
current_block: Vec::new(),
compressed_blocks: Vec::new(),
doc_entries: Vec::new(),
}
}
pub fn add(&mut self, doc_bytes: &[u8]) {
let block_idx = self.compressed_blocks.len() as u32;
let offset = self.current_block.len() as u32;
self.doc_entries.push(DocEntry {
block_idx,
within_block_offset: offset,
len: doc_bytes.len() as u32,
});
self.current_block.extend_from_slice(doc_bytes);
if self.current_block.len() >= BLOCK_SIZE {
self.flush_block();
}
}
pub fn doc_count(&self) -> u32 {
self.doc_entries.len() as u32
}
fn flush_block(&mut self) {
if self.current_block.is_empty() {
return;
}
let uncompressed_len = self.current_block.len() as u32;
let compressed = lz4_flex::compress_prepend_size(&self.current_block);
self.compressed_blocks.push(CompressedBlock {
data: compressed,
uncompressed_len,
});
self.current_block.clear();
}
pub fn finish(mut self) -> Vec<u8> {
self.flush_block();
let num_blocks = self.compressed_blocks.len() as u32;
let num_docs = self.doc_entries.len() as u32;
let mut result = Vec::new();
result.extend_from_slice(&num_blocks.to_le_bytes());
result.extend_from_slice(&num_docs.to_le_bytes());
let block_meta_start = result.len();
let block_meta_size = num_blocks as usize * 16;
result.resize(result.len() + block_meta_size, 0);
let doc_meta_start = result.len();
let doc_meta_size = num_docs as usize * 12;
result.resize(result.len() + doc_meta_size, 0);
for (i, block) in self.compressed_blocks.iter().enumerate() {
let offset = result.len() as u64;
let compressed_len = block.data.len() as u32;
let meta_pos = block_meta_start + i * 16;
result[meta_pos..meta_pos + 8].copy_from_slice(&offset.to_le_bytes());
result[meta_pos + 8..meta_pos + 12].copy_from_slice(&compressed_len.to_le_bytes());
result[meta_pos + 12..meta_pos + 16]
.copy_from_slice(&block.uncompressed_len.to_le_bytes());
result.extend_from_slice(&block.data);
}
for (i, entry) in self.doc_entries.iter().enumerate() {
let pos = doc_meta_start + i * 12;
result[pos..pos + 4].copy_from_slice(&entry.block_idx.to_le_bytes());
result[pos + 4..pos + 8].copy_from_slice(&entry.within_block_offset.to_le_bytes());
result[pos + 8..pos + 12].copy_from_slice(&entry.len.to_le_bytes());
}
result
}
}
impl Default for DocStoreWriter {
fn default() -> Self {
Self::new()
}
}
/// Random-access view over a serialized doc store borrowed from `data`.
pub struct DocStoreReader<'a> {
    /// The complete serialized store.
    data: &'a [u8],
    /// Number of documents recorded in the header.
    num_docs: u32,
    /// Number of blocks recorded in the header.
    // The lint is silenced because not every version of the reader consults this field.
    #[allow(dead_code)]
    num_blocks: u32,
    /// Byte offset where the block-metadata table begins.
    block_meta_start: usize,
    /// Byte offset where the document-metadata table begins.
    doc_meta_start: usize,
    /// Most recently decompressed block, keyed by block index.
    cached_block: std::sync::Mutex<Option<(u32, Vec<u8>)>>,
}
impl<'a> DocStoreReader<'a> {
pub fn open(data: &'a [u8]) -> Self {
let num_blocks = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
let num_docs = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
let block_meta_start = 8;
let doc_meta_start = block_meta_start + num_blocks as usize * 16;
Self {
data,
num_blocks,
num_docs,
block_meta_start,
doc_meta_start,
cached_block: std::sync::Mutex::new(None),
}
}
pub fn doc_count(&self) -> u32 {
self.num_docs
}
pub fn get(&self, doc_id: u32) -> Option<Vec<u8>> {
if doc_id >= self.num_docs {
return None;
}
let doc_pos = self.doc_meta_start + doc_id as usize * 12;
let block_idx = u32::from_le_bytes(self.data[doc_pos..doc_pos + 4].try_into().unwrap());
let within_offset =
u32::from_le_bytes(self.data[doc_pos + 4..doc_pos + 8].try_into().unwrap());
let len = u32::from_le_bytes(self.data[doc_pos + 8..doc_pos + 12].try_into().unwrap());
let block_data = self.get_block(block_idx);
let start = within_offset as usize;
let end = start + len as usize;
Some(block_data[start..end].to_vec())
}
fn get_block(&self, block_idx: u32) -> Vec<u8> {
{
let cache = self.cached_block.lock().unwrap();
if let Some((cached_idx, ref data)) = *cache {
if cached_idx == block_idx {
return data.clone();
}
}
}
let meta_pos = self.block_meta_start + block_idx as usize * 16;
let offset =
u64::from_le_bytes(self.data[meta_pos..meta_pos + 8].try_into().unwrap()) as usize;
let compressed_len =
u32::from_le_bytes(self.data[meta_pos + 8..meta_pos + 12].try_into().unwrap()) as usize;
let compressed = &self.data[offset..offset + compressed_len];
let decompressed =
lz4_flex::decompress_size_prepended(compressed).expect("LZ4 decompression failed");
*self.cached_block.lock().unwrap() = Some((block_idx, decompressed.clone()));
decompressed
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn single_doc() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"hello world");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 1);
        assert_eq!(reader.get(0).unwrap(), b"hello world");
        assert_eq!(reader.get(1), None);
    }

    #[test]
    fn multiple_docs() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"doc zero");
        writer.add(b"doc one");
        writer.add(b"doc two");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 3);
        assert_eq!(reader.get(0).unwrap(), b"doc zero");
        assert_eq!(reader.get(1).unwrap(), b"doc one");
        assert_eq!(reader.get(2).unwrap(), b"doc two");
    }

    #[test]
    fn empty_doc() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"");
        writer.add(b"nonempty");
        writer.add(b"");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"");
        assert_eq!(reader.get(1).unwrap(), b"nonempty");
        assert_eq!(reader.get(2).unwrap(), b"");
    }

    #[test]
    fn multi_block_spanning() {
        let mut writer = DocStoreWriter::new();
        // 20 docs x 4 KiB = 80 KiB, so the documents span several BLOCK_SIZE blocks.
        let doc = vec![b'x'; 4096];
        for _ in 0..20 {
            writer.add(&doc);
        }
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 20);
        for i in 0..20 {
            assert_eq!(reader.get(i).unwrap(), doc);
        }
    }

    #[test]
    fn large_doc_filling_block() {
        let mut writer = DocStoreWriter::new();
        // Exactly BLOCK_SIZE bytes: this doc fills and seals block 0 on its own.
        let large_doc = vec![b'A'; BLOCK_SIZE];
        writer.add(&large_doc);
        writer.add(b"small");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), large_doc);
        assert_eq!(reader.get(1).unwrap(), b"small");
    }

    #[test]
    fn compression_reduces_size() {
        let mut writer = DocStoreWriter::new();
        let doc = vec![0u8; BLOCK_SIZE * 2];
        writer.add(&doc);
        let data = writer.finish();
        assert!(
            data.len() < doc.len(),
            "compressed size {} should be less than raw {}",
            data.len(),
            doc.len()
        );
    }

    #[test]
    fn all_docs_retrievable() {
        let mut writer = DocStoreWriter::new();
        let docs: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("{{\"id\": {i}, \"text\": \"document number {i}\"}}").into_bytes())
            .collect();
        for doc in &docs {
            writer.add(doc);
        }
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 100);
        for (i, expected) in docs.iter().enumerate() {
            assert_eq!(&reader.get(i as u32).unwrap(), expected);
        }
    }

    #[test]
    fn out_of_range_returns_none() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"only one");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(1), None);
        assert_eq!(reader.get(100), None);
        assert_eq!(reader.get(u32::MAX), None);
    }

    #[test]
    fn empty_store() {
        let writer = DocStoreWriter::new();
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 0);
        assert_eq!(reader.get(0), None);
    }

    #[test]
    fn json_docs_round_trip() {
        let mut writer = DocStoreWriter::new();
        let json1 = br#"{"title":"hello","body":"world"}"#;
        let json2 = br#"{"title":"foo","body":"bar baz"}"#;
        writer.add(json1);
        writer.add(json2);
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        let doc1: serde_json::Value = serde_json::from_slice(&reader.get(0).unwrap()).unwrap();
        assert_eq!(doc1["title"], "hello");
        let doc2: serde_json::Value = serde_json::from_slice(&reader.get(1).unwrap()).unwrap();
        assert_eq!(doc2["body"], "bar baz");
    }

    #[test]
    fn block_cache_hit() {
        // Both docs land in block 0, so the second lookup reuses the cached block.
        let mut writer = DocStoreWriter::new();
        writer.add(b"doc A");
        writer.add(b"doc B");
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"doc A");
        assert_eq!(reader.get(1).unwrap(), b"doc B");
    }

    #[test]
    fn cache_switches_between_blocks() {
        // Each doc fills a whole block, so consecutive lookups below alternate blocks and
        // force the single-entry cache to be replaced and later refilled.
        let mut writer = DocStoreWriter::new();
        let docs: Vec<Vec<u8>> = (0..3u8).map(|i| vec![b'a' + i; BLOCK_SIZE]).collect();
        for doc in &docs {
            writer.add(doc);
        }
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        for &i in &[0u32, 2, 1, 0, 2] {
            assert_eq!(reader.get(i).unwrap(), docs[i as usize]);
        }
    }

    #[test]
    fn varying_doc_sizes() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"x");
        writer.add(&vec![b'y'; 1000]);
        writer.add(b"z");
        writer.add(&vec![b'w'; 8000]);
        let data = writer.finish();
        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"x");
        assert_eq!(reader.get(1).unwrap(), vec![b'y'; 1000]);
        assert_eq!(reader.get(2).unwrap(), b"z");
        assert_eq!(reader.get(3).unwrap(), vec![b'w'; 8000]);
    }
}