use error::{
ErrorKind::{CorruptIndex, IllegalArgument, UnexpectedEOF},
Result,
};
use std::any::Any;
use std::boxed::Box;
use std::cmp::min;
use std::io::{self, Read};
use std::ptr;
use std::sync::Arc;
use core::codec::codec_util::*;
use core::codec::field_infos::{FieldInfo, FieldInfos};
use core::codec::segment_infos::{segment_file_name, SegmentInfo};
use core::codec::stored_fields::{
CompressingStoredFieldsWriter, StoredFieldsFormat, StoredFieldsReader, StoredFieldsWriterEnum,
};
use core::codec::Codec;
use core::doc::{Status as VisitStatus, StoredFieldVisitor};
use core::store::directory::Directory;
use core::store::io::{ByteArrayDataInput, DataInput, IndexInput};
use core::store::IOContext;
use core::util::packed::{get_reader_iterator_no_header, get_reader_no_header};
use core::util::packed::{Format, OffsetAndLength, Reader, ReaderEnum, ReaderIterator};
use core::util::BytesRef;
use core::util::DocId;
use core::util::{CompressionMode, Decompress, Decompressor};
use core::util::{UnsignedShift, ZigZagEncoding};
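/// Extension of the stored fields data file.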
pub const STORED_FIELDS_EXTENSION: &str = "fdt";
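/// Extension of the stored fields index file.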
pub const STORED_FIELDS_INDEX_EXTENSION: &str = "fdx";
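// Codec name suffixes used in the headers of the index and data files.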
pub const CODEC_SFX_IDX: &str = "Index";
pub const CODEC_SFX_DAT: &str = "Data";
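// On-disk format versions; chunk statistics (`num_chunks`, `num_dirty_chunks`)
// were added in `VERSION_CHUNK_STATS`.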
pub const VERSION_START: i32 = 0;
pub const VERSION_CHUNK_STATS: i32 = 1;
pub const VERSION_CURRENT: i32 = VERSION_CHUNK_STATS;
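// Field type flags, stored in the low `TYPE_BITS` bits of each field's header
// vlong; the field number occupies the remaining high bits.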
pub const STRING: i32 = 0x00;
pub const BYTE_ARR: i32 = 0x01;
pub const NUMERIC_INT: i32 = 0x02;
pub const NUMERIC_FLOAT: i32 = 0x03;
pub const NUMERIC_LONG: i32 = 0x04;
pub const NUMERIC_DOUBLE: i32 = 0x05;
pub const TYPE_BITS: i32 = 3;
pub const TYPE_MASK: i32 = 7;
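// Bit patterns of -0.0 as an f32 (i32) and as an f64 (i64).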
pub const NEGATIVE_ZERO_FLOAT: i32 = -2_147_483_648;
pub const NEGATIVE_ZERO_DOUBLE: i64 = -9_223_372_036_854_775_808;
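// Time units and header flags for the TLong encoding (see `read_tlong`):
// longs that are exact multiples of a second/hour/day are stored scaled down
// and flagged so the reader can multiply them back up.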
pub const SECOND: i64 = 1000;
pub const HOUR: i64 = 60 * 60 * SECOND;
pub const DAY: i64 = HOUR * 24;
pub const SECOND_ENCODING: i32 = 0x40;
pub const HOUR_ENCODING: i32 = 0x80;
pub const DAY_ENCODING: i32 = 0xC0;
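/// A `StoredFieldsFormat` that compresses chunks of documents together in
/// order to improve the compression ratio.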
pub struct CompressingStoredFieldsFormat {
format_name: String,
segment_suffix: String,
compression_mode: CompressionMode,
chunk_size: i32,
max_docs_per_chunk: i32,
block_size: i32,
}
impl CompressingStoredFieldsFormat {
pub fn new(
format_name: &str,
segment_suffix: &str,
compression_mode: CompressionMode,
chunk_size: i32,
max_docs_per_chunk: i32,
block_size: i32,
) -> CompressingStoredFieldsFormat {
debug_assert!(chunk_size >= 1 && max_docs_per_chunk >= 1 && block_size >= 1);
CompressingStoredFieldsFormat {
format_name: String::from(format_name),
segment_suffix: String::from(segment_suffix),
compression_mode,
chunk_size,
max_docs_per_chunk,
block_size,
}
}
}
impl StoredFieldsFormat for CompressingStoredFieldsFormat {
type Reader = CompressingStoredFieldsReader;
fn fields_reader<D: Directory, DW: Directory, C: Codec>(
&self,
directory: &DW,
si: &SegmentInfo<D, C>,
field_info: Arc<FieldInfos>,
io_ctx: &IOContext,
) -> Result<Self::Reader> {
CompressingStoredFieldsReader::new(
directory,
si,
&self.segment_suffix,
field_info,
io_ctx,
&self.format_name,
self.compression_mode,
)
}
fn fields_writer<D, DW, C>(
&self,
directory: Arc<DW>,
si: &mut SegmentInfo<D, C>,
ioctx: &IOContext,
) -> Result<StoredFieldsWriterEnum<DW::IndexOutput>>
where
D: Directory,
DW: Directory,
DW::IndexOutput: 'static,
C: Codec,
{
Ok(StoredFieldsWriterEnum::Compressing(
CompressingStoredFieldsWriter::new(
directory,
si,
&self.segment_suffix,
ioctx,
&self.format_name,
self.compression_mode,
self.chunk_size as usize,
self.max_docs_per_chunk as usize,
self.block_size as usize,
)?,
))
}
}
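/// In-memory index mapping a doc id to the start pointer of the chunk that
/// contains it. Chunk doc bases and start pointers are grouped into blocks
/// and stored as packed deltas from linear estimates (`avg_chunk_docs`,
/// `avg_chunk_sizes`).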
pub struct CompressingStoredFieldsIndexReader {
max_doc: i32,
doc_bases: Vec<i32>,
start_pointers: Vec<i64>,
avg_chunk_docs: Vec<i32>,
avg_chunk_sizes: Vec<i64>,
doc_bases_deltas: Vec<ReaderEnum>,
start_pointers_deltas: Vec<ReaderEnum>,
}
impl CompressingStoredFieldsIndexReader {
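    /// Decodes the on-disk index: a sequence of blocks, each holding a doc
    /// base, average chunk sizes in docs and bytes, and packed deltas for the
    /// chunks it covers; a zero chunk count terminates the list.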
pub fn new<T: IndexInput + ?Sized, D: Directory, C: Codec>(
fields_index_in: &mut T,
si: &SegmentInfo<D, C>,
) -> Result<CompressingStoredFieldsIndexReader> {
let mut doc_bases = Vec::new();
let mut start_pointers = Vec::new();
let mut avg_chunk_docs = Vec::new();
let mut avg_chunk_sizes = Vec::new();
let mut doc_bases_deltas = Vec::new();
let mut start_pointers_deltas = Vec::new();
let packed_ints_version = fields_index_in.read_vint()?;
loop {
let num_chunks = fields_index_in.read_vint()?;
if num_chunks == 0 {
break;
}
doc_bases.push(fields_index_in.read_vint()?);
avg_chunk_docs.push(fields_index_in.read_vint()?);
let bits_per_doc_base = fields_index_in.read_vint()?;
if bits_per_doc_base > 32 {
bail!(CorruptIndex(format!(
"bits_per_doc_base: {}",
bits_per_doc_base
)));
}
doc_bases_deltas.push(get_reader_no_header(
fields_index_in,
Format::Packed,
packed_ints_version,
num_chunks as usize,
bits_per_doc_base,
)?);
start_pointers.push(fields_index_in.read_vlong()?);
avg_chunk_sizes.push(fields_index_in.read_vlong()?);
let bits_per_start_pointer = fields_index_in.read_vint()?;
if bits_per_start_pointer > 64 {
bail!(CorruptIndex(format!(
"bits_per_start_pointer: {}",
bits_per_start_pointer
)));
}
start_pointers_deltas.push(get_reader_no_header(
fields_index_in,
Format::Packed,
packed_ints_version,
num_chunks as usize,
bits_per_start_pointer,
)?);
}
Ok(CompressingStoredFieldsIndexReader {
max_doc: si.max_doc,
doc_bases,
start_pointers,
avg_chunk_docs,
avg_chunk_sizes,
doc_bases_deltas,
start_pointers_deltas,
})
}
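    /// Binary search in `doc_bases` for the block containing `doc_id`;
    /// returns the index of the last block whose doc base is <= `doc_id`.
    /// Assumes a non-empty index and `doc_id >= doc_bases[0]`, so the `usize`
    /// arithmetic cannot underflow.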
fn block(&self, doc_id: DocId) -> usize {
let mut lo = 0usize;
let mut hi = self.doc_bases.len() - 1usize;
while lo <= hi {
let mid = (lo + hi) >> 1;
let mid_value = self.doc_bases[mid];
if mid_value == doc_id {
return mid;
} else if mid_value < doc_id {
lo = mid + 1;
} else {
hi = mid - 1;
}
}
hi
}
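    /// Doc base of chunk `relative_chunk` within `block`, relative to the
    /// block's first doc: a linear estimate (`avg_chunk_docs`) plus a
    /// zigzag-decoded packed delta.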
fn relative_doc_base(&self, block: usize, relative_chunk: usize) -> DocId {
let expected = self.avg_chunk_docs[block] as usize * relative_chunk;
let delta = self.doc_bases_deltas[block].get(relative_chunk).decode();
(expected as i64 + delta) as DocId
}
fn relative_start_pointer(&self, block: usize, relative_chunk: usize) -> i64 {
let expected = self.avg_chunk_sizes[block] as usize * relative_chunk;
let delta = self.start_pointers_deltas[block]
.get(relative_chunk)
.decode();
expected as i64 + delta
}
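    /// Binary search for the chunk within `block` whose relative doc base is
    /// the largest one not greater than `relative_doc`.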
fn relative_chunk(&self, block: usize, relative_doc: DocId) -> Result<usize> {
let mut lo = 0usize;
let mut hi = self.doc_bases_deltas[block].size() - 1usize;
while lo <= hi {
let mid = (lo + hi) >> 1;
let mid_value = self.relative_doc_base(block, mid);
if mid_value == relative_doc {
return Ok(mid);
} else if mid_value < relative_doc {
lo = mid + 1;
} else {
hi = mid - 1;
}
}
Ok(hi)
}
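    /// File pointer at which the chunk containing `doc_id` starts: the
    /// block's absolute start pointer plus the chunk's relative one.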
pub fn start_pointer(&self, doc_id: DocId) -> Result<i64> {
debug_assert!(doc_id >= 0 && doc_id < self.max_doc);
let block = self.block(doc_id);
let relative_chunk = self.relative_chunk(block, doc_id - self.doc_bases[block])?;
Ok(self.start_pointers[block] + self.relative_start_pointer(block, relative_chunk))
}
}
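/// `StoredFieldsReader` for chunk-compressed stored fields. The fields from
/// `doc_base` onward cache the header and contents of the most recently
/// decoded chunk.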
pub struct CompressingStoredFieldsReader {
version: i32,
field_infos: Arc<FieldInfos>,
index_reader: Arc<CompressingStoredFieldsIndexReader>,
max_pointer: i64,
fields_stream: Box<dyn IndexInput>,
chunk_size: i32,
packed_ints_version: i32,
compression_mode: CompressionMode,
decompressor: Decompressor,
num_docs: i32,
merging: bool,
num_chunks: i64,
num_dirty_chunks: i64,
doc_base: DocId,
chunk_docs: usize,
sliced: bool,
offsets: Vec<i32>,
num_stored_fields: Vec<i32>,
start_pointer: i64,
bytes: Vec<u8>,
bytes_position: OffsetAndLength,
spare: Vec<u8>,
spare_position: OffsetAndLength,
current_doc: SerializedDocument,
}
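// Safety: asserted manually because `current_doc` can hold a raw pointer back
// into the reader. Shared (`&self`) reads clone the reader first (see
// `visit_document`), so the pointer is only dereferenced behind exclusive
// access.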
unsafe impl Send for CompressingStoredFieldsReader {}
unsafe impl Sync for CompressingStoredFieldsReader {}
impl CompressingStoredFieldsReader {
pub fn new<D: Directory, DW: Directory, C: Codec>(
dir: &DW,
si: &SegmentInfo<D, C>,
segment_suffix: &str,
field_infos: Arc<FieldInfos>,
context: &IOContext,
format_name: &str,
compression_mode: CompressionMode,
) -> Result<CompressingStoredFieldsReader> {
let index_name = segment_file_name(&si.name, segment_suffix, STORED_FIELDS_INDEX_EXTENSION);
let mut index_stream = dir.open_checksum_input(&index_name, context)?;
let codec_name_idx = String::from(format_name) + CODEC_SFX_IDX;
let version = check_index_header(
&mut index_stream,
&codec_name_idx,
VERSION_START,
VERSION_CURRENT,
&si.id,
segment_suffix,
)?;
debug_assert_eq!(
index_header_length(&codec_name_idx, segment_suffix),
index_stream.file_pointer() as usize
);
let index_reader = CompressingStoredFieldsIndexReader::new(&mut index_stream, si)?;
let max_pointer = index_stream.read_vlong()?;
check_footer(&mut index_stream)?;
let fields_stream_fn = segment_file_name(&si.name, segment_suffix, STORED_FIELDS_EXTENSION);
let mut fields_stream = dir.open_input(&fields_stream_fn, context)?;
let codec_name_dat = String::from(format_name) + CODEC_SFX_DAT;
let fields_version = check_index_header(
fields_stream.as_mut(),
&codec_name_dat,
VERSION_START,
VERSION_CURRENT,
&si.id,
segment_suffix,
)?;
if version != fields_version {
bail!(CorruptIndex(format!(
"Version mismatch between stored fields index and data: {} != {}",
version, fields_version
)));
}
debug_assert_eq!(
index_header_length(&codec_name_dat, segment_suffix),
fields_stream.file_pointer() as usize
);
let chunk_size = fields_stream.read_vint()?;
let packed_ints_version = fields_stream.read_vint()?;
let mut num_chunks = -1i64;
let mut num_dirty_chunks = -1i64;
if version >= VERSION_CHUNK_STATS {
fields_stream.seek(max_pointer)?;
num_chunks = fields_stream.read_vlong()?;
num_dirty_chunks = fields_stream.read_vlong()?;
if num_dirty_chunks > num_chunks {
bail!(CorruptIndex(format!(
"invalid chunk counts: dirty={}, total={}",
num_dirty_chunks, num_chunks
)));
}
}
retrieve_checksum(fields_stream.as_mut())?;
let decompressor = compression_mode.new_decompressor();
Ok(CompressingStoredFieldsReader {
version,
field_infos,
index_reader: Arc::new(index_reader),
max_pointer,
fields_stream,
chunk_size,
packed_ints_version,
compression_mode,
decompressor,
num_docs: si.max_doc,
merging: false,
num_chunks,
num_dirty_chunks,
doc_base: 0,
chunk_docs: 0,
sliced: false,
offsets: Vec::new(),
num_stored_fields: Vec::new(),
start_pointer: 0i64,
bytes: vec![0u8; chunk_size as usize * 2],
bytes_position: OffsetAndLength(0, 0),
spare: vec![],
spare_position: OffsetAndLength(0, 0),
current_doc: SerializedDocument {
length: 0,
num_stored_fields: 0,
decompressed: 0,
input: DocumentInput::Bytes(ByteArrayDataInput::new(BytesRef::default())),
},
})
}
#[inline]
pub fn version(&self) -> i32 {
self.version
}
#[inline]
pub fn index_reader(&self) -> &CompressingStoredFieldsIndexReader {
self.index_reader.as_ref()
}
#[inline]
pub fn chunk_size(&self) -> i32 {
self.chunk_size
}
#[inline]
pub fn packed_ints_version(&self) -> i32 {
self.packed_ints_version
}
#[inline]
pub fn compression_mode(&self) -> CompressionMode {
self.compression_mode
}
#[inline]
pub fn num_chunks(&self) -> i64 {
self.num_chunks
}
#[inline]
pub fn num_dirty_chunks(&self) -> i64 {
self.num_dirty_chunks
}
#[inline]
pub fn max_pointer(&self) -> i64 {
self.max_pointer
}
#[inline]
pub fn fields_stream_mut(&mut self) -> &mut dyn IndexInput {
&mut *self.fields_stream
}
pub fn current_doc_mut(&mut self) -> &mut SerializedDocument {
&mut self.current_doc
}
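    /// Merge-time copy: `merging = true` makes `do_reset` eagerly decompress
    /// whole chunks into `bytes`.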
fn copy_for_merge(&self) -> Result<CompressingStoredFieldsReader> {
Ok(CompressingStoredFieldsReader {
version: self.version,
field_infos: self.field_infos.clone(),
index_reader: Arc::clone(&self.index_reader),
max_pointer: self.max_pointer,
fields_stream: self.fields_stream.clone()?,
chunk_size: self.chunk_size,
packed_ints_version: self.packed_ints_version,
compression_mode: self.compression_mode,
decompressor: self.decompressor.clone(),
num_docs: self.num_docs,
merging: true,
num_chunks: self.num_chunks,
num_dirty_chunks: self.num_dirty_chunks,
doc_base: 0,
chunk_docs: 0,
sliced: false,
offsets: Vec::new(),
num_stored_fields: Vec::new(),
start_pointer: 0i64,
bytes: vec![0u8; self.chunk_size as usize * 2],
bytes_position: OffsetAndLength(0, 0),
spare: vec![],
spare_position: OffsetAndLength(0, 0),
current_doc: SerializedDocument {
length: 0,
num_stored_fields: 0,
decompressed: 0,
input: DocumentInput::Compressing(CompressingStoredFieldsInput {
reader: ptr::null_mut(),
}),
},
})
}
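    /// Reads a single field value, dispatching on the type flag in the low
    /// bits of `bits`, and passes it to the visitor.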
fn read_field<V: StoredFieldVisitor + ?Sized>(
input: &mut impl DataInput,
visitor: &mut V,
info: &FieldInfo,
bits: i32,
) -> Result<()> {
match bits & TYPE_MASK {
BYTE_ARR => {
let length = input.read_vint()? as usize;
let mut data = vec![0u8; length];
input.read_exact(data.as_mut())?;
visitor.add_binary_field(info, data)?;
}
STRING => {
let length = input.read_vint()? as usize;
let mut data = vec![0u8; length];
input.read_exact(data.as_mut())?;
visitor.add_string_field(info, data)?;
}
NUMERIC_INT => {
visitor.add_int_field(info, input.read_zint()?)?;
}
NUMERIC_FLOAT => {
visitor.add_float_field(info, Self::read_zfloat(input)?)?;
}
NUMERIC_LONG => {
visitor.add_long_field(info, Self::read_tlong(input)?)?;
}
NUMERIC_DOUBLE => {
visitor.add_double_field(info, Self::read_zdouble(input)?)?;
}
_ => {
debug_assert!(false, "Unknown type flag!");
}
}
Ok(())
}
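    /// Reads a float in a variable-length format (one to five bytes): a
    /// one-byte small integer, a `0xff`-prefixed raw bit pattern, or a
    /// four-byte positive float whose header byte is the most significant
    /// byte.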
fn read_zfloat(input: &mut impl DataInput) -> Result<f32> {
let b = i32::from(input.read_byte()?) & 0xffi32;
if b == 0xff {
Ok(f32::from_bits(input.read_int()? as u32))
} else if (b & 0x80) != 0 {
Ok(((b & 0x7f) - 1) as f32)
} else {
let bits = b << 24
| ((i32::from(input.read_short()?) & 0xffff) << 8)
| (i32::from(input.read_byte()?) & 0xff);
Ok(f32::from_bits(bits as u32))
}
}
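    /// Reads a double in a variable-length format (one to nine bytes),
    /// mirroring Lucene's ZDouble encoding: small integral values take one
    /// byte and float-representable values five (the `0xfe` marker).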
    fn read_zdouble(input: &mut impl DataInput) -> Result<f64> {
        let b = i32::from(input.read_byte()?) & 0xffi32;
        if b == 0xff {
            // negative double: raw 8-byte bit pattern follows
            Ok(f64::from_bits(input.read_long()? as u64))
        } else if b == 0xfe {
            // double exactly representable as a float: 4-byte float bits follow
            Ok(f64::from(f32::from_bits(input.read_int()? as u32)))
        } else if (b & 0x80) != 0 {
            // small integer in [-1, 124], packed into the header byte
            Ok(f64::from((b & 0x7f) - 1))
        } else {
            // positive double: the header byte is the most significant byte
            let bits = (i64::from(b)) << 56i64
                | ((i64::from(input.read_int()?) & 0xffff_ffffi64) << 24)
                | ((i64::from(input.read_short()?) & 0xffffi64) << 8)
                | (i64::from(input.read_byte()?) & 0xffi64);
            Ok(f64::from_bits(bits as u64))
        }
    }
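    /// Reads a zigzag-encoded vlong whose header byte may carry a
    /// second/hour/day scaling flag (`SECOND_ENCODING`, `HOUR_ENCODING`,
    /// `DAY_ENCODING`); the decoded value is multiplied back up accordingly.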
fn read_tlong(input: &mut impl DataInput) -> Result<i64> {
let header = i32::from(input.read_byte()?) & 0xff;
let mut bits = i64::from(header) & 0x1fi64;
if (header & 0x20) != 0 {
bits |= input.read_vlong()? << 5i64;
}
let mut l = bits.decode();
match header & DAY_ENCODING {
SECOND_ENCODING => {
l *= SECOND;
}
HOUR_ENCODING => {
l *= HOUR;
}
DAY_ENCODING => {
l *= DAY;
}
0 => {}
_ => {
debug_assert!(false);
}
}
Ok(l)
}
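    /// Skips over a single field value without materializing it.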
fn skip_field(input: &mut impl DataInput, bits: i32) -> Result<()> {
match bits & TYPE_MASK {
BYTE_ARR | STRING => {
let length = input.read_vint()?;
input.skip_bytes(length as usize)?;
}
NUMERIC_INT => {
input.read_zint()?;
}
NUMERIC_FLOAT => {
Self::read_zfloat(input)?;
}
NUMERIC_LONG => {
Self::read_tlong(input)?;
}
NUMERIC_DOUBLE => {
Self::read_zdouble(input)?;
}
_ => {
                debug_assert!(false, "Unknown type flag: {}", bits);
}
}
Ok(())
}
fn contains(&self, doc_id: i32) -> bool {
doc_id >= self.doc_base && doc_id < self.doc_base + self.chunk_docs as i32
}
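    /// Decodes the chunk header at the current file position; on failure the
    /// cached chunk is invalidated (`chunk_docs = 0`) so `contains` fails.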
fn reset_state(&mut self, doc_id: i32) {
if self.do_reset(doc_id).is_err() {
self.chunk_docs = 0;
}
}
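    /// Reads a chunk header: doc base, doc count, sliced flag, per-document
    /// stored-field counts and cumulative lengths. When merging, the whole
    /// chunk is eagerly decompressed into `bytes`.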
fn do_reset(&mut self, doc_id: i32) -> Result<()> {
self.doc_base = self.fields_stream.read_vint()?;
let token = self.fields_stream.read_vint()?;
debug_assert!(token >= 0);
self.chunk_docs = token.unsigned_shift(1usize) as usize;
if !self.contains(doc_id) || self.doc_base + self.chunk_docs as i32 > self.num_docs {
bail!(CorruptIndex(format!(
"doc_id={}, doc_base={}, chunk_docs={}, num_docs={}",
doc_id, self.doc_base, self.chunk_docs, self.num_docs
)));
}
self.sliced = (token & 1) != 0;
        self.offsets.resize(self.chunk_docs + 1, 0);
        self.num_stored_fields.resize(self.chunk_docs, 0);
if self.chunk_docs == 1 {
self.num_stored_fields[0] = self.fields_stream.read_vint()?;
self.offsets[1] = self.fields_stream.read_vint()?;
} else {
let bits_per_stored_fields = self.fields_stream.read_vint()?;
if bits_per_stored_fields == 0 {
let value = self.fields_stream.read_vint()?;
for i in 0..self.chunk_docs {
self.num_stored_fields[i] = value;
}
} else if bits_per_stored_fields > 31 {
bail!(CorruptIndex(format!(
"bits_per_stored_fields={}",
bits_per_stored_fields
)));
} else {
let mut it = get_reader_iterator_no_header(
Format::Packed,
self.packed_ints_version,
self.chunk_docs,
bits_per_stored_fields,
1,
)?;
for i in 0..self.chunk_docs {
self.num_stored_fields[i] = it.next(self.fields_stream.as_mut())? as i32;
}
}
let bits_per_length = self.fields_stream.read_vint()?;
if bits_per_length == 0 {
let length = self.fields_stream.read_vint()?;
for i in 0..self.chunk_docs {
self.offsets[1 + i] = (1 + i as i32) * length;
}
} else {
let mut it = get_reader_iterator_no_header(
Format::Packed,
self.packed_ints_version,
self.chunk_docs,
bits_per_length,
1,
)?;
for i in 0..self.chunk_docs {
self.offsets[i + 1] = it.next(self.fields_stream.as_mut())? as i32;
}
for i in 0..self.chunk_docs {
self.offsets[i + 1] += self.offsets[i];
}
}
for i in 0..self.chunk_docs {
let len = self.offsets[i + 1] - self.offsets[i];
let stored_fields = self.num_stored_fields[i];
if (len == 0) != (stored_fields == 0) {
bail!(CorruptIndex(format!(
"length={}, num_stored_fields={}",
len, stored_fields
)));
}
}
}
self.start_pointer = self.fields_stream.file_pointer();
if self.merging {
let total_length = self.offsets[self.chunk_docs];
if self.sliced {
self.bytes_position.0 = 0;
self.bytes_position.1 = 0;
let mut decompressed = 0;
while decompressed < total_length {
let to_decompress = min(total_length - decompressed, self.chunk_size);
self.decompressor.decompress(
self.fields_stream.as_mut(),
to_decompress as usize,
0,
to_decompress as usize,
&mut self.spare,
&mut self.spare_position,
)?;
                    // Append the freshly decompressed slice to `bytes`;
                    // `bytes_position.1` tracks the accumulated length.
                    self.bytes
                        .resize(self.bytes_position.1 + self.spare_position.1, 0);
                    self.bytes
                        [self.bytes_position.1..self.bytes_position.1 + self.spare_position.1]
                        .copy_from_slice(
                            &self.spare[self.spare_position.0
                                ..self.spare_position.0 + self.spare_position.1],
                        );
                    self.bytes_position.1 += self.spare_position.1;
decompressed += to_decompress;
}
} else {
self.decompressor.decompress(
self.fields_stream.as_mut(),
total_length as usize,
0,
total_length as usize,
&mut self.bytes,
&mut self.bytes_position,
)?;
}
if self.bytes_position.1 != total_length as usize {
bail!(CorruptIndex(format!(
"expected chunk size = {}, got {}",
total_length, self.bytes_position.1
)));
}
}
Ok(())
}
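    /// Positions the reader on the chunk containing `doc_id` (seeking via the
    /// index if needed) and prepares `current_doc` for reading.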
pub fn document(&mut self, doc_id: DocId) -> Result<()> {
if !self.contains(doc_id) {
self.fields_stream
.seek(self.index_reader.start_pointer(doc_id)?)?;
self.reset_state(doc_id);
}
debug_assert!(self.contains(doc_id));
self.do_get_document(doc_id)
}
fn do_get_document(&mut self, doc_id: DocId) -> Result<()> {
if !self.contains(doc_id) {
            bail!(IllegalArgument(format!("doc {} doesn't exist", doc_id)));
}
let index = (doc_id - self.doc_base) as usize;
let offset = self.offsets[index] as usize;
let length = self.offsets[index + 1] as usize - offset;
let total_length = self.offsets[self.chunk_docs] as usize;
let num_stored_fields = self.num_stored_fields[index];
self.current_doc.num_stored_fields = num_stored_fields;
self.current_doc.length = length;
self.current_doc.input = if length == 0 {
DocumentInput::Bytes(ByteArrayDataInput::new(BytesRef::default()))
} else if self.merging {
let start = self.bytes_position.0 + offset;
DocumentInput::Bytes(ByteArrayDataInput::new(BytesRef::new(
&self.bytes[start..start + length],
)))
} else if self.sliced {
self.fields_stream.seek(self.start_pointer)?;
self.decompressor.decompress(
self.fields_stream.as_mut(),
self.chunk_size as usize,
offset,
min(length, self.chunk_size as usize - offset),
self.bytes.as_mut(),
&mut self.bytes_position,
)?;
self.current_doc.decompressed = self.bytes_position.1;
DocumentInput::Compressing(CompressingStoredFieldsInput::new(self))
} else {
self.fields_stream.seek(self.start_pointer)?;
self.decompressor.decompress(
self.fields_stream.as_mut(),
total_length,
offset,
length,
self.bytes.as_mut(),
&mut self.bytes_position,
)?;
debug_assert_eq!(self.bytes_position.1, length);
self.current_doc.decompressed = self.bytes_position.1;
DocumentInput::Compressing(CompressingStoredFieldsInput::new(self))
};
Ok(())
}
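    /// Decompresses up to `chunk_size` more bytes of the current sliced
    /// document into `bytes`.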
fn fill_buffer(&mut self) -> Result<()> {
debug_assert!(self.current_doc.decompressed <= self.current_doc.length);
if self.current_doc.decompressed == self.current_doc.length {
bail!(UnexpectedEOF("".into()));
}
let to_decompress = min(
self.current_doc.length - self.current_doc.decompressed,
self.chunk_size as usize,
);
self.decompressor.decompress(
self.fields_stream.as_mut(),
to_decompress,
0,
to_decompress,
&mut self.bytes,
&mut self.bytes_position,
)?;
self.current_doc.decompressed += to_decompress;
Ok(())
}
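    /// Fallible clone (cloning the underlying `IndexInput` can fail); used by
    /// `visit_document` so that reads through `&self` do not mutate shared
    /// state.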
pub fn clone(&self) -> Result<Self> {
Ok(CompressingStoredFieldsReader {
version: self.version,
field_infos: self.field_infos.clone(),
index_reader: self.index_reader.clone(),
max_pointer: self.max_pointer,
fields_stream: self.fields_stream.as_ref().clone()?,
chunk_size: self.chunk_size,
packed_ints_version: self.packed_ints_version,
compression_mode: self.compression_mode,
decompressor: self.decompressor.clone(),
num_docs: self.num_docs,
merging: false,
num_chunks: self.num_chunks,
num_dirty_chunks: self.num_dirty_chunks,
doc_base: self.doc_base,
chunk_docs: self.chunk_docs,
sliced: self.sliced,
offsets: self.offsets.clone(),
num_stored_fields: self.num_stored_fields.clone(),
start_pointer: self.start_pointer,
bytes: vec![0u8; self.chunk_size as usize * 2],
bytes_position: OffsetAndLength(0, 0),
spare: vec![],
spare_position: OffsetAndLength(0, 0),
current_doc: SerializedDocument {
length: self.current_doc.length,
num_stored_fields: self.current_doc.num_stored_fields,
decompressed: self.current_doc.decompressed,
input: DocumentInput::Bytes(ByteArrayDataInput::new(BytesRef::default())),
},
})
}
fn do_visit_document<V: StoredFieldVisitor + ?Sized>(
&mut self,
doc_id: DocId,
visitor: &mut V,
) -> Result<()> {
self.document(doc_id)?;
for field_idx in 0..self.current_doc.num_stored_fields {
let info_and_bits = self.current_doc.input.read_vlong()?;
let field_number = info_and_bits.unsigned_shift(TYPE_BITS as usize) as u32;
let field_info = self.field_infos.by_number[&field_number].clone();
let bits = (info_and_bits & i64::from(TYPE_MASK)) as i32;
debug_assert!(bits <= NUMERIC_DOUBLE);
match visitor.needs_field(field_info.as_ref()) {
VisitStatus::Yes => {
Self::read_field(
&mut self.current_doc.input,
visitor,
field_info.as_ref(),
bits,
)?;
}
VisitStatus::No => {
if field_idx == self.current_doc.num_stored_fields - 1 {
return Ok(());
}
Self::skip_field(&mut self.current_doc.input, bits)?;
}
VisitStatus::Stop => {
return Ok(());
}
}
}
Ok(())
}
}
impl StoredFieldsReader for CompressingStoredFieldsReader {
fn visit_document(&self, doc_id: DocId, visitor: &mut dyn StoredFieldVisitor) -> Result<()> {
self.clone()?.do_visit_document(doc_id, visitor)
}
fn visit_document_mut(
&mut self,
doc_id: i32,
visitor: &mut dyn StoredFieldVisitor,
) -> Result<()> {
self.do_visit_document(doc_id, visitor)
}
fn get_merge_instance(&self) -> Result<Self> {
self.copy_for_merge()
}
fn as_any(&self) -> &dyn Any {
self
}
}
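/// `DataInput` view over the owning reader's decompressed `bytes` buffer,
/// refilled on demand via `fill_buffer`. Holds a raw pointer because the
/// reader stores this input inside its own `current_doc`.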
pub struct CompressingStoredFieldsInput {
reader: *mut CompressingStoredFieldsReader,
}
impl CompressingStoredFieldsInput {
fn new(reader: &mut CompressingStoredFieldsReader) -> Self {
CompressingStoredFieldsInput { reader }
}
#[allow(clippy::mut_from_ref)]
fn reader(&self) -> &mut CompressingStoredFieldsReader {
unsafe { &mut *self.reader }
}
}
impl DataInput for CompressingStoredFieldsInput {
fn read_byte(&mut self) -> Result<u8> {
let reader = self.reader();
if reader.bytes_position.1 == 0 {
reader.fill_buffer()?;
}
let res = reader.bytes[reader.bytes_position.0];
reader.bytes_position.0 += 1;
reader.bytes_position.1 -= 1;
Ok(res)
}
fn read_bytes(&mut self, b: &mut [u8], offset: usize, len: usize) -> Result<()> {
let reader = self.reader();
let mut len = len;
let mut offset = offset;
while len > reader.bytes_position.1 {
(&mut b[offset..offset + reader.bytes_position.1]).copy_from_slice(
&reader.bytes
[reader.bytes_position.0..reader.bytes_position.0 + reader.bytes_position.1],
);
len -= reader.bytes_position.1;
offset += reader.bytes_position.1;
reader.fill_buffer()?;
}
(&mut b[offset..offset + len])
.copy_from_slice(&reader.bytes[reader.bytes_position.0..reader.bytes_position.0 + len]);
reader.bytes_position.0 += len;
reader.bytes_position.1 -= len;
Ok(())
}
}
impl Read for CompressingStoredFieldsInput {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let reader = self.reader();
let size = min(buf.len(), reader.bytes_position.1);
buf[0..size].copy_from_slice(
&reader.bytes[reader.bytes_position.0..reader.bytes_position.0 + size],
);
reader.bytes_position.0 += size;
reader.bytes_position.1 -= size;
Ok(size)
}
}
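/// Source of a serialized document: either the streaming decompressor input
/// or a slice of already-decompressed bytes.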
pub enum DocumentInput {
Compressing(CompressingStoredFieldsInput),
Bytes(ByteArrayDataInput<BytesRef>),
}
impl Read for DocumentInput {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
DocumentInput::Compressing(c) => c.read(buf),
DocumentInput::Bytes(b) => b.read(buf),
}
}
}
impl DataInput for DocumentInput {
fn read_byte(&mut self) -> Result<u8> {
match self {
DocumentInput::Compressing(c) => c.read_byte(),
DocumentInput::Bytes(b) => b.read_byte(),
}
}
fn read_bytes(&mut self, b: &mut [u8], offset: usize, len: usize) -> Result<()> {
match self {
DocumentInput::Compressing(c) => c.read_bytes(b, offset, len),
DocumentInput::Bytes(i) => i.read_bytes(b, offset, len),
}
}
fn skip_bytes(&mut self, count: usize) -> Result<()> {
match self {
DocumentInput::Compressing(c) => c.skip_bytes(count),
DocumentInput::Bytes(b) => b.skip_bytes(count),
}
}
}
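/// A single document's serialized form: its length, stored-field count, how
/// many bytes have been decompressed so far, and the input to read it from.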
pub struct SerializedDocument {
pub length: usize,
pub num_stored_fields: i32,
pub decompressed: usize,
pub input: DocumentInput,
}