milli_core/update/facet/
bulk.rs

1use std::fs::File;
2use std::io::BufReader;
3
4use grenad::{CompressionType, Merger};
5use heed::types::Bytes;
6use heed::{BytesDecode, BytesEncode, Error, PutFlags, RoTxn, RwTxn};
7use roaring::RoaringBitmap;
8
9use super::{clear_facet_levels, FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
10use crate::facet::FacetType;
11use crate::heed_codec::facet::{
12    FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
13};
14use crate::heed_codec::BytesRefCodec;
15use crate::update::del_add::{DelAdd, KvReaderDelAdd};
16use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
17use crate::update::MergeDeladdCboRoaringBitmaps;
18use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result};
19
20/// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases
21/// by rebuilding the database "from scratch".
22///
23/// First, the new elements are inserted into the level 0 of the database. Then, the
24/// higher levels are cleared and recomputed from the content of level 0.
25pub struct FacetsUpdateBulk<'i> {
26    index: &'i Index,
27    group_size: u8,
28    min_level_size: u8,
29    facet_type: FacetType,
30    field_ids: Vec<FieldId>,
31    // None if level 0 does not need to be updated
32    delta_data: Option<Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>>,
33}
34
35impl<'i> FacetsUpdateBulk<'i> {
36    pub fn new(
37        index: &'i Index,
38        field_ids: Vec<FieldId>,
39        facet_type: FacetType,
40        delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
41        group_size: u8,
42        min_level_size: u8,
43    ) -> FacetsUpdateBulk<'i> {
44        FacetsUpdateBulk {
45            index,
46            field_ids,
47            group_size,
48            min_level_size,
49            facet_type,
50            delta_data: Some(delta_data),
51        }
52    }
53
54    pub fn new_not_updating_level_0(
55        index: &'i Index,
56        field_ids: Vec<FieldId>,
57        facet_type: FacetType,
58    ) -> FacetsUpdateBulk<'i> {
59        FacetsUpdateBulk {
60            index,
61            field_ids,
62            group_size: FACET_GROUP_SIZE,
63            min_level_size: FACET_MIN_LEVEL_SIZE,
64            facet_type,
65            delta_data: None,
66        }
67    }
68
69    #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::bulk")]
70    pub fn execute(self, wtxn: &mut heed::RwTxn<'_>) -> Result<()> {
71        let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;
72
73        let db = match facet_type {
74            FacetType::String => {
75                index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
76            }
77            FacetType::Number => {
78                index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
79            }
80        };
81
82        let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size };
83
84        inner.update(wtxn, &field_ids)?;
85
86        Ok(())
87    }
88}
89
90/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
91pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
92    pub db: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
93    pub delta_data: Option<Merger<R, MergeDeladdCboRoaringBitmaps>>,
94    pub group_size: u8,
95    pub min_level_size: u8,
96}
97impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
98    pub fn update(mut self, wtxn: &mut RwTxn<'_>, field_ids: &[u16]) -> Result<()> {
99        self.update_level0(wtxn)?;
100        clear_facet_levels(wtxn, &self.db.remap_data_type(), field_ids)?;
101
102        for &field_id in field_ids.iter() {
103            let level_readers = self.compute_levels_for_field_id(field_id, wtxn)?;
104
105            for level_reader in level_readers {
106                let mut cursor = level_reader.into_cursor()?;
107                while let Some((k, v)) = cursor.move_on_next()? {
108                    self.db.remap_types::<Bytes, Bytes>().put(wtxn, k, v)?;
109                }
110            }
111        }
112        Ok(())
113    }
114
115    fn update_level0(&mut self, wtxn: &mut RwTxn<'_>) -> Result<()> {
116        let delta_data = match self.delta_data.take() {
117            Some(x) => x,
118            None => return Ok(()),
119        };
120        if self.db.is_empty(wtxn)? {
121            let mut buffer = Vec::new();
122            let mut database = self.db.iter_mut(wtxn)?.remap_types::<Bytes, Bytes>();
123            let mut iter = delta_data.into_stream_merger_iter()?;
124            while let Some((key, value)) = iter.next()? {
125                if !valid_lmdb_key(key) {
126                    continue;
127                }
128                let value = KvReaderDelAdd::from_slice(value);
129
130                // DB is empty, it is safe to ignore Del operations
131                let Some(value) = value.get(DelAdd::Addition) else {
132                    continue;
133                };
134
135                buffer.clear();
136                // the group size for level 0
137                buffer.push(1);
138                // then we extend the buffer with the docids bitmap
139                buffer.extend_from_slice(value);
140                unsafe {
141                    database.put_current_with_options::<Bytes>(PutFlags::APPEND, key, &buffer)?
142                };
143            }
144        } else {
145            let mut buffer = Vec::new();
146            let database = self.db.remap_types::<Bytes, Bytes>();
147
148            let mut iter = delta_data.into_stream_merger_iter()?;
149            while let Some((key, value)) = iter.next()? {
150                if !valid_lmdb_key(key) {
151                    continue;
152                }
153
154                let value = KvReaderDelAdd::from_slice(value);
155
156                // the value is a CboRoaringBitmap, but I still need to prepend the
157                // group size for level 0 (= 1) to it
158                buffer.clear();
159                buffer.push(1);
160                // then we extend the buffer with the docids bitmap
161                match database.get(wtxn, key)? {
162                    Some(prev_value) => {
163                        // prev_value is the group size for level 0, followed by the previous bitmap.
164                        let old_bitmap = &prev_value[1..];
165                        CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?;
166                    }
167                    None => {
168                        // it is safe to ignore the del in that case.
169                        let Some(value) = value.get(DelAdd::Addition) else {
170                            // won't put the key in DB as the value would be empty
171                            continue;
172                        };
173
174                        buffer.extend_from_slice(value);
175                    }
176                };
177                let new_bitmap = &buffer[1..];
178                // if the new bitmap is empty, let's remove it
179                if CboRoaringBitmapLenCodec::bytes_decode(new_bitmap).unwrap_or_default() == 0 {
180                    database.delete(wtxn, key)?;
181                } else {
182                    database.put(wtxn, key, &buffer)?;
183                }
184            }
185        }
186        Ok(())
187    }
188    fn compute_levels_for_field_id(
189        &self,
190        field_id: FieldId,
191        txn: &RoTxn<'_>,
192    ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
193        let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |_, _| Ok(()))?;
194
195        Ok(subwriters)
196    }
197    #[allow(clippy::type_complexity)]
198    fn read_level_0<'t>(
199        &self,
200        rtxn: &'t RoTxn<'t>,
201        field_id: u16,
202        handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
203    ) -> Result<()> {
204        // we read the elements one by one and
205        // 1. keep track of the left bound
206        // 2. fill the `bitmaps` vector to give it to level 1 once `level_group_size` elements were read
207        let mut bitmaps = vec![];
208
209        let mut level_0_prefix = vec![];
210        level_0_prefix.extend_from_slice(&field_id.to_be_bytes());
211        level_0_prefix.push(0);
212
213        let level_0_iter = self
214            .db
215            .remap_types::<Bytes, Bytes>()
216            .prefix_iter(rtxn, level_0_prefix.as_slice())?
217            .remap_types::<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>();
218
219        let mut left_bound: &[u8] = &[];
220        let mut first_iteration_for_new_group = true;
221        for el in level_0_iter {
222            let (key, value) = el?;
223            let bound = key.left_bound;
224            let docids = value.bitmap;
225
226            if first_iteration_for_new_group {
227                left_bound = bound;
228                first_iteration_for_new_group = false;
229            }
230            bitmaps.push(docids);
231
232            if bitmaps.len() == self.group_size as usize {
233                handle_group(&bitmaps, left_bound)?;
234                first_iteration_for_new_group = true;
235                bitmaps.clear();
236            }
237        }
238        // don't forget to give the leftover bitmaps as well
239        if !bitmaps.is_empty() {
240            handle_group(&bitmaps, left_bound)?;
241            bitmaps.clear();
242        }
243        Ok(())
244    }
245
246    /// Compute the content of the database levels from its level 0 for the given field id.
247    ///
248    /// ## Returns:
249    /// A vector of grenad::Reader. The reader at index `i` corresponds to the elements of level `i + 1`
250    /// that must be inserted into the database.
251    #[allow(clippy::type_complexity)]
252    fn compute_higher_levels<'t>(
253        &self,
254        rtxn: &'t RoTxn<'t>,
255        field_id: u16,
256        level: u8,
257        handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
258    ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
259        if level == 0 {
260            self.read_level_0(rtxn, field_id, handle_group)?;
261            // Level 0 is already in the database
262            return Ok(vec![]);
263        }
264        // level >= 1
265        // we compute each element of this level based on the elements of the level below it
266        // once we have computed `level_group_size` elements, we give the left bound
267        // of those elements, and their bitmaps, to the level above
268
269        let mut cur_writer = create_writer(CompressionType::None, None, tempfile::tempfile()?);
270        let mut cur_writer_len: usize = 0;
271
272        let mut group_sizes = vec![];
273        let mut left_bounds = vec![];
274        let mut bitmaps = vec![];
275
276        // compute the levels below
277        // in the callback, we fill `cur_writer` with the correct elements for this level
278        let mut sub_writers = self.compute_higher_levels(
279            rtxn,
280            field_id,
281            level - 1,
282            &mut |sub_bitmaps, left_bound| {
283                let mut combined_bitmap = RoaringBitmap::default();
284                for bitmap in sub_bitmaps {
285                    combined_bitmap |= bitmap;
286                }
287                // The conversion of sub_bitmaps.len() to a u8 will always be correct
288                // since its length is bounded by max_group_size, which is a u8.
289                group_sizes.push(sub_bitmaps.len() as u8);
290                left_bounds.push(left_bound);
291
292                bitmaps.push(combined_bitmap);
293                if bitmaps.len() != self.group_size as usize {
294                    return Ok(());
295                }
296                let left_bound = left_bounds.first().unwrap();
297                handle_group(&bitmaps, left_bound)?;
298
299                for ((bitmap, left_bound), group_size) in
300                    bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
301                {
302                    let key = FacetGroupKey { field_id, level, left_bound };
303                    let key = FacetGroupKeyCodec::<BytesRefCodec>::bytes_encode(&key)
304                        .map_err(Error::Encoding)?;
305                    let value = FacetGroupValue { size: group_size, bitmap };
306                    let value =
307                        FacetGroupValueCodec::bytes_encode(&value).map_err(Error::Encoding)?;
308                    cur_writer.insert(key, value)?;
309                    cur_writer_len += 1;
310                }
311                Ok(())
312            },
313        )?;
314        // don't forget to insert the leftover elements into the writer as well
315
316        // but only do so if the current number of elements to be inserted into this
317        // levelcould grow to the minimum level size
318
319        if !bitmaps.is_empty() && (cur_writer_len >= self.min_level_size as usize - 1) {
320            // the length of bitmaps is between 0 and group_size
321            assert!(bitmaps.len() < self.group_size as usize);
322            assert!(cur_writer_len > 0);
323
324            let left_bound = left_bounds.first().unwrap();
325            handle_group(&bitmaps, left_bound)?;
326
327            // Note: how many bitmaps are there here?
328            for ((bitmap, left_bound), group_size) in
329                bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
330            {
331                let key = FacetGroupKey { field_id, level, left_bound };
332                let key = FacetGroupKeyCodec::<BytesRefCodec>::bytes_encode(&key)
333                    .map_err(Error::Encoding)?;
334                let value = FacetGroupValue { size: group_size, bitmap };
335                let value = FacetGroupValueCodec::bytes_encode(&value).map_err(Error::Encoding)?;
336                cur_writer.insert(key, value)?;
337                cur_writer_len += 1;
338            }
339        }
340        // if we inserted enough elements to reach the minimum level size, then we push the writer
341        if cur_writer_len >= self.min_level_size as usize {
342            sub_writers.push(writer_into_reader(cur_writer)?);
343        } else {
344            // otherwise, if there are still leftover elements, we give them to the level above
345            // this is necessary in order to get the union of all docids
346            if !bitmaps.is_empty() {
347                handle_group(&bitmaps, left_bounds.first().unwrap())?;
348            }
349        }
350        Ok(sub_writers)
351    }
352}
353
#[cfg(test)]
mod tests {
    use std::iter::once;

    use roaring::RoaringBitmap;

    use crate::documents::mmap_from_objects;
    use crate::heed_codec::facet::OrderedF64Codec;
    use crate::heed_codec::StrRefCodec;
    use crate::index::tests::TempIndex;
    use crate::update::facet::test_helpers::{ordered_string, FacetIndex};
    use crate::{db_snap, milli_snap, FilterableAttributesRule};

    /// `(snapshot name, group size, minimum level size)` of every scenario run by the
    /// parameterised tests.
    const CASES: [(&str, u8, u8); 5] = [
        ("default", 4, 5),
        ("small_group_small_min_level", 2, 2),
        ("small_group_large_min_level", 2, 128),
        ("large_group_small_min_level", 16, 2),
        ("odd_group_odd_min_level", 7, 3),
    ];

    /// For each `(field_id, count)` pair, in order, produces `count` elements with
    /// `left_bound = i` and `docids = [i]` for `i` in `0..count`.
    fn numbered_elements(layout: &[(u16, u32)]) -> Vec<((u16, f64), RoaringBitmap)> {
        let mut elements = Vec::new();
        for &(field_id, count) in layout {
            for i in 0..count {
                elements.push(((field_id, i as f64), once(i).collect::<RoaringBitmap>()));
            }
        }
        elements
    }

    #[test]
    fn insert() {
        let test = |name: &str, group_size: u8, min_level_size: u8| {
            let index =
                FacetIndex::<OrderedF64Codec>::new(group_size, 0 /*NA*/, min_level_size);

            // 1000 values for field id 0, then 100 values for field id 1
            let elements = numbered_elements(&[(0, 1_000), (1, 100)]);

            let mut wtxn = index.env.write_txn().unwrap();
            index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

            for field_id in [0, 1] {
                index.verify_structure_validity(&wtxn, field_id);
            }

            wtxn.commit().unwrap();

            milli_snap!(format!("{index}"), name);
        };

        for (name, group_size, min_level_size) in CASES {
            test(name, group_size, min_level_size);
        }
    }

    #[test]
    fn insert_delete_field_insert() {
        let test = |name: &str, group_size: u8, min_level_size: u8| {
            let index =
                FacetIndex::<OrderedF64Codec>::new(group_size, 0 /*NA*/, min_level_size);
            let mut wtxn = index.env.write_txn().unwrap();

            // 100 values for each of the field ids 0 and 1
            let elements = numbered_elements(&[(0, 100), (1, 100)]);
            index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

            for field_id in [0, 1] {
                index.verify_structure_validity(&wtxn, field_id);
            }

            // remove every element of the facet id 0
            for i in 0..100u32 {
                index.delete_single_docid(&mut wtxn, 0, &(i as f64), i);
            }
            for field_id in [0, 1] {
                index.verify_structure_validity(&wtxn, field_id);
            }

            // then insert elements again, this time only for the facet id 1
            let elements = numbered_elements(&[(1, 110)]);
            for field_id in [0, 1] {
                index.verify_structure_validity(&wtxn, field_id);
            }
            index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

            wtxn.commit().unwrap();

            milli_snap!(format!("{index}"), name);
        };

        for (name, group_size, min_level_size) in CASES {
            test(name, group_size, min_level_size);
        }
    }

    #[test]
    fn bug_3165() {
        // Indexing a number of facet values that falls within certain ranges (22_540 is
        // one such number) used to produce a facet DB with missing levels.
        // Before writing a level into the database, its size was compared to the
        // minimum level size through a lossy integer conversion:
        // `level_size as u8 >= min_level_size`.
        //
        // The missing level in the facet DBs would then make the incremental indexer
        // (and other search algorithms) crash.
        //
        // https://github.com/meilisearch/meilisearch/issues/3165
        let index = TempIndex::new_with_map_size(4096 * 1000 * 100);

        index
            .update_settings(|settings| {
                settings.set_primary_key("id".to_owned());
                settings
                    .set_filterable_fields(vec![FilterableAttributesRule::Field("id".to_string())]);
            })
            .unwrap();

        let documents: Vec<_> = (0..=22_540u64)
            .map(|id| serde_json::json!({ "id": id }).as_object().unwrap().clone())
            .collect();

        let documents = mmap_from_objects(documents);
        index.add_documents(documents).unwrap();

        db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a");
    }

    #[test]
    fn insert_string() {
        let test = |name: &str, group_size: u8, min_level_size: u8| {
            let index = FacetIndex::<StrRefCodec>::new(group_size, 0 /*NA*/, min_level_size);

            let strings: Vec<_> = (0..1_000usize).map(ordered_string).collect();

            // for each (field id, count): left_bound = strings[i], docids = [i]
            let mut elements = Vec::<((u16, &str), RoaringBitmap)>::new();
            for (field_id, count) in [(0u16, 1_000u32), (1, 100)] {
                for i in 0..count {
                    elements.push(((field_id, &strings[i as usize]), once(i).collect()));
                }
            }

            let mut wtxn = index.env.write_txn().unwrap();
            index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

            for field_id in [0, 1] {
                index.verify_structure_validity(&wtxn, field_id);
            }

            wtxn.commit().unwrap();

            milli_snap!(format!("{index}"), name);
        };

        for (name, group_size, min_level_size) in CASES {
            test(name, group_size, min_level_size);
        }
    }
}