use crate::common::BitSet;
use crate::common::HasLen;
use crate::common::{BinarySerializable, VInt};
use crate::docset::{DocSet, SkipResult};
use crate::positions::PositionReader;
use crate::postings::compression::{compressed_block_size, AlignedBuffer};
use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::serializer::PostingsSerializer;
use crate::postings::BlockSearcher;
use crate::postings::FreqReadingOption;
use crate::postings::Postings;
use crate::postings::SkipReader;
use crate::postings::USE_SKIP_INFO_LIMIT;
use crate::schema::IndexRecordOption;
use crate::DocId;
use owned_read::OwnedRead;
use std::cmp::Ordering;
use tantivy_fst::Streamer;
struct PositionComputer {
position_to_skip: usize,
position_reader: PositionReader,
}
impl PositionComputer {
pub fn new(position_reader: PositionReader) -> PositionComputer {
PositionComputer {
position_to_skip: 0,
position_reader,
}
}
pub fn add_skip(&mut self, num_skip: usize) {
self.position_to_skip += num_skip;
}
pub fn positions_with_offset(&mut self, offset: u32, output: &mut [u32]) {
self.position_reader.skip(self.position_to_skip);
self.position_to_skip = 0;
self.position_reader.read(output);
let mut cum = offset;
for output_mut in output.iter_mut() {
cum += *output_mut;
*output_mut = cum;
}
}
}
pub struct SegmentPostings {
block_cursor: BlockSegmentPostings,
cur: usize,
position_computer: Option<PositionComputer>,
block_searcher: BlockSearcher,
}
impl SegmentPostings {
pub fn empty() -> Self {
let empty_block_cursor = BlockSegmentPostings::empty();
SegmentPostings {
block_cursor: empty_block_cursor,
cur: COMPRESSION_BLOCK_SIZE,
position_computer: None,
block_searcher: BlockSearcher::default(),
}
}
pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
let mut buffer = Vec::new();
{
let mut postings_serializer = PostingsSerializer::new(&mut buffer, false, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32);
}
postings_serializer
.close_term(docs.len() as u32)
.expect("In memory Serialization should never fail.");
}
let block_segment_postings = BlockSegmentPostings::from_data(
docs.len() as u32,
OwnedRead::new(buffer),
IndexRecordOption::Basic,
IndexRecordOption::Basic,
);
SegmentPostings::from_block_postings(block_segment_postings, None)
}
}
impl SegmentPostings {
pub(crate) fn from_block_postings(
segment_block_postings: BlockSegmentPostings,
positions_stream_opt: Option<PositionReader>,
) -> SegmentPostings {
SegmentPostings {
block_cursor: segment_block_postings,
cur: COMPRESSION_BLOCK_SIZE,
position_computer: positions_stream_opt.map(PositionComputer::new),
block_searcher: BlockSearcher::default(),
}
}
}
impl DocSet for SegmentPostings {
#[inline]
fn advance(&mut self) -> bool {
if self.position_computer.is_some() && self.cur < COMPRESSION_BLOCK_SIZE {
let term_freq = self.term_freq() as usize;
if let Some(position_computer) = self.position_computer.as_mut() {
position_computer.add_skip(term_freq);
}
}
self.cur += 1;
if self.cur >= self.block_cursor.block_len() {
self.cur = 0;
if !self.block_cursor.advance() {
self.cur = COMPRESSION_BLOCK_SIZE;
return false;
}
}
true
}
fn skip_next(&mut self, target: DocId) -> SkipResult {
if !self.advance() {
return SkipResult::End;
}
match self.doc().cmp(&target) {
Ordering::Equal => {
return SkipResult::Reached;
}
Ordering::Greater => {
return SkipResult::OverStep;
}
_ => {
}
}
let mut sum_freqs_skipped: u32 = 0;
if !self
.block_cursor
.docs()
.last()
.map(|doc| *doc >= target)
.unwrap_or(false)
{
if self.position_computer.is_some() {
sum_freqs_skipped = self.block_cursor.freqs()[self.cur..].iter().sum();
match self.block_cursor.skip_to(target) {
BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => {
sum_freqs_skipped += block_skip_freqs;
}
BlockSegmentPostingsSkipResult::Terminated => {
return SkipResult::End;
}
}
} else if self.block_cursor.skip_to(target)
== BlockSegmentPostingsSkipResult::Terminated
{
return SkipResult::End;
}
self.cur = 0;
}
let cur = self.cur;
let (output, len) = self.block_cursor.docs_aligned();
let new_cur = self
.block_searcher
.search_in_block(&output, len, cur, target);
if let Some(position_computer) = self.position_computer.as_mut() {
sum_freqs_skipped += self.block_cursor.freqs()[cur..new_cur].iter().sum::<u32>();
position_computer.add_skip(sum_freqs_skipped as usize);
}
self.cur = new_cur;
let doc = output.0[new_cur];
debug_assert!(doc >= target);
if doc == target {
SkipResult::Reached
} else {
SkipResult::OverStep
}
}
#[inline]
fn doc(&self) -> DocId {
let docs = self.block_cursor.docs();
debug_assert!(
self.cur < docs.len(),
"Have you forgotten to call `.advance()` at least once before calling `.doc()` ."
);
docs[self.cur]
}
fn size_hint(&self) -> u32 {
self.len() as u32
}
fn append_to_bitset(&mut self, bitset: &mut BitSet) {
if self.advance() {
for &doc in &self.block_cursor.docs()[self.cur..] {
bitset.insert(doc);
}
while self.block_cursor.advance() {
for &doc in self.block_cursor.docs() {
bitset.insert(doc);
}
}
}
}
}
impl HasLen for SegmentPostings {
fn len(&self) -> usize {
self.block_cursor.doc_freq()
}
}
impl Postings for SegmentPostings {
fn term_freq(&self) -> u32 {
debug_assert!(
self.cur < COMPRESSION_BLOCK_SIZE,
"Have you forgotten to call `.advance()` at least once before calling \
`.term_freq()`."
);
self.block_cursor.freq(self.cur)
}
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq() as usize;
if let Some(position_comp) = self.position_computer.as_mut() {
output.resize(term_freq, 0u32);
position_comp.positions_with_offset(offset, &mut output[..]);
} else {
output.clear();
}
}
}
pub struct BlockSegmentPostings {
doc_decoder: BlockDecoder,
freq_decoder: BlockDecoder,
freq_reading_option: FreqReadingOption,
doc_freq: usize,
doc_offset: DocId,
num_vint_docs: usize,
remaining_data: OwnedRead,
skip_reader: SkipReader,
}
fn split_into_skips_and_postings(
doc_freq: u32,
mut data: OwnedRead,
) -> (Option<OwnedRead>, OwnedRead) {
if doc_freq >= USE_SKIP_INFO_LIMIT {
let skip_len = VInt::deserialize(&mut data).expect("Data corrupted").0 as usize;
let mut postings_data = data.clone();
postings_data.advance(skip_len);
data.clip(skip_len);
(Some(data), postings_data)
} else {
(None, data)
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum BlockSegmentPostingsSkipResult {
Terminated,
Success(u32),
}
impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: u32,
data: OwnedRead,
record_option: IndexRecordOption,
requested_option: IndexRecordOption,
) -> BlockSegmentPostings {
let freq_reading_option = match (record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data);
let skip_reader = match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, record_option),
None => SkipReader::new(OwnedRead::new(&[][..]), record_option),
};
let doc_freq = doc_freq as usize;
let num_vint_docs = doc_freq % COMPRESSION_BLOCK_SIZE;
BlockSegmentPostings {
num_vint_docs,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option,
doc_offset: 0,
doc_freq,
remaining_data: postings_data,
skip_reader,
}
}
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedRead) {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
let num_vint_docs = (doc_freq as usize) & (COMPRESSION_BLOCK_SIZE - 1);
self.num_vint_docs = num_vint_docs;
self.remaining_data = postings_data;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data);
} else {
self.skip_reader.reset(OwnedRead::new(&[][..]))
}
self.doc_offset = 0;
self.doc_freq = doc_freq as usize;
}
pub fn doc_freq(&self) -> usize {
self.doc_freq
}
#[inline]
pub fn docs(&self) -> &[DocId] {
self.doc_decoder.output_array()
}
pub(crate) fn docs_aligned(&self) -> (&AlignedBuffer, usize) {
self.doc_decoder.output_aligned()
}
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
self.doc_decoder.output(idx)
}
#[inline]
pub fn freqs(&self) -> &[u32] {
self.freq_decoder.output_array()
}
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
self.freq_decoder.output(idx)
}
#[inline]
fn block_len(&self) -> usize {
self.doc_decoder.output_len
}
pub fn skip_to(&mut self, target_doc: DocId) -> BlockSegmentPostingsSkipResult {
let mut skip_freqs = 0u32;
while self.skip_reader.advance() {
if self.skip_reader.doc() >= target_doc {
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits,
);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
self.doc_offset = self.skip_reader.doc();
return BlockSegmentPostingsSkipResult::Success(skip_freqs);
} else {
skip_freqs += self.skip_reader.tf_sum();
let advance_len = self.skip_reader.total_block_len();
self.doc_offset = self.skip_reader.doc();
self.remaining_data.advance(advance_len);
}
}
if self.num_vint_docs > 0 {
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs,
);
self.remaining_data.advance(num_compressed_bytes);
match self.freq_reading_option {
FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {}
FreqReadingOption::ReadFreq => {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
}
self.num_vint_docs = 0;
return self
.docs()
.last()
.map(|last_doc| {
if *last_doc >= target_doc {
BlockSegmentPostingsSkipResult::Success(skip_freqs)
} else {
BlockSegmentPostingsSkipResult::Terminated
}
})
.unwrap_or(BlockSegmentPostingsSkipResult::Terminated);
}
BlockSegmentPostingsSkipResult::Terminated
}
pub fn advance(&mut self) -> bool {
if self.skip_reader.advance() {
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits,
);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
self.doc_offset = self.doc_decoder.output(COMPRESSION_BLOCK_SIZE - 1);
true
} else if self.num_vint_docs > 0 {
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs,
);
self.remaining_data.advance(num_compressed_bytes);
match self.freq_reading_option {
FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {}
FreqReadingOption::ReadFreq => {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
}
self.num_vint_docs = 0;
true
} else {
false
}
}
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
num_vint_docs: 0,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
doc_offset: 0,
doc_freq: 0,
remaining_data: OwnedRead::new(vec![]),
skip_reader: SkipReader::new(OwnedRead::new(vec![]), IndexRecordOption::Basic),
}
}
}
impl<'b> Streamer<'b> for BlockSegmentPostings {
type Item = &'b [DocId];
fn next(&'b mut self) -> Option<&'b [DocId]> {
if self.advance() {
Some(self.docs())
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::BlockSegmentPostings;
use super::BlockSegmentPostingsSkipResult;
use super::SegmentPostings;
use crate::common::HasLen;
use crate::core::Index;
use crate::docset::DocSet;
use crate::postings::postings::Postings;
use crate::schema::IndexRecordOption;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::INDEXED;
use crate::DocId;
use crate::SkipResult;
use tantivy_fst::Streamer;
#[test]
fn test_empty_segment_postings() {
let mut postings = SegmentPostings::empty();
assert!(!postings.advance());
assert!(!postings.advance());
assert_eq!(postings.len(), 0);
}
#[test]
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
fn test_panic_if_doc_called_before_advance() {
SegmentPostings::empty().doc();
}
#[test]
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
fn test_panic_if_freq_called_before_advance() {
SegmentPostings::empty().term_freq();
}
#[test]
fn test_empty_block_segment_postings() {
let mut postings = BlockSegmentPostings::empty();
assert!(!postings.advance());
assert_eq!(postings.doc_freq(), 0);
}
#[test]
fn test_block_segment_postings() {
let mut block_segments = build_block_postings(&(0..100_000).collect::<Vec<u32>>());
let mut offset: u32 = 0u32;
assert!(block_segments.docs().is_empty());
assert_eq!(block_segments.doc_freq(), 100_000);
while let Some(block) = block_segments.next() {
for (i, doc) in block.iter().cloned().enumerate() {
assert_eq!(offset + (i as u32), doc);
}
offset += block.len() as u32;
}
}
#[test]
fn test_skip_right_at_new_block() {
let mut doc_ids = (0..128).collect::<Vec<u32>>();
doc_ids.push(129);
doc_ids.push(130);
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.skip_next(128), SkipResult::OverStep);
assert_eq!(docset.doc(), 129);
assert!(docset.advance());
assert_eq!(docset.doc(), 130);
assert!(!docset.advance());
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.skip_next(129), SkipResult::Reached);
assert_eq!(docset.doc(), 129);
assert!(docset.advance());
assert_eq!(docset.doc(), 130);
assert!(!docset.advance());
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.skip_next(131), SkipResult::End);
}
}
fn build_block_postings(docs: &[DocId]) -> BlockSegmentPostings {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut last_doc = 0u32;
for &doc in docs {
for _ in last_doc..doc {
index_writer.add_document(doc!(int_field=>1u64));
}
index_writer.add_document(doc!(int_field=>0u64));
last_doc = doc + 1;
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
}
#[test]
fn test_block_segment_postings_skip() {
for i in 0..4 {
let mut block_postings = build_block_postings(&[3]);
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Success(0u32)
);
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Terminated
);
}
let mut block_postings = build_block_postings(&[3]);
assert_eq!(
block_postings.skip_to(4u32),
BlockSegmentPostingsSkipResult::Terminated
);
}
#[test]
fn test_block_segment_postings_skip2() {
let mut docs = vec![0];
for i in 0..1300 {
docs.push((i * i / 100) + i);
}
let mut block_postings = build_block_postings(&docs[..]);
for i in vec![0, 424, 10000] {
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Success(0u32)
);
let docs = block_postings.docs();
assert!(docs[0] <= i);
assert!(docs.last().cloned().unwrap_or(0u32) >= i);
}
assert_eq!(
block_postings.skip_to(100_000),
BlockSegmentPostingsSkipResult::Terminated
);
assert_eq!(
block_postings.skip_to(101_000),
BlockSegmentPostingsSkipResult::Terminated
);
}
#[test]
fn test_reset_block_segment_postings() {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for i in 0..6 {
let doc = doc!(int_field=> (i % 2) as u64);
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut block_segments;
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[0, 2, 4]);
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[1, 3, 5]);
}
}