use std::sync::Arc;
use columnar::{
ColumnType, ColumnarReader, MergeRowOrder, RowAddr, ShuffleMergeOrder, StackMergeOrder,
};
use common::ReadOnlyBitSet;
use itertools::Itertools;
use measure_time::debug_time;
use crate::directory::WritePtr;
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::AliveBitSet;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::index::{Segment, SegmentComponent, SegmentReader};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
use crate::{DocAddress, DocId, InvertedIndexReader};
pub const MAX_DOC_LIMIT: u32 = 1 << 31;
fn estimate_total_num_tokens_in_single_segment(
reader: &SegmentReader,
field: Field,
) -> crate::Result<u64> {
if !reader.has_deletes() {
return Ok(reader.inverted_index(field)?.total_num_tokens());
}
if let Some(fieldnorm_reader) = reader.fieldnorms_readers().get_field(field)? {
let mut count: [usize; 256] = [0; 256];
for doc in reader.doc_ids_alive() {
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
count[fieldnorm_id as usize] += 1;
}
let total_num_tokens = count
.iter()
.cloned()
.enumerate()
.map(|(fieldnorm_ord, count)| {
count as u64 * u64::from(FieldNormReader::id_to_fieldnorm(fieldnorm_ord as u8))
})
.sum::<u64>();
return Ok(total_num_tokens);
}
let segment_num_tokens = reader.inverted_index(field)?.total_num_tokens();
if reader.max_doc() == 0 {
return Ok(0u64);
}
let ratio = reader.num_docs() as f64 / reader.max_doc() as f64;
Ok((segment_num_tokens as f64 * ratio) as u64)
}
fn estimate_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::Result<u64> {
let mut total_num_tokens: u64 = 0;
for reader in readers {
total_num_tokens += estimate_total_num_tokens_in_single_segment(reader, field)?;
}
Ok(total_num_tokens)
}
pub struct IndexMerger {
schema: Schema,
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
}
struct DeltaComputer {
buffer: Vec<u32>,
}
impl DeltaComputer {
fn new() -> DeltaComputer {
DeltaComputer {
buffer: vec![0u32; 512],
}
}
fn compute_delta(&mut self, positions: &[u32]) -> &[u32] {
if positions.len() > self.buffer.len() {
self.buffer.resize(positions.len(), 0u32);
}
let mut last_pos = 0u32;
for (cur_pos, dest) in positions.iter().cloned().zip(self.buffer.iter_mut()) {
*dest = cur_pos - last_pos;
last_pos = cur_pos;
}
&self.buffer[..positions.len()]
}
}
fn convert_to_merge_order(
columnars: &[&ColumnarReader],
doc_id_mapping: SegmentDocIdMapping,
) -> MergeRowOrder {
match doc_id_mapping.mapping_type() {
MappingType::Stacked => MergeRowOrder::Stack(StackMergeOrder::stack(columnars)),
MappingType::StackedWithDeletes => {
let new_row_id_to_old_row_id: Vec<RowAddr> = doc_id_mapping
.new_doc_id_to_old_doc_addr
.into_iter()
.map(|doc_addr| RowAddr {
segment_ord: doc_addr.segment_ord,
row_id: doc_addr.doc_id,
})
.collect();
MergeRowOrder::Shuffled(ShuffleMergeOrder {
new_row_id_to_old_row_id,
alive_bitsets: doc_id_mapping.alive_bitsets,
})
}
}
}
fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnType)> {
schema
.fields()
.map(|(_, field_entry)| field_entry)
.filter(|field_entry| field_entry.is_fast())
.filter_map(|field_entry| {
let column_name = field_entry.name().to_string();
let column_type = value_type_to_column_type(field_entry.field_type().value_type())?;
Some((column_name, column_type))
})
.collect()
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
}
pub fn open_with_custom_alive_set(
schema: Schema,
segments: &[Segment],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
) -> crate::Result<IndexMerger> {
let mut readers = vec![];
for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) {
if segment.meta().num_docs() > 0 {
let reader =
SegmentReader::open_with_custom_alive_set(segment, new_alive_bitset_opt)?;
readers.push(reader);
}
}
let max_doc = readers.iter().map(|reader| reader.num_docs()).sum();
if max_doc >= MAX_DOC_LIMIT {
let err_msg = format!(
"The segment resulting from this merge would have {max_doc} docs,which exceeds \
the limit {MAX_DOC_LIMIT}."
);
return Err(crate::LucivyError::InvalidArgument(err_msg));
}
Ok(IndexMerger {
schema,
readers,
max_doc,
})
}
fn write_fieldnorms(
&self,
mut fieldnorms_serializer: FieldNormsSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
for field in fields {
fieldnorms_data.clear();
let fieldnorms_readers: Vec<FieldNormReader> = self
.readers
.iter()
.map(|reader| reader.get_fieldnorms_reader(field))
.collect::<Result<_, _>>()?;
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let fieldnorms_reader = &fieldnorms_readers[old_doc_addr.segment_ord as usize];
let fieldnorm_id = fieldnorms_reader.fieldnorm_id(old_doc_addr.doc_id);
fieldnorms_data.push(fieldnorm_id);
}
fieldnorms_serializer.serialize_field(field, &fieldnorms_data[..])?;
}
fieldnorms_serializer.close()?;
Ok(())
}
fn write_fast_fields(
&self,
fast_field_wrt: &mut WritePtr,
doc_id_mapping: SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-fast-fields");
let required_columns = extract_fast_field_required_columns(&self.schema);
let columnars: Vec<&ColumnarReader> = self
.readers
.iter()
.map(|reader| reader.fast_fields().columnar())
.collect();
let merge_row_order = convert_to_merge_order(&columnars[..], doc_id_mapping);
columnar::merge_columnar(
&columnars[..],
&required_columns,
merge_row_order,
fast_field_wrt,
)?;
Ok(())
}
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocIdMapping> {
let total_num_new_docs = self
.readers
.iter()
.map(|reader| reader.num_docs() as usize)
.sum();
let mut mapping: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);
mapping.extend(
self.readers
.iter()
.enumerate()
.flat_map(|(segment_ord, reader)| {
reader.doc_ids_alive().map(move |doc_id| DocAddress {
segment_ord: segment_ord as u32,
doc_id,
})
}),
);
let has_deletes: bool = self.readers.iter().any(SegmentReader::has_deletes);
let mapping_type = if has_deletes {
MappingType::StackedWithDeletes
} else {
MappingType::Stacked
};
let alive_bitsets: Vec<Option<ReadOnlyBitSet>> = self
.readers
.iter()
.map(|reader| {
let alive_bitset = reader.alive_bitset()?;
Some(alive_bitset.bitset().clone())
})
.collect();
Ok(SegmentDocIdMapping::new(
mapping,
mapping_type,
alive_bitsets,
))
}
fn write_postings_for_field(
&self,
indexed_field: Field,
_field_type: &FieldType,
serializer: &mut InvertedIndexSerializer,
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-postings-for-field");
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
let field_readers: Vec<Arc<InvertedIndexReader>> = self
.readers
.iter()
.map(|reader| reader.inverted_index(indexed_field))
.collect::<crate::Result<Vec<_>>>()?;
let mut field_term_streams = Vec::new();
for field_reader in &field_readers {
let terms = field_reader.terms();
field_term_streams.push(terms.stream()?);
max_term_ords.push(terms.num_terms() as u64);
}
let mut merged_terms = TermMerger::new(field_term_streams);
let mut merged_doc_id_map: Vec<Vec<Option<DocId>>> = self
.readers
.iter()
.map(|reader| {
let mut segment_local_map = vec![];
segment_local_map.resize(reader.max_doc() as usize, None);
segment_local_map
})
.collect();
for (new_doc_id, old_doc_addr) in doc_id_mapping.iter_old_doc_addrs().enumerate() {
let segment_map = &mut merged_doc_id_map[old_doc_addr.segment_ord as usize];
segment_map[old_doc_addr.doc_id as usize] = Some(new_doc_id as DocId);
}
let total_num_tokens: u64 = estimate_total_num_tokens(&self.readers, indexed_field)?;
let mut field_serializer =
serializer.new_field(indexed_field, total_num_tokens, fieldnorm_reader)?;
let field_entry = self.schema.get_field_entry(indexed_field);
let segment_postings_option = field_entry.field_type().get_index_record_option().expect(
"Encountered a field that is not supposed to be
indexed. Have you modified the schema?",
);
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
while merged_terms.advance() {
segment_postings_containing_the_term.clear();
let term_bytes: &[u8] = merged_terms.key();
let mut total_doc_freq = 0;
for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() {
let segment_reader = &self.readers[segment_ord];
let inverted_index: &InvertedIndexReader = &field_readers[segment_ord];
let segment_postings = inverted_index
.read_postings_from_terminfo(&term_info, segment_postings_option)?;
let alive_bitset_opt = segment_reader.alive_bitset();
let doc_freq = if let Some(alive_bitset) = alive_bitset_opt {
segment_postings.doc_freq_given_deletes(alive_bitset)
} else {
segment_postings.doc_freq()
};
if doc_freq > 0u32 {
total_doc_freq += doc_freq;
segment_postings_containing_the_term.push((segment_ord, segment_postings));
}
}
if total_doc_freq == 0u32 {
continue;
}
assert!(!segment_postings_containing_the_term.is_empty());
let has_term_freq = {
let has_term_freq = !segment_postings_containing_the_term[0]
.1
.block_cursor
.freqs()
.is_empty();
for (_, postings) in &segment_postings_containing_the_term[1..] {
if has_term_freq == postings.block_cursor.freqs().is_empty() {
return Err(DataCorruption::comment_only(
"Term freqs are inconsistent across segments",
)
.into());
}
}
has_term_freq
};
field_serializer.new_term(term_bytes, total_doc_freq, has_term_freq)?;
for (segment_ord, mut segment_postings) in
segment_postings_containing_the_term.drain(..)
{
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
let mut doc = segment_postings.doc();
while doc != TERMINATED {
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
let term_freq = if has_term_freq {
segment_postings.positions(&mut positions_buffer);
segment_postings.term_freq()
} else {
positions_buffer.clear();
0u32
};
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions);
}
doc = segment_postings.advance();
}
}
field_serializer.close_term()?;
}
field_serializer.close()?;
Ok(())
}
fn write_postings(
&self,
serializer: &mut InvertedIndexSerializer,
fieldnorm_readers: FieldNormReaders,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
for (field, field_entry) in self.schema.fields() {
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
if field_entry.is_indexed() {
self.write_postings_for_field(
field,
field_entry.field_type(),
serializer,
fieldnorm_reader,
doc_id_mapping,
)?;
}
}
Ok(())
}
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> {
debug_time!("write-storable-fields");
debug!("write-storable-field");
for reader in &self.readers {
let store_reader = reader.get_store_reader(1)?;
if reader.has_deletes()
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
}
} else {
store_writer.stack(store_reader)?;
}
}
Ok(())
}
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = self.get_doc_id_from_concatenated_data()?;
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?;
}
debug!("write-postings");
let fieldnorm_data = serializer
.segment()
.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
self.write_postings(
serializer.get_postings_serializer(),
fieldnorm_readers,
&doc_id_mapping,
)?;
debug!("write-storagefields");
self.write_storable_fields(serializer.get_store_writer())?;
debug!("write-fastfields");
self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?;
debug!("close-serializer");
serializer.close()?;
Ok(self.max_doc)
}
}
#[cfg(test)]
mod tests {
use columnar::Column;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
use schema::FAST;
use crate::collector::tests::{
BytesFastFieldTestCollector, FastFieldTestCollector, TEST_COLLECTOR_WITH_SCORE,
};
use crate::collector::{Count, FacetCollector};
use crate::index::{Index, SegmentId};
use crate::indexer::NoMergePolicy;
use crate::query::{AllQuery, BooleanQuery, EnableScoring, Scorer, TermQuery};
use crate::schema::{
Facet, FacetOptions, IndexRecordOption, NumericOptions, LucivyDocument, Term,
TextFieldIndexing, Value, INDEXED, TEXT,
};
use crate::time::OffsetDateTime;
use crate::{
assert_nearly_equals, schema, DateTime, DocAddress, DocId, DocSet, IndexSettings,
IndexWriter, Searcher,
};
#[test]
fn test_index_merger_no_deletes() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let text_fieldtype = schema::TextOptions::default()
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", INDEXED);
let score_fieldtype = schema::NumericOptions::default().set_fast();
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let bytes_score_field = schema_builder.add_bytes_field("score_bytes", FAST);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader()?;
let curr_time = OffsetDateTime::now_utc();
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(
text_field => "af b",
score_field => 3u64,
date_field => DateTime::from_utc(curr_time),
bytes_score_field => 3u32.to_be_bytes().as_ref()
))?;
index_writer.add_document(doc!(
text_field => "a b c",
score_field => 5u64,
bytes_score_field => 5u32.to_be_bytes().as_ref()
))?;
index_writer.add_document(doc!(
text_field => "a b c d",
score_field => 7u64,
bytes_score_field => 7u32.to_be_bytes().as_ref()
))?;
index_writer.commit()?;
index_writer.add_document(doc!(
text_field => "af b",
date_field => DateTime::from_utc(curr_time),
score_field => 11u64,
bytes_score_field => 11u32.to_be_bytes().as_ref()
))?;
index_writer.add_document(doc!(
text_field => "a b c g",
score_field => 13u64,
bytes_score_field => 13u32.to_be_bytes().as_ref()
))?;
index_writer.commit()?;
}
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
{
reader.reload()?;
let searcher = reader.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
searcher
.search(&query, &TEST_COLLECTOR_WITH_SCORE)
.map(|top_docs| top_docs.docs().to_vec())
};
{
assert_eq!(
get_doc_ids(vec![Term::from_field_text(text_field, "a")])?,
vec![
DocAddress::new(0, 1),
DocAddress::new(0, 2),
DocAddress::new(0, 4)
]
);
assert_eq!(
get_doc_ids(vec![Term::from_field_text(text_field, "af")])?,
vec![DocAddress::new(0, 0), DocAddress::new(0, 3)]
);
assert_eq!(
get_doc_ids(vec![Term::from_field_text(text_field, "g")])?,
vec![DocAddress::new(0, 4)]
);
assert_eq!(
get_doc_ids(vec![Term::from_field_text(text_field, "b")])?,
vec![
DocAddress::new(0, 0),
DocAddress::new(0, 1),
DocAddress::new(0, 2),
DocAddress::new(0, 3),
DocAddress::new(0, 4)
]
);
assert_eq!(
get_doc_ids(vec![Term::from_field_date_for_search(
date_field,
DateTime::from_utc(curr_time)
)])?,
vec![DocAddress::new(0, 0), DocAddress::new(0, 3)]
);
}
{
let doc = searcher.doc::<LucivyDocument>(DocAddress::new(0, 0))?;
assert_eq!(
doc.get_first(text_field).unwrap().as_value().as_str(),
Some("af b")
);
}
{
let doc = searcher.doc::<LucivyDocument>(DocAddress::new(0, 1))?;
assert_eq!(
doc.get_first(text_field).unwrap().as_value().as_str(),
Some("a b c")
);
}
{
let doc = searcher.doc::<LucivyDocument>(DocAddress::new(0, 2))?;
assert_eq!(
doc.get_first(text_field).unwrap().as_value().as_str(),
Some("a b c d")
);
}
{
let doc = searcher.doc::<LucivyDocument>(DocAddress::new(0, 3))?;
assert_eq!(doc.get_first(text_field).unwrap().as_str(), Some("af b"));
}
{
let doc = searcher.doc::<LucivyDocument>(DocAddress::new(0, 4))?;
assert_eq!(doc.get_first(text_field).unwrap().as_str(), Some("a b c g"));
}
{
let get_fast_vals = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
searcher.search(&query, &FastFieldTestCollector::for_field("score"))
};
let get_fast_vals_bytes = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
searcher.search(
&query,
&BytesFastFieldTestCollector::for_field("score_bytes"),
)
};
assert_eq!(
get_fast_vals(vec![Term::from_field_text(text_field, "a")])?,
vec![5, 7, 13]
);
assert_eq!(
get_fast_vals_bytes(vec![Term::from_field_text(text_field, "a")])?,
vec![0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 13]
);
}
}
Ok(())
}
#[test]
fn test_index_merger_with_deletes() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let text_fieldtype = schema::TextOptions::default()
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = schema::NumericOptions::default().set_fast();
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let bytes_score_field = schema_builder.add_bytes_field("score_bytes", FAST);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests()?;
let reader = index.reader().unwrap();
let search_term = |searcher: &Searcher, term: Term| {
let collector = FastFieldTestCollector::for_field("score");
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
searcher.search(&term_query, &collector)
};
let empty_vec = Vec::<u64>::new();
{
index_writer.add_document(doc!(
text_field => "a b d",
score_field => 1u64,
bytes_score_field => vec![0u8, 0, 0, 1],
))?;
index_writer.add_document(doc!(
text_field => "b c",
score_field => 2u64,
bytes_score_field => vec![0u8, 0, 0, 2],
))?;
index_writer.delete_term(Term::from_field_text(text_field, "c"));
index_writer.add_document(doc!(
text_field => "c d",
score_field => 3u64,
bytes_score_field => vec![0u8, 0, 0, 3],
))?;
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "a"))?,
vec![1]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "b"))?,
vec![1]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "c"))?,
vec![3]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "d"))?,
vec![1, 3]
);
}
{
index_writer.add_document(doc!(
text_field => "a d e",
score_field => 4_000u64,
bytes_score_field => vec![0u8, 0, 0, 4],
))?;
index_writer.add_document(doc!(
text_field => "e f",
score_field => 5_000u64,
bytes_score_field => vec![0u8, 0, 0, 5],
))?;
index_writer.delete_term(Term::from_field_text(text_field, "a"));
index_writer.delete_term(Term::from_field_text(text_field, "f"));
index_writer.add_document(doc!(
text_field => "f g",
score_field => 6_000u64,
bytes_score_field => vec![0u8, 0, 23, 112],
))?;
index_writer.add_document(doc!(
text_field => "g h",
score_field => 7_000u64,
bytes_score_field => vec![0u8, 0, 27, 88],
))?;
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 2);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].max_doc(), 4);
assert_eq!(searcher.segment_readers()[1].num_docs(), 1);
assert_eq!(searcher.segment_readers()[1].max_doc(), 3);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "a"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "b"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "c"))?,
vec![3]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "d"))?,
vec![3]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "e"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "f"))?,
vec![6_000]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "g"))?,
vec![6_000, 7_000]
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64("score")
.unwrap();
assert_eq!(score_field_reader.min_value(), 4000);
assert_eq!(score_field_reader.max_value(), 7000);
let score_field_reader = searcher
.segment_reader(1)
.fast_fields()
.u64("score")
.unwrap();
assert_eq!(score_field_reader.min_value(), 1);
assert_eq!(score_field_reader.max_value(), 3);
}
{
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "a"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "b"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "c"))?,
vec![3]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "d"))?,
vec![3]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "e"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "f"))?,
vec![6_000]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "g"))?,
vec![6_000, 7_000]
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64("score")
.unwrap();
assert_eq!(score_field_reader.min_value(), 3);
assert_eq!(score_field_reader.max_value(), 7000);
}
{
index_writer.delete_term(Term::from_field_text(text_field, "c"));
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "a"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "b"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "c"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "d"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "e"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "f"))?,
vec![6_000]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "g"))?,
vec![6_000, 7_000]
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64("score")
.unwrap();
assert_eq!(score_field_reader.min_value(), 3);
assert_eq!(score_field_reader.max_value(), 7000);
}
{
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].max_doc(), 2);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "a"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "b"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "c"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "d"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "e"))?,
empty_vec
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "f"))?,
vec![6_000]
);
assert_eq!(
search_term(&searcher, Term::from_field_text(text_field, "g"))?,
vec![6_000, 7_000]
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64("score")
.unwrap();
assert_eq!(score_field_reader.min_value(), 6000);
assert_eq!(score_field_reader.max_value(), 7000);
}
{
index_writer.delete_term(Term::from_field_text(text_field, "g"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
reader.reload()?;
let searcher = reader.searcher();
assert!(segment_ids.is_empty());
assert!(searcher.segment_readers().is_empty());
assert_eq!(searcher.num_docs(), 0);
}
Ok(())
}
#[test]
fn test_merge_facets_sort_none() {
test_merge_facets(None, true)
}
fn test_merge_facets(index_settings: Option<IndexSettings>, force_segment_value_overlap: bool) {
let mut schema_builder = schema::Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let int_options = NumericOptions::default().set_fast().set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);
let mut index_builder = Index::builder().schema(schema_builder.build());
if let Some(settings) = index_settings {
index_builder = index_builder.settings(settings);
}
let index = index_builder.create_in_ram().unwrap();
let reader = index.reader().unwrap();
let mut int_val = 0;
{
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
let index_doc =
|index_writer: &mut IndexWriter, doc_facets: &[&str], int_val: &mut u64| {
let mut doc = LucivyDocument::default();
for facet in doc_facets {
doc.add_facet(facet_field, Facet::from(facet));
}
doc.add_u64(int_field, *int_val);
*int_val += 1;
index_writer.add_document(doc).unwrap();
};
index_doc(
&mut index_writer,
&["/top/a/firstdoc", "/top/b"],
&mut int_val,
);
index_doc(
&mut index_writer,
&["/top/a/firstdoc", "/top/b", "/top/c"],
&mut int_val,
);
index_doc(&mut index_writer, &["/top/a", "/top/b"], &mut int_val);
index_doc(&mut index_writer, &["/top/a"], &mut int_val);
index_doc(&mut index_writer, &["/top/b", "/top/d"], &mut int_val);
if force_segment_value_overlap {
index_doc(&mut index_writer, &["/top/d"], &mut 0);
index_doc(&mut index_writer, &["/top/e"], &mut 10);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"], &mut 5); } else {
index_doc(&mut index_writer, &["/top/d"], &mut int_val);
index_doc(&mut index_writer, &["/top/e"], &mut int_val);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"], &mut int_val);
}
index_doc(&mut index_writer, &["/top/b"], &mut int_val);
index_doc(&mut index_writer, &["/top/c"], &mut int_val);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/e", "/top/f"], &mut int_val);
index_writer.commit().expect("committed");
}
reader.reload().unwrap();
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
let searcher = reader.searcher();
let mut facet_collector = FacetCollector::for_field("facet");
facet_collector.add_facet(Facet::from("/top"));
let (count, facet_counts) = searcher
.search(&AllQuery, &(Count, facet_collector))
.unwrap();
assert_eq!(count, expected_num_docs);
let facets: Vec<(String, u64)> = facet_counts
.get("/top")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
assert_eq!(
facets,
expected
.iter()
.map(|&(facet_str, count)| (String::from(facet_str), count))
.collect::<Vec<_>>()
);
};
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
index_writer
.merge(&segment_ids)
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
reader.reload().unwrap();
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
{
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
reader.reload().unwrap();
test_searcher(
9,
&[
("/top/a", 3),
("/top/b", 3),
("/top/c", 1),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
}
#[test]
fn test_bug_merge() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let int_field = schema_builder.add_u64_field("intvals", INDEXED);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(int_field => 1u64))?;
index_writer.commit().expect("commit failed");
index_writer.add_document(doc!(int_field => 1u64))?;
index_writer.commit().expect("commit failed");
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
assert_eq!(reader.searcher().num_docs(), 2);
Ok(())
}
#[test]
fn test_merge_multivalued_int_fields_all_deleted() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let int_options = NumericOptions::default().set_fast().set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader()?;
{
let mut index_writer = index.writer_for_tests()?;
let mut doc = LucivyDocument::default();
doc.add_u64(int_field, 1);
index_writer.add_document(doc.clone())?;
index_writer.commit()?;
index_writer.add_document(doc)?;
index_writer.commit()?;
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
index_writer.commit()?;
index_writer.wait_merging_threads()?;
}
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 0);
Ok(())
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum IndexingOp {
ZeroVal,
OneVal { val: u64 },
TwoVal { val: u64 },
Commit,
}
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..1u64).prop_map(|_| IndexingOp::ZeroVal),
(0u64..1u64).prop_map(|val| IndexingOp::OneVal { val }),
(0u64..1u64).prop_map(|val| IndexingOp::TwoVal { val }),
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
]
}
use proptest::prelude::*;
proptest! {
#[test]
fn test_merge_columnar_int_proptest(ops in proptest::collection::vec(balanced_operation_strategy(), 1..20)) {
assert!(test_merge_int_fields(&ops[..]).is_ok());
}
}
fn test_merge_int_fields(ops: &[IndexingOp]) -> crate::Result<()> {
if ops.iter().all(|op| *op == IndexingOp::Commit) {
return Ok(());
}
let expected_doc_and_vals: Vec<(u32, Vec<u64>)> = ops
.iter()
.filter(|op| *op != &IndexingOp::Commit)
.map(|op| match op {
IndexingOp::ZeroVal => vec![],
IndexingOp::OneVal { val } => vec![*val],
IndexingOp::TwoVal { val } => vec![*val, *val],
IndexingOp::Commit => unreachable!(),
})
.enumerate()
.map(|(id, val)| (id as u32, val))
.collect();
let mut schema_builder = schema::Schema::builder();
let int_options = NumericOptions::default().set_fast().set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let index_doc = |index_writer: &mut IndexWriter, int_vals: &[u64]| {
let mut doc = LucivyDocument::default();
for &val in int_vals {
doc.add_u64(int_field, val);
}
index_writer.add_document(doc).unwrap();
};
for op in ops {
match op {
IndexingOp::ZeroVal => index_doc(&mut index_writer, &[]),
IndexingOp::OneVal { val } => index_doc(&mut index_writer, &[*val]),
IndexingOp::TwoVal { val } => index_doc(&mut index_writer, &[*val, *val]),
IndexingOp::Commit => {
index_writer.commit().expect("commit failed");
}
}
}
index_writer.commit().expect("commit failed");
}
{
let mut segment_ids = index.searchable_segment_ids()?;
segment_ids.sort();
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
let reader = index.reader()?;
reader.reload()?;
let mut vals: Vec<u64> = Vec::new();
let mut test_vals = move |col: &Column<u64>, doc: DocId, expected: &[u64]| {
vals.clear();
vals.extend(col.values_for_doc(doc));
assert_eq!(&vals[..], expected);
};
let mut test_col = move |col: &Column<u64>, column_expected: &[(u32, Vec<u64>)]| {
for (doc_id, vals) in column_expected.iter() {
test_vals(col, *doc_id, vals);
}
};
{
let searcher = reader.searcher();
let segment = searcher.segment_reader(0u32);
let col = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();
test_col(&col, &expected_doc_and_vals);
}
Ok(())
}
#[test]
fn test_merge_multivalued_int_fields_simple() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let int_options = NumericOptions::default().set_fast().set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
let index = Index::create_in_ram(schema_builder.build());
let mut vals: Vec<u64> = Vec::new();
let mut test_vals = move |col: &Column<u64>, doc: DocId, expected: &[u64]| {
vals.clear();
vals.extend(col.values_for_doc(doc));
assert_eq!(&vals[..], expected);
};
{
let mut index_writer = index.writer_for_tests()?;
let index_doc = |index_writer: &mut IndexWriter, int_vals: &[u64]| {
let mut doc = LucivyDocument::default();
for &val in int_vals {
doc.add_u64(int_field, val);
}
index_writer.add_document(doc).unwrap();
};
index_doc(&mut index_writer, &[1, 2]);
index_doc(&mut index_writer, &[1, 2, 3]);
index_doc(&mut index_writer, &[4, 5]);
index_doc(&mut index_writer, &[1, 2]);
index_doc(&mut index_writer, &[1, 5]);
index_doc(&mut index_writer, &[3]);
index_doc(&mut index_writer, &[17]);
assert!(index_writer.commit().is_ok());
index_doc(&mut index_writer, &[20]);
assert!(index_writer.commit().is_ok());
index_doc(&mut index_writer, &[28, 27]);
index_doc(&mut index_writer, &[1_000]);
assert!(index_writer.commit().is_ok());
}
let reader = index.reader()?;
let searcher = reader.searcher();
{
let segment = searcher.segment_reader(0u32);
let column = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();
test_vals(&column, 0, &[1, 2]);
test_vals(&column, 1, &[1, 2, 3]);
test_vals(&column, 2, &[4, 5]);
test_vals(&column, 3, &[1, 2]);
test_vals(&column, 4, &[1, 5]);
test_vals(&column, 5, &[3]);
test_vals(&column, 6, &[17]);
}
{
let segment = searcher.segment_reader(1u32);
let col = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();
test_vals(&col, 0, &[28, 27]);
test_vals(&col, 1, &[1000]);
}
{
let segment = searcher.segment_reader(2u32);
let col = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();
test_vals(&col, 0, &[20]);
}
{
let segment_ids = index.searchable_segment_ids()?;
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
reader.reload()?;
{
let searcher = reader.searcher();
let segment = searcher.segment_reader(0u32);
let col = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();
test_vals(&col, 0, &[1, 2]);
test_vals(&col, 1, &[1, 2, 3]);
test_vals(&col, 2, &[4, 5]);
test_vals(&col, 3, &[1, 2]);
test_vals(&col, 4, &[1, 5]);
test_vals(&col, 5, &[3]);
test_vals(&col, 6, &[17]);
test_vals(&col, 7, &[28, 27]);
test_vals(&col, 8, &[1000]);
test_vals(&col, 9, &[20]);
}
Ok(())
}
#[test]
fn merges_f64_fast_fields_correctly() -> crate::Result<()> {
let mut builder = schema::SchemaBuilder::new();
let fast_multi = NumericOptions::default().set_fast();
let field = builder.add_f64_field("f64", schema::FAST);
let multi_field = builder.add_f64_field("f64s", fast_multi);
let index = Index::create_in_ram(builder.build());
let mut writer = index.writer_for_tests()?;
let mut policy = crate::indexer::LogMergePolicy::default();
policy.set_min_num_segments(2);
writer.set_merge_policy(Box::new(policy));
for i in 0..100 {
let mut doc = LucivyDocument::new();
doc.add_f64(field, 42.0);
doc.add_f64(multi_field, 0.24);
doc.add_f64(multi_field, 0.27);
writer.add_document(doc)?;
if i % 5 == 0 {
writer.commit()?;
}
}
writer.commit()?;
writer.wait_merging_threads()?;
assert_eq!(1, index.searchable_segments()?.len());
Ok(())
}
#[test]
fn test_merged_index_has_blockwand() -> crate::Result<()> {
let mut builder = schema::SchemaBuilder::new();
let text = builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(builder.build());
let mut writer = index.writer_for_tests()?;
let happy_term = Term::from_field_text(text, "happy");
let term_query = TermQuery::new(happy_term, IndexRecordOption::WithFreqs);
for _ in 0..62 {
writer.add_document(doc!(text=>"hello happy tax payer"))?;
}
writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
let mut term_scorer = term_query
.specialized_weight(EnableScoring::enabled_from_searcher(&searcher))?
.term_scorer_for_test(searcher.segment_reader(0u32), 1.0)?
.unwrap();
assert_eq!(term_scorer.doc(), 0);
assert_nearly_equals!(term_scorer.block_max_score(), 0.0079681855);
assert_nearly_equals!(term_scorer.score(), 0.0079681855);
for _ in 0..81 {
writer.add_document(doc!(text=>"hello happy tax payer"))?;
}
writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 2);
for segment_reader in searcher.segment_readers() {
let mut term_scorer = term_query
.specialized_weight(EnableScoring::enabled_from_searcher(&searcher))?
.term_scorer_for_test(segment_reader, 1.0)?
.unwrap();
for doc in segment_reader.doc_ids_alive() {
assert_eq!(term_scorer.doc(), doc);
assert_nearly_equals!(term_scorer.block_max_score(), 0.003478312);
assert_nearly_equals!(term_scorer.score(), 0.003478312);
term_scorer.advance();
}
}
let segment_ids: Vec<SegmentId> = searcher
.segment_readers()
.iter()
.map(|reader| reader.segment_id())
.collect();
writer.merge(&segment_ids[..]).wait()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0u32);
let mut term_scorer = term_query
.specialized_weight(EnableScoring::enabled_from_searcher(&searcher))?
.term_scorer_for_test(segment_reader, 1.0)?
.unwrap();
for doc in segment_reader.doc_ids_alive() {
assert_eq!(term_scorer.doc(), doc);
assert_nearly_equals!(term_scorer.block_max_score(), 0.003478312);
assert_nearly_equals!(term_scorer.score(), 0.003478312);
term_scorer.advance();
}
Ok(())
}
#[test]
fn test_max_doc() {
assert!(((super::MAX_DOC_LIMIT - 1) as i32) >= 0);
assert!((super::MAX_DOC_LIMIT as i32) < 0);
}
}