milli_core/update/
word_prefix_docids.rs

1use std::collections::{HashMap, HashSet};
2
3use grenad::CompressionType;
4use heed::types::{Bytes, Str};
5use heed::Database;
6
7use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
8use crate::update::index_documents::{
9    create_sorter, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
10    write_sorter_into_database, CursorClonableMmap, MergeDeladdCboRoaringBitmaps,
11};
12use crate::{CboRoaringBitmapCodec, Result};
13
14pub struct WordPrefixDocids<'t, 'i> {
15    wtxn: &'t mut heed::RwTxn<'i>,
16    word_docids: Database<Str, CboRoaringBitmapCodec>,
17    word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
18    pub(crate) chunk_compression_type: CompressionType,
19    pub(crate) chunk_compression_level: Option<u32>,
20    pub(crate) max_nb_chunks: Option<usize>,
21    pub(crate) max_memory: Option<usize>,
22}
23
24impl<'t, 'i> WordPrefixDocids<'t, 'i> {
25    pub fn new(
26        wtxn: &'t mut heed::RwTxn<'i>,
27        word_docids: Database<Str, CboRoaringBitmapCodec>,
28        word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
29    ) -> WordPrefixDocids<'t, 'i> {
30        WordPrefixDocids {
31            wtxn,
32            word_docids,
33            word_prefix_docids,
34            chunk_compression_type: CompressionType::None,
35            chunk_compression_level: None,
36            max_nb_chunks: None,
37            max_memory: None,
38        }
39    }
40
41    #[tracing::instrument(
42        level = "trace",
43        skip_all,
44        target = "indexing::prefix",
45        name = "word_prefix_docids"
46    )]
47    pub fn execute(
48        self,
49        new_word_docids: grenad::Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>,
50        new_prefix_fst_words: &[String],
51        common_prefix_fst_words: &[&[String]],
52        del_prefix_fst_words: &HashSet<Vec<u8>>,
53    ) -> Result<()> {
54        // It is forbidden to keep a mutable reference into the database
55        // and write into it at the same time, therefore we write into another file.
56        let mut prefix_docids_sorter = create_sorter(
57            grenad::SortAlgorithm::Unstable,
58            MergeDeladdCboRoaringBitmaps,
59            self.chunk_compression_type,
60            self.chunk_compression_level,
61            self.max_nb_chunks,
62            self.max_memory,
63            true,
64        );
65
66        if !common_prefix_fst_words.is_empty() {
67            let mut current_prefixes: Option<&&[String]> = None;
68            let mut prefixes_cache = HashMap::new();
69            let mut new_word_docids_iter = new_word_docids.into_stream_merger_iter()?;
70            while let Some((word, data)) = new_word_docids_iter.next()? {
71                current_prefixes = match current_prefixes.take() {
72                    Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
73                    _otherwise => {
74                        write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
75                        common_prefix_fst_words
76                            .iter()
77                            .find(|prefixes| word.starts_with(prefixes[0].as_bytes()))
78                    }
79                };
80
81                if let Some(prefixes) = current_prefixes {
82                    for prefix in prefixes.iter() {
83                        if word.starts_with(prefix.as_bytes()) {
84                            match prefixes_cache.get_mut(prefix.as_bytes()) {
85                                Some(value) => value.push(data.to_owned()),
86                                None => {
87                                    prefixes_cache
88                                        .insert(prefix.clone().into(), vec![data.to_owned()]);
89                                }
90                            }
91                        }
92                    }
93                }
94            }
95
96            write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
97        }
98
99        // We fetch the docids associated to the newly added word prefix fst only.
100        let db = self.word_docids.remap_data_type::<Bytes>();
101        let mut buffer = Vec::new();
102        for prefix in new_prefix_fst_words {
103            let prefix = std::str::from_utf8(prefix.as_bytes())?;
104            for result in db.prefix_iter(self.wtxn, prefix)? {
105                let (_word, data) = result?;
106                buffer.clear();
107                let mut writer = KvWriterDelAdd::new(&mut buffer);
108                writer.insert(DelAdd::Addition, data)?;
109
110                prefix_docids_sorter.insert(prefix, writer.into_inner()?)?;
111            }
112        }
113
114        // We remove all the entries that are no more required in this word prefix docids database.
115        let mut iter = self.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data();
116        while let Some((prefix, _)) = iter.next().transpose()? {
117            if del_prefix_fst_words.contains(prefix.as_bytes()) {
118                unsafe { iter.del_current()? };
119            }
120        }
121
122        drop(iter);
123
124        let database_is_empty = self.word_prefix_docids.is_empty(self.wtxn)?;
125
126        // We finally write the word prefix docids into the LMDB database.
127        write_sorter_into_database(
128            prefix_docids_sorter,
129            &self.word_prefix_docids,
130            self.wtxn,
131            database_is_empty,
132            deladd_serialize_add_side,
133            merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
134        )?;
135
136        Ok(())
137    }
138}
139
140fn write_prefixes_in_sorter(
141    prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
142    sorter: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
143) -> Result<()> {
144    for (key, data_slices) in prefixes.drain() {
145        for data in data_slices {
146            if valid_lmdb_key(&key) {
147                sorter.insert(&key, data)?;
148            }
149        }
150    }
151
152    Ok(())
153}