summavy/positions/
reader.rs

1use std::io;
2
3use common::{BinarySerializable, VInt};
4
5use crate::directory::OwnedBytes;
6use crate::positions::COMPRESSION_BLOCK_SIZE;
7use crate::postings::compression::{BlockDecoder, VIntDecoder};
8
9/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`.
10/// This means we need to skip to the `nth` position efficiently.
11///
12/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bits
13/// (values can go from 0 to 32 bits) required to decompress every block.
14///
15/// A given block obviously takes `(128 x  num_bit_for_the_block / num_bits_in_a_byte)`,
16/// so skipping a block without decompressing it is just a matter of advancing that many
17/// bytes.
18
19#[derive(Clone)]
20pub struct PositionReader {
21    bit_widths: OwnedBytes,
22    positions: OwnedBytes,
23
24    block_decoder: BlockDecoder,
25
26    // offset, expressed in positions, for the first position of the block currently loaded
27    // block_offset is a multiple of COMPRESSION_BLOCK_SIZE.
28    block_offset: u64,
29    // offset, expressed in positions, for the position of the first block encoded
30    // in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in
31    // `self.bit_widths`.
32    //
33    // As we advance, anchor increases simultaneously with bit_widths and positions get consumed.
34    anchor_offset: u64,
35
36    // These are just copies used for .reset().
37    original_bit_widths: OwnedBytes,
38    original_positions: OwnedBytes,
39}
40
41impl PositionReader {
42    /// Open and reads the term positions encoded into the positions_data owned bytes.
43    pub fn open(mut positions_data: OwnedBytes) -> io::Result<PositionReader> {
44        let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize;
45        let (bit_widths, positions) = positions_data.split(num_positions_bitpacked_blocks);
46        Ok(PositionReader {
47            bit_widths: bit_widths.clone(),
48            positions: positions.clone(),
49            block_decoder: BlockDecoder::default(),
50            block_offset: i64::MAX as u64,
51            anchor_offset: 0u64,
52            original_bit_widths: bit_widths,
53            original_positions: positions,
54        })
55    }
56
57    fn reset(&mut self) {
58        self.positions = self.original_positions.clone();
59        self.bit_widths = self.original_bit_widths.clone();
60        self.block_offset = i64::MAX as u64;
61        self.anchor_offset = 0u64;
62    }
63
64    /// Advance from num_blocks bitpacked blocks.
65    ///
66    /// Panics if there are not that many remaining blocks.
67    fn advance_num_blocks(&mut self, num_blocks: usize) {
68        let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
69            .iter()
70            .cloned()
71            .map(|num_bits| num_bits as usize)
72            .sum();
73        let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
74        self.bit_widths.advance(num_blocks);
75        self.positions.advance(num_bytes_to_skip);
76        self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
77    }
78
79    /// block_rel_id is counted relatively to the anchor.
80    /// block_rel_id = 0 means the anchor block.
81    /// block_rel_id = i means the ith block after the anchor block.
82    fn load_block(&mut self, block_rel_id: usize) {
83        let bit_widths = self.bit_widths.as_slice();
84        let byte_offset: usize = bit_widths[0..block_rel_id]
85            .iter()
86            .map(|&b| b as usize)
87            .sum::<usize>()
88            * COMPRESSION_BLOCK_SIZE
89            / 8;
90        let compressed_data = &self.positions.as_slice()[byte_offset..];
91        if bit_widths.len() > block_rel_id {
92            // that block is bitpacked.
93            let bit_width = bit_widths[block_rel_id];
94            self.block_decoder
95                .uncompress_block_unsorted(compressed_data, bit_width);
96        } else {
97            // that block is vint encoded.
98            self.block_decoder
99                .uncompress_vint_unsorted_until_end(compressed_data);
100        }
101        self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
102    }
103
104    /// Fills a buffer with the positions `[offset..offset+output.len())` integers.
105    ///
106    /// This function is optimized to be called with increasing values of `offset`.
107    pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
108        if offset < self.anchor_offset {
109            self.reset();
110        }
111        let delta_to_block_offset = offset as i64 - self.block_offset as i64;
112        if !(0..128).contains(&delta_to_block_offset) {
113            // The first position is not within the first block.
114            // (Note that it could be before or after)
115            // We need to possibly skip a few blocks, and decompress the first relevant  block.
116            let delta_to_anchor_offset = offset - self.anchor_offset;
117            let num_blocks_to_skip =
118                (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
119            self.advance_num_blocks(num_blocks_to_skip);
120            self.load_block(0);
121        } else {
122            // The request offset is within the loaded block.
123            // We still need to advance anchor_offset to our current block.
124            let num_blocks_to_skip =
125                ((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
126            self.advance_num_blocks(num_blocks_to_skip);
127        }
128
129        // At this point, the block containing offset is loaded, and anchor has
130        // been updated to point to it as well.
131        for i in 1.. {
132            // we copy the part from block i - 1 that is relevant.
133            let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
134            let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
135            if remaining_in_block >= output.len() {
136                output.copy_from_slice(
137                    &self.block_decoder.output_array()[offset_in_block..][..output.len()],
138                );
139                break;
140            }
141            output[..remaining_in_block]
142                .copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
143            output = &mut output[remaining_in_block..];
144            // we load block #i if necessary.
145            offset += remaining_in_block as u64;
146            self.load_block(i);
147        }
148    }
149}