tantivy_columnar/column_index/optional_index/
mod.rs

1use std::io::{self, Write};
2use std::sync::Arc;
3
4mod set;
5mod set_block;
6
7use common::{BinarySerializable, OwnedBytes, VInt};
8pub use set::{SelectCursor, Set, SetCodec};
9use set_block::{
10    DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
11};
12
13use crate::iterable::Iterable;
14use crate::{DocId, InvalidData, RowId};
15
16/// The threshold for for number of elements after which we switch to dense block encoding.
17///
18/// We simply pick the value that minimize the size of the blocks.
19const DENSE_BLOCK_THRESHOLD: u32 =
20    set_block::DENSE_BLOCK_NUM_BYTES / std::mem::size_of::<u16>() as u32; //< 5_120
21
22const ELEMENTS_PER_BLOCK: u32 = u16::MAX as u32 + 1;
23
24#[derive(Copy, Clone, Debug)]
25struct BlockMeta {
26    non_null_rows_before_block: u32,
27    start_byte_offset: u32,
28    block_variant: BlockVariant,
29}
30
31#[derive(Clone, Copy, Debug)]
32enum BlockVariant {
33    Dense,
34    Sparse { num_vals: u16 },
35}
36
37impl BlockVariant {
38    pub fn empty() -> Self {
39        Self::Sparse { num_vals: 0 }
40    }
41    pub fn num_bytes_in_block(&self) -> u32 {
42        match *self {
43            BlockVariant::Dense => set_block::DENSE_BLOCK_NUM_BYTES,
44            BlockVariant::Sparse { num_vals } => num_vals as u32 * 2,
45        }
46    }
47}
48
49/// This codec is inspired by roaring bitmaps.
50/// In the dense blocks, however, in order to accelerate `select`
51/// we interleave an offset over two bytes. (more on this lower)
52///
53/// The lower 16 bits of doc ids are stored as u16 while the upper 16 bits are given by the block
54/// id. Each block contains 1<<16 docids.
55///
56/// # Serialized Data Layout
57/// The data starts with the block data. Each block is either dense or sparse encoded, depending on
58/// the number of values in the block. A block is sparse when it contains less than
59/// DENSE_BLOCK_THRESHOLD (6144) values.
60/// [Sparse data block | dense data block, .. #repeat*; Desc: Either a sparse or dense encoded
61/// block]
62/// ### Sparse block data
63/// [u16 LE, .. #repeat*; Desc: Positions with values in a block]
64/// ### Dense block data
65/// [Dense codec for the whole block; Desc: Similar to a bitvec(0..ELEMENTS_PER_BLOCK) + Metadata
66/// for faster lookups. See dense.rs]
67///
68/// The data is followed by block metadata, to know which area of the raw block data belongs to
69/// which block. Only metadata for blocks with elements is recorded to
70/// keep the overhead low for scenarios with many very sparse columns. The block metadata consists
71/// of the block index and the number of values in the block. Since we don't store empty blocks
72/// num_vals is incremented by 1, e.g. 0 means 1 value.
73///
74/// The last u16 is storing the number of metadata blocks.
75/// [u16 LE, .. #repeat*; Desc: Positions with values in a block][(u16 LE, u16 LE), .. #repeat*;
76/// Desc: (Block Id u16, Num Elements u16)][u16 LE; Desc: num blocks with values u16]
77///
78/// # Opening
79/// When opening the data layout, the data is expanded to `Vec<SparseCodecBlockVariant>`, where the
80/// index is the block index. For each block `byte_start` and `offset` is computed.
81#[derive(Clone)]
82pub struct OptionalIndex {
83    num_rows: RowId,
84    num_non_null_rows: RowId,
85    block_data: OwnedBytes,
86    block_metas: Arc<[BlockMeta]>,
87}
88
89impl std::fmt::Debug for OptionalIndex {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("OptionalIndex")
92            .field("num_rows", &self.num_rows)
93            .field("num_non_null_rows", &self.num_non_null_rows)
94            .finish_non_exhaustive()
95    }
96}
97
98/// Splits a value address into lower and upper 16bits.
99/// The lower 16 bits are the value in the block
100/// The upper 16 bits are the block index
101#[derive(Copy, Debug, Clone)]
102struct RowAddr {
103    block_id: u16,
104    in_block_row_id: u16,
105}
106
107#[inline(always)]
108fn row_addr_from_row_id(row_id: RowId) -> RowAddr {
109    RowAddr {
110        block_id: (row_id / ELEMENTS_PER_BLOCK) as u16,
111        in_block_row_id: (row_id % ELEMENTS_PER_BLOCK) as u16,
112    }
113}
114
115enum BlockSelectCursor<'a> {
116    Dense(<DenseBlock<'a> as Set<u16>>::SelectCursor<'a>),
117    Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
118}
119
120impl<'a> BlockSelectCursor<'a> {
121    fn select(&mut self, rank: u16) -> u16 {
122        match self {
123            BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
124            BlockSelectCursor::Sparse(sparse_select_cursor) => sparse_select_cursor.select(rank),
125        }
126    }
127}
128pub struct OptionalIndexSelectCursor<'a> {
129    current_block_cursor: BlockSelectCursor<'a>,
130    current_block_id: u16,
131    // The current block is guaranteed to contain ranks < end_rank.
132    current_block_end_rank: RowId,
133    optional_index: &'a OptionalIndex,
134    block_doc_idx_start: RowId,
135    num_null_rows_before_block: RowId,
136}
137
138impl<'a> OptionalIndexSelectCursor<'a> {
139    fn search_and_load_block(&mut self, rank: RowId) {
140        if rank < self.current_block_end_rank {
141            // we are already in the right block
142            return;
143        }
144        self.current_block_id = self.optional_index.find_block(rank, self.current_block_id);
145        self.current_block_end_rank = self
146            .optional_index
147            .block_metas
148            .get(self.current_block_id as usize + 1)
149            .map(|block_meta| block_meta.non_null_rows_before_block)
150            .unwrap_or(u32::MAX);
151        self.block_doc_idx_start = (self.current_block_id as u32) * ELEMENTS_PER_BLOCK;
152        let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
153        self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
154        let block: Block<'_> = self.optional_index.block(block_meta);
155        self.current_block_cursor = match block {
156            Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
157            Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
158        };
159    }
160}
161
162impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
163    fn select(&mut self, rank: RowId) -> RowId {
164        self.search_and_load_block(rank);
165        let index_in_block = (rank - self.num_null_rows_before_block) as u16;
166        self.current_block_cursor.select(index_in_block) as RowId + self.block_doc_idx_start
167    }
168}
169
170impl Set<RowId> for OptionalIndex {
171    type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
172    // Check if value at position is not null.
173    #[inline]
174    fn contains(&self, row_id: RowId) -> bool {
175        let RowAddr {
176            block_id,
177            in_block_row_id,
178        } = row_addr_from_row_id(row_id);
179        let block_meta = self.block_metas[block_id as usize];
180        match self.block(block_meta) {
181            Block::Dense(dense_block) => dense_block.contains(in_block_row_id),
182            Block::Sparse(sparse_block) => sparse_block.contains(in_block_row_id),
183        }
184    }
185
186    /// Any value doc_id is allowed.
187    /// In particular, doc_id = num_rows.
188    #[inline]
189    fn rank(&self, doc_id: DocId) -> RowId {
190        if doc_id >= self.num_docs() {
191            return self.num_non_nulls();
192        }
193        let RowAddr {
194            block_id,
195            in_block_row_id,
196        } = row_addr_from_row_id(doc_id);
197        let block_meta = self.block_metas[block_id as usize];
198        let block = self.block(block_meta);
199        let block_offset_row_id = match block {
200            Block::Dense(dense_block) => dense_block.rank(in_block_row_id),
201            Block::Sparse(sparse_block) => sparse_block.rank(in_block_row_id),
202        } as u32;
203        block_meta.non_null_rows_before_block + block_offset_row_id
204    }
205
206    /// Any value doc_id is allowed.
207    /// In particular, doc_id = num_rows.
208    #[inline]
209    fn rank_if_exists(&self, doc_id: DocId) -> Option<RowId> {
210        let RowAddr {
211            block_id,
212            in_block_row_id,
213        } = row_addr_from_row_id(doc_id);
214        let block_meta = *self.block_metas.get(block_id as usize)?;
215        let block = self.block(block_meta);
216        let block_offset_row_id = match block {
217            Block::Dense(dense_block) => dense_block.rank_if_exists(in_block_row_id),
218            Block::Sparse(sparse_block) => sparse_block.rank_if_exists(in_block_row_id),
219        }? as u32;
220        Some(block_meta.non_null_rows_before_block + block_offset_row_id)
221    }
222
223    #[inline]
224    fn select(&self, rank: RowId) -> RowId {
225        let block_pos = self.find_block(rank, 0);
226        let block_doc_idx_start = (block_pos as u32) * ELEMENTS_PER_BLOCK;
227        let block_meta = self.block_metas[block_pos as usize];
228        let block: Block<'_> = self.block(block_meta);
229        let index_in_block = (rank - block_meta.non_null_rows_before_block) as u16;
230        let in_block_rank = match block {
231            Block::Dense(dense_block) => dense_block.select(index_in_block),
232            Block::Sparse(sparse_block) => sparse_block.select(index_in_block),
233        };
234        block_doc_idx_start + in_block_rank as u32
235    }
236
237    fn select_cursor(&self) -> OptionalIndexSelectCursor<'_> {
238        OptionalIndexSelectCursor {
239            current_block_cursor: BlockSelectCursor::Sparse(
240                SparseBlockCodec::open(b"").select_cursor(),
241            ),
242            current_block_id: 0u16,
243            current_block_end_rank: 0u32, //< this is sufficient to force the first load
244            optional_index: self,
245            block_doc_idx_start: 0u32,
246            num_null_rows_before_block: 0u32,
247        }
248    }
249}
250
251impl OptionalIndex {
252    pub fn for_test(num_rows: RowId, row_ids: &[RowId]) -> OptionalIndex {
253        assert!(row_ids
254            .last()
255            .copied()
256            .map(|last_row_id| last_row_id < num_rows)
257            .unwrap_or(true));
258        let mut buffer = Vec::new();
259        serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap();
260        let bytes = OwnedBytes::new(buffer);
261        open_optional_index(bytes).unwrap()
262    }
263
264    pub fn num_docs(&self) -> RowId {
265        self.num_rows
266    }
267
268    pub fn num_non_nulls(&self) -> RowId {
269        self.num_non_null_rows
270    }
271
272    pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
273        // TODO optimize
274        let mut select_batch = self.select_cursor();
275        (0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
276    }
277    pub fn select_batch(&self, ranks: &mut [RowId]) {
278        let mut select_cursor = self.select_cursor();
279        for rank in ranks.iter_mut() {
280            *rank = select_cursor.select(*rank);
281        }
282    }
283
284    #[inline]
285    fn block(&self, block_meta: BlockMeta) -> Block<'_> {
286        let BlockMeta {
287            start_byte_offset,
288            block_variant,
289            ..
290        } = block_meta;
291        let start_byte_offset = start_byte_offset as usize;
292        let bytes = self.block_data.as_slice();
293        match block_variant {
294            BlockVariant::Dense => Block::Dense(DenseBlockCodec::open(
295                &bytes[start_byte_offset..start_byte_offset + DENSE_BLOCK_NUM_BYTES as usize],
296            )),
297            BlockVariant::Sparse { num_vals } => {
298                let end_byte_offset = start_byte_offset + num_vals as usize * 2;
299                let sparse_bytes = &bytes[start_byte_offset..end_byte_offset];
300                Block::Sparse(SparseBlockCodec::open(sparse_bytes))
301            }
302        }
303    }
304
305    #[inline]
306    fn find_block(&self, dense_idx: u32, start_block_pos: u16) -> u16 {
307        for block_pos in start_block_pos..self.block_metas.len() as u16 {
308            let offset = self.block_metas[block_pos as usize].non_null_rows_before_block;
309            if offset > dense_idx {
310                return block_pos - 1u16;
311            }
312        }
313        self.block_metas.len() as u16 - 1u16
314    }
315
316    // TODO Add a good API for the codec_idx to original_idx translation.
317    // The Iterator API is a probably a bad idea
318}
319
320#[derive(Copy, Clone)]
321enum Block<'a> {
322    Dense(DenseBlock<'a>),
323    Sparse(SparseBlock<'a>),
324}
325
326#[derive(Debug, Copy, Clone)]
327enum OptionalIndexCodec {
328    Dense = 0,
329    Sparse = 1,
330}
331
332impl OptionalIndexCodec {
333    fn to_code(self) -> u8 {
334        self as u8
335    }
336
337    fn try_from_code(code: u8) -> Result<Self, InvalidData> {
338        match code {
339            0 => Ok(Self::Dense),
340            1 => Ok(Self::Sparse),
341            _ => Err(InvalidData),
342        }
343    }
344}
345
346impl BinarySerializable for OptionalIndexCodec {
347    fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
348        writer.write_all(&[self.to_code()])
349    }
350
351    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
352        let optional_codec_code = u8::deserialize(reader)?;
353        let optional_codec = Self::try_from_code(optional_codec_code)?;
354        Ok(optional_codec)
355    }
356}
357
358fn serialize_optional_index_block(block_els: &[u16], out: &mut impl io::Write) -> io::Result<()> {
359    let is_sparse = is_sparse(block_els.len() as u32);
360    if is_sparse {
361        SparseBlockCodec::serialize(block_els.iter().copied(), out)?;
362    } else {
363        DenseBlockCodec::serialize(block_els.iter().copied(), out)?;
364    }
365    Ok(())
366}
367
368pub fn serialize_optional_index<W: io::Write>(
369    non_null_rows: &dyn Iterable<RowId>,
370    num_rows: RowId,
371    output: &mut W,
372) -> io::Result<()> {
373    VInt(num_rows as u64).serialize(output)?;
374
375    let mut rows_it = non_null_rows.boxed_iter();
376    let mut block_metadata: Vec<SerializedBlockMeta> = Vec::new();
377    let mut current_block = Vec::new();
378
379    // This if-statement for the first element ensures that
380    // `block_metadata` is not empty in the loop below.
381    let Some(idx) = rows_it.next() else {
382        output.write_all(&0u16.to_le_bytes())?;
383        return Ok(());
384    };
385
386    let row_addr = row_addr_from_row_id(idx);
387
388    let mut current_block_id = row_addr.block_id;
389    current_block.push(row_addr.in_block_row_id);
390
391    for idx in rows_it {
392        let value_addr = row_addr_from_row_id(idx);
393        if current_block_id != value_addr.block_id {
394            serialize_optional_index_block(&current_block[..], output)?;
395            block_metadata.push(SerializedBlockMeta {
396                block_id: current_block_id,
397                num_non_null_rows: current_block.len() as u32,
398            });
399            current_block.clear();
400            current_block_id = value_addr.block_id;
401        }
402        current_block.push(value_addr.in_block_row_id);
403    }
404
405    // handle last block
406    serialize_optional_index_block(&current_block[..], output)?;
407
408    block_metadata.push(SerializedBlockMeta {
409        block_id: current_block_id,
410        num_non_null_rows: current_block.len() as u32,
411    });
412
413    for block in &block_metadata {
414        output.write_all(&block.to_bytes())?;
415    }
416
417    output.write_all((block_metadata.len() as u16).to_le_bytes().as_ref())?;
418
419    Ok(())
420}
421
422const SERIALIZED_BLOCK_META_NUM_BYTES: usize = 4;
423
424#[derive(Clone, Copy, Debug)]
425struct SerializedBlockMeta {
426    block_id: u16,
427    num_non_null_rows: u32, //< takes values in 1..=u16::MAX
428}
429
430// TODO unit tests
431impl SerializedBlockMeta {
432    #[inline]
433    fn from_bytes(bytes: [u8; SERIALIZED_BLOCK_META_NUM_BYTES]) -> SerializedBlockMeta {
434        let block_id = u16::from_le_bytes(bytes[0..2].try_into().unwrap());
435        let num_non_null_rows: u32 =
436            u16::from_le_bytes(bytes[2..4].try_into().unwrap()) as u32 + 1u32;
437        SerializedBlockMeta {
438            block_id,
439            num_non_null_rows,
440        }
441    }
442
443    #[inline]
444    fn to_bytes(self) -> [u8; SERIALIZED_BLOCK_META_NUM_BYTES] {
445        assert!(self.num_non_null_rows > 0);
446        let mut bytes = [0u8; SERIALIZED_BLOCK_META_NUM_BYTES];
447        bytes[0..2].copy_from_slice(&self.block_id.to_le_bytes());
448        // We don't store empty blocks, therefore we can subtract 1.
449        // This way we will be able to use u16 when the number of elements is 1 << 16 or u16::MAX+1
450        bytes[2..4].copy_from_slice(&((self.num_non_null_rows - 1u32) as u16).to_le_bytes());
451        bytes
452    }
453}
454
455#[inline]
456fn is_sparse(num_rows_in_block: u32) -> bool {
457    num_rows_in_block < DENSE_BLOCK_THRESHOLD
458}
459
460fn deserialize_optional_index_block_metadatas(
461    data: &[u8],
462    num_rows: u32,
463) -> (Box<[BlockMeta]>, u32) {
464    let num_blocks = data.len() / SERIALIZED_BLOCK_META_NUM_BYTES;
465    let mut block_metas = Vec::with_capacity(num_blocks + 1);
466    let mut start_byte_offset = 0;
467    let mut non_null_rows_before_block = 0;
468    for block_meta_bytes in data.chunks_exact(SERIALIZED_BLOCK_META_NUM_BYTES) {
469        let block_meta_bytes: [u8; SERIALIZED_BLOCK_META_NUM_BYTES] =
470            block_meta_bytes.try_into().unwrap();
471        let SerializedBlockMeta {
472            block_id,
473            num_non_null_rows,
474        } = SerializedBlockMeta::from_bytes(block_meta_bytes);
475        block_metas.resize(
476            block_id as usize,
477            BlockMeta {
478                non_null_rows_before_block,
479                start_byte_offset,
480                block_variant: BlockVariant::empty(),
481            },
482        );
483        let block_variant = if is_sparse(num_non_null_rows) {
484            BlockVariant::Sparse {
485                num_vals: num_non_null_rows as u16,
486            }
487        } else {
488            BlockVariant::Dense
489        };
490        block_metas.push(BlockMeta {
491            non_null_rows_before_block,
492            start_byte_offset,
493            block_variant,
494        });
495        start_byte_offset += block_variant.num_bytes_in_block();
496        non_null_rows_before_block += num_non_null_rows;
497    }
498    block_metas.resize(
499        ((num_rows + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK) as usize,
500        BlockMeta {
501            non_null_rows_before_block,
502            start_byte_offset,
503            block_variant: BlockVariant::empty(),
504        },
505    );
506    (block_metas.into_boxed_slice(), non_null_rows_before_block)
507}
508
509pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
510    let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
511    let num_non_empty_block_bytes =
512        u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
513    let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
514    let block_metas_num_bytes =
515        num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
516    let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
517    let (block_metas, num_non_null_rows) =
518        deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
519    let optional_index = OptionalIndex {
520        num_rows,
521        num_non_null_rows,
522        block_data,
523        block_metas: block_metas.into(),
524    };
525    Ok(optional_index)
526}
527
528#[cfg(test)]
529mod tests;