use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;
use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::FileHandle;
use crate::dsl::{Document, Schema};
// Footer constants: STORE_MAGIC is "STOR" in ASCII; STORE_VERSION is the
// on-disk format revision (readers reject any other value in `open`).
const STORE_MAGIC: u32 = 0x53544F52; const STORE_VERSION: u32 = 2;
// Uncompressed payload size at which the writer flushes and compresses a block.
pub const STORE_BLOCK_SIZE: usize = 16 * 1024;
// Presumably the size budget for training a CompressionDict — not referenced
// in this file; confirm against the dict-training call site.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;
// Compression level used when the caller does not choose one explicitly.
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);
fn write_store_index_and_footer(
writer: &mut (impl Write + ?Sized),
index: &[StoreBlockIndex],
data_end_offset: u64,
dict_offset: u64,
num_docs: u32,
has_dict: bool,
) -> io::Result<()> {
writer.write_u32::<LittleEndian>(index.len() as u32)?;
for entry in index {
writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
writer.write_u64::<LittleEndian>(entry.offset)?;
writer.write_u32::<LittleEndian>(entry.length)?;
writer.write_u32::<LittleEndian>(entry.num_docs)?;
}
writer.write_u64::<LittleEndian>(data_end_offset)?;
writer.write_u64::<LittleEndian>(dict_offset)?;
writer.write_u32::<LittleEndian>(num_docs)?;
writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
writer.write_u32::<LittleEndian>(STORE_VERSION)?;
writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
Ok(())
}
/// Serializes a document's stored fields into a freshly allocated buffer.
///
/// Convenience wrapper over [`serialize_document_into`] for callers that do
/// not reuse a scratch buffer across documents.
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut out = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut out).map(|()| out)
}
/// Serializes the stored fields of `doc` into `buf` (the buffer is cleared
/// first so callers can reuse it across documents).
///
/// Wire format (little-endian): `stored_field_count:u16`, then for each field
/// `field_id:u16`, `type_tag:u8`, and a tag-specific payload. Fields not
/// marked `stored` in the schema are skipped, as are dense and binary-dense
/// vectors (excluded regardless of the schema flag).
///
/// # Errors
/// Returns `InvalidData` when the stored-field count, a field id, or a
/// payload length does not fit the format's fixed-width integers (these were
/// previously truncated silently, corrupting the record), or when JSON
/// encoding fails. On error, `buf` contents are unspecified.
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;
    buf.clear();
    // Converts a usize length/count to the u32 used on the wire, erroring
    // instead of silently truncating values >= 4 GiB.
    fn len_u32(len: usize) -> io::Result<u32> {
        u32::try_from(len)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "field payload too large"))
    }
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        // Dense vectors never go into the doc store, even if marked stored.
        if matches!(
            value,
            FieldValue::DenseVector(_) | FieldValue::BinaryDenseVector(_)
        ) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };
    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();
    // The header count is a u16: reject overflow instead of wrapping, which
    // would desynchronize readers from the actual field list.
    let stored_count = u16::try_from(stored_count)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "too many stored fields"))?;
    buf.write_u16::<LittleEndian>(stored_count)?;
    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        // Field ids are persisted as u16; error on ids that cannot round-trip.
        let field_id = u16::try_from(field.0)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "field id exceeds u16"))?;
        buf.write_u16::<LittleEndian>(field_id)?;
        match value {
            // Tag 0: UTF-8 text as u32 length + bytes.
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(len_u32(bytes.len())?)?;
                buf.extend_from_slice(bytes);
            }
            // Tags 1-3: fixed-width little-endian scalars.
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            // Tag 4: raw bytes as u32 length + bytes.
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(len_u32(b.len())?)?;
                buf.extend_from_slice(b);
            }
            // Tag 5: u32 entry count + (u32 index, f32 value) pairs.
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(len_u32(entries.len())?)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            // Tags 6/8 are unreachable under `is_stored` (dense vectors are
            // filtered out above) but kept for match exhaustiveness and in
            // case the filter policy changes.
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(len_u32(values.len())?)?;
                // SAFETY: viewing an f32 slice as bytes is sound — u8 has
                // alignment 1 and the region is values.len() * 4 initialized
                // bytes owned by `values`.
                // NOTE(review): this copies host-endian f32 bytes while the
                // rest of the format is explicitly little-endian; the
                // deserializer mirrors it, so dense vectors are only portable
                // between same-endian hosts — confirm this is intended.
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            // Tag 7: serde_json-encoded value as u32 length + bytes.
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(len_u32(json_bytes.len())?)?;
                buf.extend_from_slice(&json_bytes);
            }
            FieldValue::BinaryDenseVector(b) => {
                buf.push(8);
                buf.write_u32::<LittleEndian>(len_u32(b.len())?)?;
                buf.extend_from_slice(b);
            }
        }
    }
    Ok(())
}
/// Output of compressing one store block on a worker thread.
#[cfg(feature = "native")]
struct CompressedBlock {
    // Monotonic submission order, used to restore write order in `finish`.
    seq: usize,
    // Doc id of the first document packed into this block.
    first_doc_id: DocId,
    // Number of documents packed into this block.
    num_docs: u32,
    // Compressed block payload.
    compressed: Vec<u8>,
}
/// Streams documents into length-prefixed blocks and compresses each full
/// block on a freshly spawned background thread. `_num_threads` is currently
/// ignored — one thread is spawned per flushed block.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    // Destination for compressed blocks, dictionary, index, and footer.
    writer: &'a mut dyn Write,
    // Uncompressed, length-prefixed docs accumulated for the current block.
    block_buffer: Vec<u8>,
    // Scratch buffer reused across `store` calls for doc serialization.
    serialize_buf: Vec<u8>,
    // Finished blocks collected from workers (unordered until `finish`).
    compressed_blocks: Vec<CompressedBlock>,
    // Join handles for compression threads still in flight.
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    // Sequence number for the next spawned block (restores write order).
    next_seq: usize,
    // Doc id that will be assigned to the next stored document.
    next_doc_id: DocId,
    // Doc id of the first document in the current (unflushed) block.
    block_first_doc: DocId,
    // Optional shared compression dictionary applied to every block.
    dict: Option<Arc<CompressionDict>>,
    // Level passed to the compressor for every block.
    compression_level: CompressionLevel,
}
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Creates a writer with no dictionary and the default compression level.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer with no dictionary and an explicit compression level.
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::build(writer, None, compression_level)
    }

    /// Creates a writer that compresses blocks with `dict` at the default level.
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer that compresses blocks with `dict` at an explicit level.
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::build(writer, Some(Arc::new(dict)), compression_level)
    }

    // Single constructor shared by the public `new`/`with_*` variants
    // (previously duplicated field-for-field).
    fn build(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Serializes `doc` per `schema` and appends it to the current block,
    /// returning the doc id assigned to it.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        // Move the scratch buffer out so we can delegate to `store_raw`
        // without a second mutable borrow of `self`, then put it back so
        // the allocation is reused on the next call.
        let bytes = std::mem::take(&mut self.serialize_buf);
        let result = self.store_raw(&bytes);
        self.serialize_buf = bytes;
        result
    }

    /// Appends an already-serialized document (prefixed with its u32 length)
    /// to the current block, spawning background compression once the block
    /// reaches `STORE_BLOCK_SIZE`.
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    // Hands the current block to a fresh worker thread for compression.
    // Worker panics are re-raised when the handle is joined.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }
        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();
        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };
            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });
        self.pending_handles.push(handle);
        // Reap workers that already finished so the handle list (and the OS
        // thread handle count) stays bounded during long indexing runs;
        // previously handles accumulated until `finish`.
        self.collect_completed();
    }

    // Joins finished workers without blocking on those still running.
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    // Propagate worker panics (e.g. "compression failed").
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    /// Flushes the final partial block, waits for all workers, then writes
    /// blocks in submission order followed by the optional dictionary, the
    /// index, and the footer. Returns the total number of documents written.
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();
        // Block until every outstanding worker has delivered its block.
        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }
        if self.compressed_blocks.is_empty() {
            // Empty store: empty index plus footer. No dictionary section is
            // written even if a dict was configured (has_dict = false).
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }
        // Workers may finish out of order; restore submission order so doc
        // ids are monotonically increasing across the index.
        self.compressed_blocks.sort_by_key(|b| b.seq);
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;
        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });
            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }
        let data_end_offset = current_offset;
        // Dictionary section: u32 length + raw bytes, right after the data.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;
        Ok(self.next_doc_id)
    }
}
/// Location and doc-id range of one compressed block within the store file.
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    // Doc id of the first document in the block.
    pub(crate) first_doc_id: DocId,
    // Byte offset of the compressed block within the data region.
    pub(crate) offset: u64,
    // Compressed length of the block in bytes.
    pub(crate) length: u32,
    // Number of documents contained in the block.
    pub(crate) num_docs: u32,
}
/// Read side of the document store: footer and index are parsed eagerly at
/// `open`; blocks are fetched, decompressed, and LRU-cached on demand.
pub struct AsyncStoreReader {
    // File region [0, data_end_offset) holding the compressed blocks.
    data_slice: FileHandle,
    // Per-block index parsed from the file tail.
    index: Vec<StoreBlockIndex>,
    // Total number of documents in the store (from the footer).
    num_docs: u32,
    // Decompression dictionary, present when the store was written with one.
    dict: Option<CompressionDict>,
    // LRU cache of decompressed blocks, keyed by each block's first doc id.
    cache: RwLock<StoreBlockCache>,
}
/// A decompressed store block plus a per-document offset table so individual
/// documents can be sliced out without rescanning the block.
struct CachedBlock {
    // Decompressed block bytes: repeated (u32 little-endian length, payload).
    data: Vec<u8>,
    // Byte offset of each document's length prefix within `data`.
    offsets: Vec<u32>,
}
impl CachedBlock {
    /// Walks `num_docs` length-prefixed records in `data` and records where
    /// each one starts. Fails if a length prefix would read past the buffer.
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut cursor = 0usize;
        for _ in 0..num_docs {
            let Some(prefix) = data.get(cursor..cursor + 4) else {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            };
            offsets.push(cursor as u32);
            let doc_len = u32::from_le_bytes(prefix.try_into().unwrap()) as usize;
            cursor += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }
    /// Returns the payload bytes of the `doc_offset_in_block`-th document.
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let start = match self.offsets.get(doc_offset_in_block as usize) {
            Some(&off) => off as usize,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "doc offset out of range",
                ));
            }
        };
        let prefix = self
            .data
            .get(start..start + 4)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "truncated doc length"))?;
        let doc_len = u32::from_le_bytes(prefix.try_into().unwrap()) as usize;
        let payload_start = start + 4;
        self.data
            .get(payload_start..payload_start + doc_len)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "doc data overflow"))
    }
}
/// Bounded LRU cache of decompressed blocks, keyed by each block's first
/// doc id.
struct StoreBlockCache {
    // Cached blocks by first doc id.
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    // Keys ordered least- to most-recently used (front is evicted first).
    lru_order: std::collections::VecDeque<DocId>,
    // Maximum number of blocks retained.
    max_blocks: usize,
}
impl StoreBlockCache {
    /// Creates an empty cache holding at most `max_blocks` blocks.
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }
    /// Looks up a block without touching LRU order (shared-lock fast path).
    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).cloned()
    }
    /// Looks up a block and marks it most recently used.
    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        match self.blocks.get(&first_doc_id) {
            Some(found) => {
                let found = Arc::clone(found);
                self.promote(first_doc_id);
                Some(found)
            }
            None => None,
        }
    }
    /// Inserts a block, evicting from the LRU front to stay within capacity.
    /// Re-inserting an existing key only refreshes its recency.
    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            match self.lru_order.pop_front() {
                Some(oldest) => {
                    self.blocks.remove(&oldest);
                }
                None => break,
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }
    // Moves `key` to the most-recently-used end of the queue. O(n) scan,
    // acceptable because block caches are small.
    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}
impl AsyncStoreReader {
    /// Opens a store: parses the fixed 32-byte footer, the optional
    /// dictionary section, and the block index from the tail of the file.
    /// Keeps at most `cache_blocks` decompressed blocks in an LRU cache.
    ///
    /// # Errors
    /// Fails on files shorter than the footer, on magic/version mismatch,
    /// or when any region read fails.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // The footer written by `write_store_index_and_footer` is exactly
        // 32 bytes; anything smaller cannot be a valid store.
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        // Footer fields are read in exactly the order the writer emitted them.
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;
        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }
        // Dictionary section (if present): u32 length followed by the raw
        // dictionary bytes; the block index starts immediately after it.
        // Without a dictionary the index starts right at the end of the data.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        // The index occupies everything between the dictionary (or data) end
        // and the 32-byte footer: u32 block count, then fixed-size entries.
        let index_end = file_len - 32;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();
        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);
        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }
        // Restrict the handle to the data region so block offsets can be
        // used directly as file offsets.
        let data_slice = file_handle.slice(0..data_end_offset);
        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }
    /// Total number of documents in the store.
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }
    /// Number of decompressed blocks currently held in the cache
    /// (presumably for tests/metrics — no caller visible in this file).
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }
    /// Fetches and decodes the document with id `doc_id`, or `None` if the
    /// id is out of range.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }
        let (entry, block) = self.find_and_load_block(doc_id).await?;
        // Position within the block is relative to the block's first doc id.
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }
    /// Like [`Self::get`], but materializes only the fields whose ids appear
    /// in `field_ids`.
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }
        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }
    // Binary-searches the index for the block whose doc-id range contains
    // `doc_id`, then loads (or fetches from cache) that block.
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                // Compare the entry's doc-id range [first, first + num_docs)
                // against the target id.
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }
    // Returns the decompressed block for `entry`, serving from the cache
    // when possible. Lock discipline: cheap read-lock peek first (no LRU
    // promotion), then a write-lock get (promotes), and only then the
    // fetch + decompress outside any lock. Two tasks may decompress the
    // same block concurrently; the duplicate insert only refreshes recency.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };
        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }
        Ok(block)
    }
}
/// Decodes only the fields whose ids appear in `field_ids`; all other fields
/// are parsed past but not materialized.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}
/// Decodes every stored field of a document serialized by
/// [`serialize_document_into`].
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}
/// Decodes a document serialized by [`serialize_document_into`].
///
/// When `field_filter` is present it lists the field ids to materialize;
/// other fields are still parsed (to keep the cursor aligned) but discarded.
/// `_schema` is unused: the wire format is self-describing.
///
/// # Errors
/// Returns `InvalidData` for unknown type tags, invalid UTF-8 text, or bad
/// JSON, and `UnexpectedEof` when a declared length runs past the buffer
/// (previously, truncated text/bytes fields caused a slice-index panic).
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;
    // Verifies `len` bytes are available before slicing, so malformed input
    // yields an error instead of a panic.
    fn ensure_len(reader: &[u8], len: usize, what: &'static str) -> io::Result<()> {
        if len > reader.len() {
            Err(io::Error::new(io::ErrorKind::UnexpectedEof, what))
        } else {
            Ok(())
        }
    }
    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();
    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;
        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));
        match type_tag {
            // Tag 0: UTF-8 text, u32 length + bytes.
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                ensure_len(reader, len, "text field truncated")?;
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            // Tags 1-3: fixed-width little-endian scalars.
            1 => {
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            // Tag 4: raw bytes, u32 length + bytes.
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                ensure_len(reader, len, "bytes field truncated")?;
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            // Tag 5: sparse vector, u32 count + (u32 index, f32 value) pairs.
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                // 8 bytes per entry; checked_mul guards 32-bit usize overflow.
                let byte_len = count.checked_mul(8).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::UnexpectedEof, "sparse vector skip overflow")
                })?;
                ensure_len(reader, byte_len, "sparse vector skip overflow")?;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    reader = &reader[byte_len..];
                }
            }
            // Tag 6: dense f32 vector, u32 count + raw f32 bytes.
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count.checked_mul(4).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::UnexpectedEof, "dense vector truncated")
                })?;
                ensure_len(reader, byte_len, "dense vector truncated")?;
                if wanted {
                    let mut values = vec![0.0f32; count];
                    // SAFETY: `byte_len == count * 4` bytes were just verified
                    // to be available in `reader`, and `values` owns exactly
                    // `count * 4` writable bytes; the regions cannot overlap.
                    // NOTE(review): this copies host-endian f32 bytes,
                    // mirroring the serializer — only portable between
                    // same-endian hosts.
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            // Tag 7: serde_json-encoded value, u32 length + bytes.
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                ensure_len(reader, len, "json field truncated")?;
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            // Tag 8: binary dense vector, u32 length + bytes.
            8 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                ensure_len(reader, len, "binary dense vector truncated")?;
                if wanted {
                    doc.add_binary_dense_vector(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }
    Ok(doc)
}
/// Public descriptor of one compressed block, used to copy blocks verbatim
/// between stores during merging.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    // Doc id of the block's first document (in the source store).
    pub first_doc_id: DocId,
    // Number of documents in the block.
    pub num_docs: u32,
    // Byte offset of the compressed block within the source data region.
    pub offset: u64,
    // Compressed length in bytes.
    pub length: u32,
}
/// Concatenates blocks from existing stores into one new store, renumbering
/// doc ids sequentially and rebuilding the index and footer.
pub struct StoreMerger<'a, W: Write> {
    // Destination for block data, index, and footer.
    writer: &'a mut W,
    // Index entries accumulated for the merged store.
    index: Vec<StoreBlockIndex>,
    // Byte offset at which the next block payload will be written.
    current_offset: u64,
    // Doc id assigned to the first document of the next appended block.
    next_doc_id: DocId,
}
impl<'a, W: Write> StoreMerger<'a, W> {
    /// Creates a merger that writes a fresh store to `writer`.
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }
    /// Copies already-compressed blocks verbatim from `data_slice` into the
    /// merged store, assigning them new sequential doc ids.
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for raw in blocks {
            let range = raw.offset..raw.offset + raw.length as u64;
            let payload = data_slice.read_bytes_range(range).await?;
            self.writer.write_all(payload.as_slice())?;
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: raw.length,
                num_docs: raw.num_docs,
            });
            self.current_offset += raw.length as u64;
            self.next_doc_id += raw.num_docs;
        }
        Ok(())
    }
    /// Decompresses every block of `store` (with its dictionary if present)
    /// and re-encodes it without a dictionary at the default level, so the
    /// merged store never depends on a source store's dictionary.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        for block in store.block_index() {
            let range = block.offset..block.offset + block.length as u64;
            let compressed = data_slice.read_bytes_range(range).await?;
            let plain = match dict {
                Some(d) => crate::compression::decompress_with_dict(compressed.as_slice(), d)?,
                None => crate::compression::decompress(compressed.as_slice())?,
            };
            let recompressed = crate::compression::compress(
                &plain,
                crate::compression::CompressionLevel::default(),
            )?;
            self.writer.write_all(&recompressed)?;
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });
            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }
        Ok(())
    }
    /// Writes the merged index and footer; returns the total document count.
    pub fn finish(self) -> io::Result<u32> {
        write_store_index_and_footer(
            self.writer,
            &self.index,
            self.current_offset,
            0, // merged stores never carry a dictionary
            self.next_doc_id,
            false,
        )?;
        Ok(self.next_doc_id)
    }
}
impl AsyncStoreReader {
    /// Returns a caller-owned copy of the block index as raw descriptors,
    /// suitable for verbatim copying via [`StoreMerger::append_store`].
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        let mut out = Vec::with_capacity(self.index.len());
        for entry in &self.index {
            out.push(RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            });
        }
        out
    }
    /// The file region containing the compressed block data.
    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }
    /// Whether this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
    /// The store's compression dictionary, if any.
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }
    /// Crate-internal view of the block index (used by the merger).
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}