1use std::fs::File;
2use std::io::BufReader;
3
4use grenad::{CompressionType, Merger};
5use heed::types::Bytes;
6use heed::{BytesDecode, BytesEncode, Error, PutFlags, RoTxn, RwTxn};
7use roaring::RoaringBitmap;
8
9use super::{clear_facet_levels, FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
10use crate::facet::FacetType;
11use crate::heed_codec::facet::{
12 FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
13};
14use crate::heed_codec::BytesRefCodec;
15use crate::update::del_add::{DelAdd, KvReaderDelAdd};
16use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
17use crate::update::MergeDeladdCboRoaringBitmaps;
18use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result};
19
20pub struct FacetsUpdateBulk<'i> {
26 index: &'i Index,
27 group_size: u8,
28 min_level_size: u8,
29 facet_type: FacetType,
30 field_ids: Vec<FieldId>,
31 delta_data: Option<Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>>,
33}
34
35impl<'i> FacetsUpdateBulk<'i> {
36 pub fn new(
37 index: &'i Index,
38 field_ids: Vec<FieldId>,
39 facet_type: FacetType,
40 delta_data: Merger<BufReader<File>, MergeDeladdCboRoaringBitmaps>,
41 group_size: u8,
42 min_level_size: u8,
43 ) -> FacetsUpdateBulk<'i> {
44 FacetsUpdateBulk {
45 index,
46 field_ids,
47 group_size,
48 min_level_size,
49 facet_type,
50 delta_data: Some(delta_data),
51 }
52 }
53
54 pub fn new_not_updating_level_0(
55 index: &'i Index,
56 field_ids: Vec<FieldId>,
57 facet_type: FacetType,
58 ) -> FacetsUpdateBulk<'i> {
59 FacetsUpdateBulk {
60 index,
61 field_ids,
62 group_size: FACET_GROUP_SIZE,
63 min_level_size: FACET_MIN_LEVEL_SIZE,
64 facet_type,
65 delta_data: None,
66 }
67 }
68
69 #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::bulk")]
70 pub fn execute(self, wtxn: &mut heed::RwTxn<'_>) -> Result<()> {
71 let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;
72
73 let db = match facet_type {
74 FacetType::String => {
75 index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
76 }
77 FacetType::Number => {
78 index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
79 }
80 };
81
82 let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size };
83
84 inner.update(wtxn, &field_ids)?;
85
86 Ok(())
87 }
88}
89
90pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
92 pub db: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
93 pub delta_data: Option<Merger<R, MergeDeladdCboRoaringBitmaps>>,
94 pub group_size: u8,
95 pub min_level_size: u8,
96}
97impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
98 pub fn update(mut self, wtxn: &mut RwTxn<'_>, field_ids: &[u16]) -> Result<()> {
99 self.update_level0(wtxn)?;
100 clear_facet_levels(wtxn, &self.db.remap_data_type(), field_ids)?;
101
102 for &field_id in field_ids.iter() {
103 let level_readers = self.compute_levels_for_field_id(field_id, wtxn)?;
104
105 for level_reader in level_readers {
106 let mut cursor = level_reader.into_cursor()?;
107 while let Some((k, v)) = cursor.move_on_next()? {
108 self.db.remap_types::<Bytes, Bytes>().put(wtxn, k, v)?;
109 }
110 }
111 }
112 Ok(())
113 }
114
115 fn update_level0(&mut self, wtxn: &mut RwTxn<'_>) -> Result<()> {
116 let delta_data = match self.delta_data.take() {
117 Some(x) => x,
118 None => return Ok(()),
119 };
120 if self.db.is_empty(wtxn)? {
121 let mut buffer = Vec::new();
122 let mut database = self.db.iter_mut(wtxn)?.remap_types::<Bytes, Bytes>();
123 let mut iter = delta_data.into_stream_merger_iter()?;
124 while let Some((key, value)) = iter.next()? {
125 if !valid_lmdb_key(key) {
126 continue;
127 }
128 let value = KvReaderDelAdd::from_slice(value);
129
130 let Some(value) = value.get(DelAdd::Addition) else {
132 continue;
133 };
134
135 buffer.clear();
136 buffer.push(1);
138 buffer.extend_from_slice(value);
140 unsafe {
141 database.put_current_with_options::<Bytes>(PutFlags::APPEND, key, &buffer)?
142 };
143 }
144 } else {
145 let mut buffer = Vec::new();
146 let database = self.db.remap_types::<Bytes, Bytes>();
147
148 let mut iter = delta_data.into_stream_merger_iter()?;
149 while let Some((key, value)) = iter.next()? {
150 if !valid_lmdb_key(key) {
151 continue;
152 }
153
154 let value = KvReaderDelAdd::from_slice(value);
155
156 buffer.clear();
159 buffer.push(1);
160 match database.get(wtxn, key)? {
162 Some(prev_value) => {
163 let old_bitmap = &prev_value[1..];
165 CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?;
166 }
167 None => {
168 let Some(value) = value.get(DelAdd::Addition) else {
170 continue;
172 };
173
174 buffer.extend_from_slice(value);
175 }
176 };
177 let new_bitmap = &buffer[1..];
178 if CboRoaringBitmapLenCodec::bytes_decode(new_bitmap).unwrap_or_default() == 0 {
180 database.delete(wtxn, key)?;
181 } else {
182 database.put(wtxn, key, &buffer)?;
183 }
184 }
185 }
186 Ok(())
187 }
188 fn compute_levels_for_field_id(
189 &self,
190 field_id: FieldId,
191 txn: &RoTxn<'_>,
192 ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
193 let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |_, _| Ok(()))?;
194
195 Ok(subwriters)
196 }
197 #[allow(clippy::type_complexity)]
198 fn read_level_0<'t>(
199 &self,
200 rtxn: &'t RoTxn<'t>,
201 field_id: u16,
202 handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
203 ) -> Result<()> {
204 let mut bitmaps = vec![];
208
209 let mut level_0_prefix = vec![];
210 level_0_prefix.extend_from_slice(&field_id.to_be_bytes());
211 level_0_prefix.push(0);
212
213 let level_0_iter = self
214 .db
215 .remap_types::<Bytes, Bytes>()
216 .prefix_iter(rtxn, level_0_prefix.as_slice())?
217 .remap_types::<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>();
218
219 let mut left_bound: &[u8] = &[];
220 let mut first_iteration_for_new_group = true;
221 for el in level_0_iter {
222 let (key, value) = el?;
223 let bound = key.left_bound;
224 let docids = value.bitmap;
225
226 if first_iteration_for_new_group {
227 left_bound = bound;
228 first_iteration_for_new_group = false;
229 }
230 bitmaps.push(docids);
231
232 if bitmaps.len() == self.group_size as usize {
233 handle_group(&bitmaps, left_bound)?;
234 first_iteration_for_new_group = true;
235 bitmaps.clear();
236 }
237 }
238 if !bitmaps.is_empty() {
240 handle_group(&bitmaps, left_bound)?;
241 bitmaps.clear();
242 }
243 Ok(())
244 }
245
246 #[allow(clippy::type_complexity)]
252 fn compute_higher_levels<'t>(
253 &self,
254 rtxn: &'t RoTxn<'t>,
255 field_id: u16,
256 level: u8,
257 handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
258 ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
259 if level == 0 {
260 self.read_level_0(rtxn, field_id, handle_group)?;
261 return Ok(vec![]);
263 }
264 let mut cur_writer = create_writer(CompressionType::None, None, tempfile::tempfile()?);
270 let mut cur_writer_len: usize = 0;
271
272 let mut group_sizes = vec![];
273 let mut left_bounds = vec![];
274 let mut bitmaps = vec![];
275
276 let mut sub_writers = self.compute_higher_levels(
279 rtxn,
280 field_id,
281 level - 1,
282 &mut |sub_bitmaps, left_bound| {
283 let mut combined_bitmap = RoaringBitmap::default();
284 for bitmap in sub_bitmaps {
285 combined_bitmap |= bitmap;
286 }
287 group_sizes.push(sub_bitmaps.len() as u8);
290 left_bounds.push(left_bound);
291
292 bitmaps.push(combined_bitmap);
293 if bitmaps.len() != self.group_size as usize {
294 return Ok(());
295 }
296 let left_bound = left_bounds.first().unwrap();
297 handle_group(&bitmaps, left_bound)?;
298
299 for ((bitmap, left_bound), group_size) in
300 bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
301 {
302 let key = FacetGroupKey { field_id, level, left_bound };
303 let key = FacetGroupKeyCodec::<BytesRefCodec>::bytes_encode(&key)
304 .map_err(Error::Encoding)?;
305 let value = FacetGroupValue { size: group_size, bitmap };
306 let value =
307 FacetGroupValueCodec::bytes_encode(&value).map_err(Error::Encoding)?;
308 cur_writer.insert(key, value)?;
309 cur_writer_len += 1;
310 }
311 Ok(())
312 },
313 )?;
314 if !bitmaps.is_empty() && (cur_writer_len >= self.min_level_size as usize - 1) {
320 assert!(bitmaps.len() < self.group_size as usize);
322 assert!(cur_writer_len > 0);
323
324 let left_bound = left_bounds.first().unwrap();
325 handle_group(&bitmaps, left_bound)?;
326
327 for ((bitmap, left_bound), group_size) in
329 bitmaps.drain(..).zip(left_bounds.drain(..)).zip(group_sizes.drain(..))
330 {
331 let key = FacetGroupKey { field_id, level, left_bound };
332 let key = FacetGroupKeyCodec::<BytesRefCodec>::bytes_encode(&key)
333 .map_err(Error::Encoding)?;
334 let value = FacetGroupValue { size: group_size, bitmap };
335 let value = FacetGroupValueCodec::bytes_encode(&value).map_err(Error::Encoding)?;
336 cur_writer.insert(key, value)?;
337 cur_writer_len += 1;
338 }
339 }
340 if cur_writer_len >= self.min_level_size as usize {
342 sub_writers.push(writer_into_reader(cur_writer)?);
343 } else {
344 if !bitmaps.is_empty() {
347 handle_group(&bitmaps, left_bounds.first().unwrap())?;
348 }
349 }
350 Ok(sub_writers)
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use std::iter::once;
357
358 use roaring::RoaringBitmap;
359
360 use crate::documents::mmap_from_objects;
361 use crate::heed_codec::facet::OrderedF64Codec;
362 use crate::heed_codec::StrRefCodec;
363 use crate::index::tests::TempIndex;
364 use crate::update::facet::test_helpers::{ordered_string, FacetIndex};
365 use crate::{db_snap, milli_snap, FilterableAttributesRule};
366
/// Bulk-inserts numeric facet values for two fields and snapshots the
/// resulting structure for several group-size / min-level-size pairs.
#[test]
fn insert() {
    let test = |name: &str, group_size: u8, min_level_size: u8| {
        let index = FacetIndex::<OrderedF64Codec>::new(group_size, 0, min_level_size);

        // Value `i` of a field is attached to document `i` only.
        let singleton =
            |field_id: u16, i: u32| ((field_id, i as f64), once(i).collect::<RoaringBitmap>());
        let elements: Vec<((u16, f64), RoaringBitmap)> = (0..1_000u32)
            .map(|i| singleton(0, i))
            .chain((0..100u32).map(|i| singleton(1, i)))
            .collect();

        let mut wtxn = index.env.write_txn().unwrap();
        index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

        for field_id in [0, 1] {
            index.verify_structure_validity(&wtxn, field_id);
        }

        wtxn.commit().unwrap();

        milli_snap!(format!("{index}"), name);
    };

    let cases = [
        ("default", 4, 5),
        ("small_group_small_min_level", 2, 2),
        ("small_group_large_min_level", 2, 128),
        ("large_group_small_min_level", 16, 2),
        ("odd_group_odd_min_level", 7, 3),
    ];
    for (name, group_size, min_level_size) in cases {
        test(name, group_size, min_level_size);
    }
}
/// Inserts two fields, deletes every document of field 0, bulk-inserts field
/// 1 again and snapshots the result for several parameter pairs.
#[test]
fn insert_delete_field_insert() {
    let test = |name: &str, group_size: u8, min_level_size: u8| {
        let index = FacetIndex::<OrderedF64Codec>::new(group_size, 0, min_level_size);
        let mut wtxn = index.env.write_txn().unwrap();

        // Value `i` of a field is attached to document `i` only.
        let singleton =
            |field_id: u16, i: u32| ((field_id, i as f64), once(i).collect::<RoaringBitmap>());

        let initial: Vec<((u16, f64), RoaringBitmap)> = (0..100u32)
            .map(|i| singleton(0, i))
            .chain((0..100u32).map(|i| singleton(1, i)))
            .collect();
        index.bulk_insert(&mut wtxn, &[0, 1], initial.iter());

        index.verify_structure_validity(&wtxn, 0);
        index.verify_structure_validity(&wtxn, 1);

        // Remove every document of field 0.
        for docid in 0..100u32 {
            index.delete_single_docid(&mut wtxn, 0, &(docid as f64), docid);
        }
        index.verify_structure_validity(&wtxn, 0);
        index.verify_structure_validity(&wtxn, 1);

        // Insert into field 1 again, this time with ten extra values.
        let second_batch: Vec<((u16, f64), RoaringBitmap)> =
            (0..110u32).map(|i| singleton(1, i)).collect();
        index.verify_structure_validity(&wtxn, 0);
        index.verify_structure_validity(&wtxn, 1);
        index.bulk_insert(&mut wtxn, &[0, 1], second_batch.iter());

        wtxn.commit().unwrap();

        milli_snap!(format!("{index}"), name);
    };

    let cases = [
        ("default", 4, 5),
        ("small_group_small_min_level", 2, 2),
        ("small_group_large_min_level", 2, 128),
        ("large_group_small_min_level", 16, 2),
        ("odd_group_odd_min_level", 7, 3),
    ];
    for (name, group_size, min_level_size) in cases {
        test(name, group_size, min_level_size);
    }
}
447
/// Indexes 22 541 documents whose only field is a filterable numeric `id`
/// and snapshots the numeric facet database.
#[test]
fn bug_3165() {
    let index = TempIndex::new_with_map_size(4096 * 1000 * 100);

    index
        .update_settings(|settings| {
            settings.set_primary_key("id".to_owned());
            settings.set_filterable_fields(vec![FilterableAttributesRule::Field(
                "id".to_string(),
            )]);
        })
        .unwrap();

    let documents: Vec<_> = (0..=22_540)
        .map(|i| serde_json::json!({ "id": i as u64 }).as_object().unwrap().clone())
        .collect();

    let documents = mmap_from_objects(documents);
    index.add_documents(documents).unwrap();

    db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a");
}
489
/// Bulk-inserts string facet values for two fields and snapshots the
/// resulting structure for several group-size / min-level-size pairs.
#[test]
fn insert_string() {
    let test = |name: &str, group_size: u8, min_level_size: u8| {
        let index = FacetIndex::<StrRefCodec>::new(group_size, 0, min_level_size);

        let strings = (0..1_000).map(|i| ordered_string(i as usize)).collect::<Vec<_>>();

        // The `i`-th string of a field is attached to document `i` only.
        let singleton = |field_id: u16, i: u32| {
            ((field_id, &*strings[i as usize]), once(i).collect::<RoaringBitmap>())
        };
        let elements: Vec<((u16, &str), RoaringBitmap)> = (0..1_000u32)
            .map(|i| singleton(0, i))
            .chain((0..100u32).map(|i| singleton(1, i)))
            .collect();

        let mut wtxn = index.env.write_txn().unwrap();
        index.bulk_insert(&mut wtxn, &[0, 1], elements.iter());

        for field_id in [0, 1] {
            index.verify_structure_validity(&wtxn, field_id);
        }

        wtxn.commit().unwrap();

        milli_snap!(format!("{index}"), name);
    };

    let cases = [
        ("default", 4, 5),
        ("small_group_small_min_level", 2, 2),
        ("small_group_large_min_level", 2, 128),
        ("large_group_small_min_level", 16, 2),
        ("odd_group_odd_min_level", 7, 3),
    ];
    for (name, group_size, min_level_size) in cases {
        test(name, group_size, min_level_size);
    }
}
522}