use std::cmp::Ordering;
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter, VInt};
use fail::fail_point;
use super::TermInfo;
use crate::core::Segment;
use crate::directory::{CompositeWrite, WritePtr};
use crate::fieldnorm::FieldNormReader;
use crate::positions::PositionSerializer;
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::skip::SkipSerializer;
use crate::query::Bm25Weight;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
use crate::{DocId, Score};
pub struct InvertedIndexSerializer {
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
schema: Schema,
}
impl InvertedIndexSerializer {
pub fn open(segment: &mut Segment) -> crate::Result<InvertedIndexSerializer> {
use crate::SegmentComponent::{Positions, Postings, Terms};
let inv_index_serializer = InvertedIndexSerializer {
terms_write: CompositeWrite::wrap(segment.open_write(Terms)?),
postings_write: CompositeWrite::wrap(segment.open_write(Postings)?),
positions_write: CompositeWrite::wrap(segment.open_write(Positions)?),
schema: segment.schema(),
};
Ok(inv_index_serializer)
}
pub fn new_field(
&mut self,
field: Field,
total_num_tokens: u64,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer> {
let field_entry: &FieldEntry = self.schema.get_field_entry(field);
let term_dictionary_write = self.terms_write.for_field(field);
let postings_write = self.postings_write.for_field(field);
let positions_write = self.positions_write.for_field(field);
let field_type: FieldType = (*field_entry.field_type()).clone();
FieldSerializer::create(
&field_type,
total_num_tokens,
term_dictionary_write,
postings_write,
positions_write,
fieldnorm_reader,
)
}
pub fn close(self) -> io::Result<()> {
self.terms_write.close()?;
self.postings_write.close()?;
self.positions_write.close()?;
Ok(())
}
}
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer<&'a mut CountingWriter<WritePtr>>,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
current_term_info: TermInfo,
term_open: bool,
num_terms: TermOrdinal,
}
impl<'a> FieldSerializer<'a> {
fn create(
field_type: &FieldType,
total_num_tokens: u64,
term_dictionary_write: &'a mut CountingWriter<WritePtr>,
postings_write: &'a mut CountingWriter<WritePtr>,
positions_write: &'a mut CountingWriter<WritePtr>,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'a>> {
total_num_tokens.serialize(postings_write)?;
let index_record_option = field_type
.index_record_option()
.unwrap_or(IndexRecordOption::Basic);
let term_dictionary_builder = TermDictionaryBuilder::create(term_dictionary_write)?;
let average_fieldnorm = fieldnorm_reader
.as_ref()
.map(|ff_reader| (total_num_tokens as Score / ff_reader.num_docs() as Score))
.unwrap_or(0.0);
let postings_serializer = PostingsSerializer::new(
postings_write,
average_fieldnorm,
index_record_option,
fieldnorm_reader,
);
let positions_serializer_opt = if index_record_option.has_positions() {
Some(PositionSerializer::new(positions_write))
} else {
None
};
Ok(FieldSerializer {
term_dictionary_builder,
postings_serializer,
positions_serializer_opt,
current_term_info: TermInfo::default(),
term_open: false,
num_terms: TermOrdinal::default(),
})
}
fn current_term_info(&self) -> TermInfo {
let positions_start =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
positions_serializer.written_bytes()
} else {
0u64
} as usize;
let addr = self.postings_serializer.written_bytes() as usize;
TermInfo {
doc_freq: 0,
postings_range: addr..addr,
positions_range: positions_start..positions_start,
}
}
pub fn new_term(&mut self, term: &[u8], term_doc_freq: u32) -> io::Result<TermOrdinal> {
assert!(
!self.term_open,
"Called new_term, while the previous term was not closed."
);
self.term_open = true;
self.postings_serializer.clear();
self.current_term_info = self.current_term_info();
self.term_dictionary_builder.insert_key(term)?;
let term_ordinal = self.num_terms;
self.num_terms += 1;
self.postings_serializer.new_term(term_doc_freq);
Ok(term_ordinal)
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, position_deltas: &[u32]) {
self.current_term_info.doc_freq += 1;
self.postings_serializer.write_doc(doc_id, term_freq);
if let Some(ref mut positions_serializer) = self.positions_serializer_opt.as_mut() {
assert_eq!(term_freq as usize, position_deltas.len());
positions_serializer.write_positions_delta(position_deltas);
}
}
pub fn close_term(&mut self) -> io::Result<()> {
fail_point!("FieldSerializer::close_term", |msg: Option<String>| {
Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", msg)))
});
if self.term_open {
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_range.end =
self.postings_serializer.written_bytes() as usize;
if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.close_term()?;
self.current_term_info.positions_range.end =
positions_serializer.written_bytes() as usize;
}
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
}
Ok(())
}
pub fn close(mut self) -> io::Result<()> {
self.close_term()?;
if let Some(positions_serializer) = self.positions_serializer_opt {
positions_serializer.close()?;
}
self.postings_serializer.close()?;
self.term_dictionary_builder.finish()?;
Ok(())
}
}
struct Block {
doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
len: usize,
}
impl Block {
fn new() -> Self {
Block {
doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
len: 0,
}
}
fn doc_ids(&self) -> &[DocId] {
&self.doc_ids[..self.len]
}
fn term_freqs(&self) -> &[u32] {
&self.term_freqs[..self.len]
}
fn clear(&mut self) {
self.len = 0;
}
fn append_doc(&mut self, doc: DocId, term_freq: u32) {
let len = self.len;
self.doc_ids[len] = doc;
self.term_freqs[len] = term_freq;
self.len = len + 1;
}
fn is_full(&self) -> bool {
self.len == COMPRESSION_BLOCK_SIZE
}
fn is_empty(&self) -> bool {
self.len == 0
}
fn last_doc(&self) -> DocId {
assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
self.doc_ids[COMPRESSION_BLOCK_SIZE - 1]
}
}
pub struct PostingsSerializer<W: Write> {
output_write: CountingWriter<W>,
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
block: Box<Block>,
postings_write: Vec<u8>,
skip_write: SkipSerializer,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
bm25_weight: Option<Bm25Weight>,
avg_fieldnorm: Score,
}
impl<W: Write> PostingsSerializer<W> {
pub fn new(
write: W,
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer<W> {
PostingsSerializer {
output_write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
mode,
fieldnorm_reader,
bm25_weight: None,
avg_fieldnorm,
}
}
pub fn new_term(&mut self, term_doc_freq: u32) {
self.bm25_weight = None;
if !self.mode.has_freq() {
return;
}
let num_docs_in_segment: u64 =
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
fieldnorm_reader.num_docs() as u64
} else {
return;
};
if num_docs_in_segment == 0 {
return;
}
self.bm25_weight = Some(Bm25Weight::for_one_term(
term_doc_freq as u64,
num_docs_in_segment,
self.avg_fieldnorm,
));
}
fn write_block(&mut self) {
{
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.skip_write
.write_doc(self.last_doc_id_encoded, num_bits);
self.postings_write.extend(block_encoded);
}
if self.mode.has_freq() {
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_unsorted(self.block.term_freqs());
self.postings_write.extend(block_encoded);
self.skip_write.write_term_freq(num_bits);
if self.mode.has_positions() {
let sum_freq = self.block.term_freqs().iter().cloned().sum();
self.skip_write.write_total_term_freq(sum_freq);
}
let mut blockwand_params = (0u8, 0u32);
if let Some(bm25_weight) = self.bm25_weight.as_ref() {
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
let docs = self.block.doc_ids().iter().cloned();
let term_freqs = self.block.term_freqs().iter().cloned();
let fieldnorms = docs.map(|doc| fieldnorm_reader.fieldnorm_id(doc));
blockwand_params = fieldnorms
.zip(term_freqs)
.max_by(
|(left_fieldnorm_id, left_term_freq),
(right_fieldnorm_id, right_term_freq)| {
let left_score =
bm25_weight.tf_factor(*left_fieldnorm_id, *left_term_freq);
let right_score =
bm25_weight.tf_factor(*right_fieldnorm_id, *right_term_freq);
left_score
.partial_cmp(&right_score)
.unwrap_or(Ordering::Equal)
},
)
.unwrap();
}
}
let (fieldnorm_id, term_freq) = blockwand_params;
self.skip_write.write_blockwand_max(fieldnorm_id, term_freq);
}
self.block.clear();
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block();
}
}
fn close(mut self) -> io::Result<()> {
self.postings_write.flush()
}
pub fn close_term(&mut self, doc_freq: u32) -> io::Result<()> {
if !self.block.is_empty() {
{
let block_encoded = self
.block_encoder
.compress_vint_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
}
if self.mode.has_freq() {
let block_encoded = self
.block_encoder
.compress_vint_unsorted(self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
}
self.block.clear();
}
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(&mut self.output_write)?;
self.output_write.write_all(skip_data)?;
}
self.output_write.write_all(&self.postings_write[..])?;
self.skip_write.clear();
self.postings_write.clear();
self.bm25_weight = None;
Ok(())
}
fn written_bytes(&self) -> u64 {
self.output_write.written_bytes()
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;
}
}