use std::io;
use std::iter::Sum;
use std::ops::{AddAssign, Range};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use common::{BinarySerializable, HasLen};
use lru::LruCache;
use ownedbytes::OwnedBytes;
use super::footer::DocStoreFooter;
use super::index::SkipIndex;
use super::Decompressor;
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::AliveBitSet;
use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
/// Number of decompressed blocks a doc store reader's LRU block cache can hold when this
/// value is passed as `cache_size` to `StoreReader::open`.
pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;
/// A decompressed doc store block: the serialized documents followed by their offset index.
/// This is the value type stored in the reader's block cache.
type Block = OwnedBytes;
/// Reads documents back from a doc store file.
///
/// Documents are stored in compressed blocks. A skip index maps doc ids to the block
/// (checkpoint) that holds them, and decompressed blocks are kept in an LRU cache.
pub struct StoreReader {
    /// Decompressor recorded in the store footer, used to decompress every block.
    decompressor: Decompressor,
    /// Compressed block data (the part of the store file that precedes the skip index).
    data: FileSlice,
    /// Maps doc ids to checkpoints (doc range + compressed byte range of a block).
    skip_index: Arc<SkipIndex>,
    /// Byte sizes of the block data and of the skip index, computed at open time.
    space_usage: StoreSpaceUsage,
    /// Cache of decompressed blocks, keyed by the start of their compressed byte range.
    cache: BlockCache,
}
/// LRU cache of decompressed doc store blocks, together with hit/miss counters.
///
/// The counters are owned by the cache itself. They were never shared elsewhere, so no
/// `Arc` is needed around them.
struct BlockCache {
    /// Decompressed blocks keyed by the start of their compressed byte range in the store data.
    cache: Mutex<LruCache<usize, Block>>,
    /// Number of lookups that found their block in the cache.
    cache_hits: AtomicUsize,
    /// Number of lookups that did not find their block in the cache.
    cache_misses: AtomicUsize,
}
impl BlockCache {
    /// Looks up the block cached under `pos` and records a hit or a miss.
    ///
    /// On a hit, `LruCache::get` also marks the entry as most recently used.
    fn get_from_cache(&self, pos: usize) -> Option<Block> {
        // Clone the (cheaply clonable) block out of the cache so the mutex is released
        // before the counters are updated.
        let cached = self.cache.lock().unwrap().get(&pos).cloned();
        let counter = if cached.is_some() {
            &self.cache_hits
        } else {
            &self.cache_misses
        };
        counter.fetch_add(1, Ordering::SeqCst);
        cached
    }

    /// Inserts `data` under `pos`. This may evict the least recently used block if the
    /// cache is full.
    fn put_into_cache(&self, pos: usize, data: Block) {
        self.cache.lock().unwrap().put(pos, data);
    }

    /// Returns the current hit/miss counters and number of cached blocks.
    ///
    /// The counters and the entry count are read separately, so under concurrent use the
    /// result is not an atomic snapshot.
    fn stats(&self) -> CacheStats {
        CacheStats {
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            num_entries: self.len(),
        }
    }

    /// Number of blocks currently held in the cache.
    fn len(&self) -> usize {
        self.cache.lock().unwrap().len()
    }

    /// Key of the least recently used block, without updating recency (test-only).
    #[cfg(test)]
    fn peek_lru(&self) -> Option<usize> {
        self.cache.lock().unwrap().peek_lru().map(|(&key, _)| key)
    }
}
/// Counters describing the use of a doc store reader's block cache.
///
/// Plain data, so it is cheap to copy and can be compared directly (e.g. in tests).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of blocks currently held in the cache.
    pub num_entries: usize,
    /// Number of lookups that found their block in the cache.
    pub cache_hits: usize,
    /// Number of lookups that did not find their block in the cache.
    pub cache_misses: usize,
}
impl AddAssign for CacheStats {
    /// Merges `other` into `self` by summing each counter field by field.
    fn add_assign(&mut self, other: Self) {
        let Self {
            num_entries,
            cache_hits,
            cache_misses,
        } = other;
        let merged = Self {
            num_entries: self.num_entries + num_entries,
            cache_hits: self.cache_hits + cache_hits,
            cache_misses: self.cache_misses + cache_misses,
        };
        *self = merged;
    }
}
impl Sum for CacheStats {
    /// Merges every `CacheStats` yielded by `iter`. An empty iterator gives all-zero stats.
    fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
        iter.fold(Self::default(), |mut total, stats| {
            total += stats;
            total
        })
    }
}
impl StoreReader {
    /// Opens a doc store reader over `store_file`.
    ///
    /// The footer is extracted from `store_file`. The remaining bytes are split at the
    /// footer's offset into the compressed block data and the skip index, and the skip
    /// index bytes are read immediately. Decompressed blocks are cached in an LRU cache
    /// created with capacity `cache_size`.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the footer cannot be extracted or the skip index cannot be
    /// read.
    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
        let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
        let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
        let index_data = offset_index_file.read_bytes()?;
        let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
        let skip_index = SkipIndex::open(index_data);
        Ok(StoreReader {
            decompressor: footer.decompressor,
            data: data_file,
            cache: BlockCache {
                cache: Mutex::new(LruCache::new(cache_size)),
                cache_hits: Default::default(),
                cache_misses: Default::default(),
            },
            skip_index: Arc::new(skip_index),
            space_usage,
        })
    }

    /// Iterates over the checkpoints of the skip index, one per compressed block.
    pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
        self.skip_index.checkpoints()
    }

    /// Returns the decompressor recorded in the store footer.
    pub(crate) fn decompressor(&self) -> Decompressor {
        self.decompressor
    }

    /// Returns the block cache counters and current number of cached blocks.
    pub(crate) fn cache_stats(&self) -> CacheStats {
        self.cache.stats()
    }

    /// Finds the checkpoint of the block containing `doc_id`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidArgument` if the skip index has no block for `doc_id`.
    fn block_checkpoint(&self, doc_id: DocId) -> crate::Result<Checkpoint> {
        self.skip_index.seek(doc_id).ok_or_else(|| {
            crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
        })
    }

    /// Reads all of the compressed block data (everything before the skip index).
    pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
        self.data.read_bytes()
    }

    /// Reads the compressed bytes of the block described by `checkpoint`.
    fn get_compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
        self.data.slice(checkpoint.byte_range.clone()).read_bytes()
    }

    /// Returns the decompressed block described by `checkpoint`.
    ///
    /// The block is served from the cache if present. Otherwise it is read, decompressed
    /// and inserted into the cache.
    fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
        // Blocks are cached under the start of their compressed byte range.
        let cache_key = checkpoint.byte_range.start;
        if let Some(block) = self.cache.get_from_cache(cache_key) {
            return Ok(block);
        }
        let compressed_block = self.get_compressed_block(checkpoint)?;
        let decompressed_block =
            OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?);
        self.cache
            .put_into_cache(cache_key, decompressed_block.clone());
        Ok(decompressed_block)
    }

    /// Fetches and deserializes the document `doc_id`.
    ///
    /// # Errors
    ///
    /// Fails if `doc_id` is not covered by the skip index, if its block cannot be read or
    /// decompressed, or if the document bytes cannot be decoded.
    pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
        let mut doc_bytes = self.get_document_bytes(doc_id)?;
        Ok(Document::deserialize(&mut doc_bytes)?)
    }

    /// Returns the serialized bytes of the document `doc_id`, without deserializing them.
    ///
    /// # Errors
    ///
    /// Same failure cases as [`StoreReader::get`], except for deserialization.
    pub fn get_document_bytes(&self, doc_id: DocId) -> crate::Result<OwnedBytes> {
        let checkpoint = self.block_checkpoint(doc_id)?;
        let block = self.read_block(&checkpoint)?;
        Self::get_document_bytes_from_block(block, doc_id, &checkpoint)
    }

    /// Slices the bytes of `doc_id` out of `block`.
    ///
    /// `checkpoint` must be the checkpoint of `block` and must cover `doc_id`; callers pass
    /// the result of `block_checkpoint(doc_id)`.
    fn get_document_bytes_from_block(
        block: OwnedBytes,
        doc_id: DocId,
        checkpoint: &Checkpoint,
    ) -> crate::Result<OwnedBytes> {
        let doc_pos = doc_id - checkpoint.doc_range.start;
        let range = block_read_index(&block, doc_pos)?;
        Ok(block.slice(range))
    }

    /// Iterates over the documents of the store in doc id order, deserializing each one.
    ///
    /// Documents that `alive_bitset` marks as deleted are skipped. With no bitset, every
    /// document is yielded.
    pub fn iter<'a: 'b, 'b>(
        &'b self,
        alive_bitset: Option<&'a AliveBitSet>,
    ) -> impl Iterator<Item = crate::Result<Document>> + 'b {
        self.iter_raw(alive_bitset).map(|doc_bytes_res| {
            let mut doc_bytes = doc_bytes_res?;
            Ok(Document::deserialize(&mut doc_bytes)?)
        })
    }

    /// Iterates over the serialized bytes of the documents of the store, in doc id order.
    ///
    /// Documents that `alive_bitset` marks as deleted are skipped. A block is read (through
    /// the block cache) as soon as the iteration enters its checkpoint, even if all of its
    /// documents turn out to be deleted.
    ///
    /// Block read failures are reported for every live document of that block. Only the
    /// `io::ErrorKind` of the original error is kept.
    pub(crate) fn iter_raw<'a: 'b, 'b>(
        &'b self,
        alive_bitset: Option<&'a AliveBitSet>,
    ) -> impl Iterator<Item = crate::Result<OwnedBytes>> + 'b {
        let last_doc_id = self
            .block_checkpoints()
            .last()
            .map(|checkpoint| checkpoint.doc_range.end)
            .unwrap_or(0);
        let mut checkpoint_block_iter = self.block_checkpoints();
        let mut curr_checkpoint = checkpoint_block_iter.next();
        // `io::Error` is not `Clone`, so only its kind is kept: the same block result is
        // handed out once per live document of the block.
        let mut curr_block = curr_checkpoint
            .as_ref()
            .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
        let mut doc_pos = 0;
        (0..last_doc_id)
            .filter_map(move |doc_id| {
                // All mutable iteration state lives in this closure. The next stage only
                // receives owned values (the block result and the position in the block).

                // Move to the next checkpoint once `doc_id` leaves the current one. If the
                // checkpoints run out early, `curr_checkpoint` stays `None` and the next stage
                // reports data corruption instead of this closure panicking.
                let past_current_checkpoint = curr_checkpoint
                    .as_ref()
                    .map_or(false, |checkpoint| doc_id >= checkpoint.doc_range.end);
                if past_current_checkpoint {
                    curr_checkpoint = checkpoint_block_iter.next();
                    curr_block = curr_checkpoint
                        .as_ref()
                        .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
                    doc_pos = 0;
                }
                let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
                let res = alive.then(|| (curr_block.clone(), doc_pos));
                // The position inside the block advances for deleted documents too.
                doc_pos += 1;
                res
            })
            .map(move |(block, doc_pos)| {
                let block = block
                    .ok_or_else(|| {
                        DataCorruption::comment_only(
                            "the current checkpoint in the doc store iterator is none, this \
                             should never happen",
                        )
                    })?
                    .map_err(|error_kind| {
                        std::io::Error::new(error_kind, "error when reading block in doc store")
                    })?;
                let range = block_read_index(&block, doc_pos)?;
                Ok(block.slice(range))
            })
    }

    /// Returns the space usage computed when the reader was opened (sizes of the block data
    /// and of the skip index).
    pub fn space_usage(&self) -> StoreSpaceUsage {
        self.space_usage.clone()
    }
}
fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
let doc_pos = doc_pos as usize;
let size_of_u32 = std::mem::size_of::<u32>();
let index_len_pos = block.len() - size_of_u32;
let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize;
if doc_pos > index_len {
return Err(crate::TantivyError::InternalError(
"Attempted to read doc from wrong block".to_owned(),
));
}
let index_start = block.len() - (index_len + 1) * size_of_u32;
let index = &block[index_start..index_start + index_len * size_of_u32];
let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize;
let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..])
.unwrap_or(index_start as u32) as usize;
Ok(start_offset..end_offset)
}
#[cfg(feature = "quickwit")]
impl StoreReader {
    /// Async counterpart of `read_block`: returns the decompressed block described by
    /// `checkpoint`.
    ///
    /// The block is served from the cache if present. On a miss, the compressed bytes are
    /// read asynchronously, decompressed, and inserted into the cache.
    ///
    /// NOTE(review): decompression itself runs synchronously inside this future.
    async fn read_block_async(&self, checkpoint: &Checkpoint) -> crate::AsyncIoResult<Block> {
        // Same cache key as the synchronous path: start of the compressed byte range.
        let cache_key = checkpoint.byte_range.start;
        if let Some(block) = self.cache.get_from_cache(cache_key) {
            return Ok(block);
        }
        let compressed_block = self
            .data
            .slice(checkpoint.byte_range.clone())
            .read_bytes_async()
            .await?;
        let decompressed_block =
            OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?);
        self.cache
            .put_into_cache(cache_key, decompressed_block.clone());
        Ok(decompressed_block)
    }

    /// Async version of `get_document_bytes`: returns the serialized bytes of `doc_id`.
    pub async fn get_document_bytes_async(&self, doc_id: DocId) -> crate::Result<OwnedBytes> {
        let checkpoint = self.block_checkpoint(doc_id)?;
        let block = self.read_block_async(&checkpoint).await?;
        Self::get_document_bytes_from_block(block, doc_id, &checkpoint)
    }

    /// Async version of `get`: fetches and deserializes the document `doc_id`.
    pub async fn get_async(&self, doc_id: DocId) -> crate::Result<Document> {
        let mut doc_bytes = self.get_document_bytes_async(doc_id).await?;
        Ok(Document::deserialize(&mut doc_bytes)?)
    }
}
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::directory::RamDirectory;
    use crate::schema::{Document, Field};
    use crate::store::tests::write_lorem_ipsum_store;
    use crate::store::Compressor;
    use crate::Directory;

    const BLOCK_SIZE: usize = 16_384;

    /// Returns the first text value of `field` in `doc`, or `None` if it has none.
    fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
        let first_value = doc.get_first(*field)?;
        first_value.as_text()
    }

    /// Checks the block cache of `store`: number of cached blocks, then hit and miss counters.
    fn assert_cache_state(store: &StoreReader, num_entries: usize, hits: usize, misses: usize) {
        assert_eq!(store.cache.len(), num_entries);
        let stats = store.cache_stats();
        assert_eq!(stats.cache_hits, hits);
        assert_eq!(stats.cache_misses, misses);
    }

    #[test]
    fn test_store_lru_cache() -> crate::Result<()> {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        let writer = directory.open_write(path)?;
        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
        let title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;
        assert_cache_state(&store, 0, 0, 0);

        // Each step: (doc id, expected title, cached blocks, hits, misses, LRU key), where
        // the LRU key is the start byte offset of the least recently used block.
        let steps = [
            (0, "Doc 0", 1, 0, 1, 0),
            // Doc 499 lives in another block, which becomes the most recently used one.
            (499, "Doc 499", 2, 0, 2, 0),
            // Re-reading doc 0 hits the cache and makes block 0 the most recently used.
            (0, "Doc 0", 2, 1, 2, 11163),
        ];
        for &(doc_id, expected_title, num_entries, hits, misses, lru_key) in steps.iter() {
            let doc = store.get(doc_id)?;
            assert_eq!(get_text_field(&doc, &title), Some(expected_title));
            assert_cache_state(&store, num_entries, hits, misses);
            assert_eq!(store.cache.peek_lru(), Some(lru_key));
        }
        Ok(())
    }
}