milli_core/update/new/
merger.rs

1use std::cell::RefCell;
2
3use hashbrown::HashMap;
4use heed::types::Bytes;
5use heed::{Database, RoTxn};
6use memmap2::Mmap;
7use rayon::iter::{IntoParallelIterator, ParallelIterator};
8use roaring::RoaringBitmap;
9
10use super::channel::*;
11use super::extract::{
12    merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
13    FacetKind, GeoExtractorData,
14};
15use crate::update::facet::new_incremental::FacetFieldIdChange;
16use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
17
18#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
19pub fn merge_and_send_rtree<'extractor, MSP>(
20    datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
21    rtxn: &RoTxn,
22    index: &Index,
23    geo_sender: GeoSender<'_, '_>,
24    must_stop_processing: &MSP,
25) -> Result<()>
26where
27    MSP: Fn() -> bool + Sync,
28{
29    let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
30    let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
31
32    for data in datastore {
33        if must_stop_processing() {
34            return Err(InternalError::AbortedIndexation.into());
35        }
36
37        let mut frozen = data.into_inner().freeze()?;
38        for result in frozen.iter_and_clear_removed()? {
39            let extracted_geo_point = result?;
40            let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
41            debug_assert!(removed.is_some());
42            let removed = faceted.remove(extracted_geo_point.docid);
43            debug_assert!(removed);
44        }
45
46        for result in frozen.iter_and_clear_inserted()? {
47            let extracted_geo_point = result?;
48            rtree.insert(GeoPoint::from(extracted_geo_point));
49            let inserted = faceted.insert(extracted_geo_point.docid);
50            debug_assert!(inserted);
51        }
52    }
53
54    let mut file = tempfile::tempfile()?;
55    bincode::serialize_into(&mut file, &rtree).map_err(InternalError::BincodeError)?;
56    file.sync_all()?;
57
58    let rtree_mmap = unsafe { Mmap::map(&file)? };
59    geo_sender.set_rtree(rtree_mmap).unwrap();
60    geo_sender.set_geo_faceted(&faceted)?;
61
62    Ok(())
63}
64
65#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
66pub fn merge_and_send_docids<'extractor, MSP, D>(
67    mut caches: Vec<BalancedCaches<'extractor>>,
68    database: Database<Bytes, Bytes>,
69    index: &Index,
70    docids_sender: WordDocidsSender<D>,
71    must_stop_processing: &MSP,
72) -> Result<()>
73where
74    MSP: Fn() -> bool + Sync,
75    D: DatabaseType + Sync,
76{
77    transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
78        let rtxn = index.read_txn()?;
79        if must_stop_processing() {
80            return Err(InternalError::AbortedIndexation.into());
81        }
82        merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
83            let current = database.get(&rtxn, key)?;
84            match merge_cbo_bitmaps(current, del, add)? {
85                Operation::Write(bitmap) => docids_sender.write(key, &bitmap),
86                Operation::Delete => docids_sender.delete(key),
87                Operation::Ignore => Ok(()),
88            }
89        })
90    })
91}
92
93#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
94pub fn merge_and_send_facet_docids<'extractor>(
95    mut caches: Vec<BalancedCaches<'extractor>>,
96    database: FacetDatabases,
97    index: &Index,
98    rtxn: &RoTxn,
99    docids_sender: FacetDocidsSender,
100) -> Result<FacetFieldIdsDelta> {
101    let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
102    let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
103    let max_string_count = max_string_count.clamp(1000, 100_000);
104    let max_number_count = max_number_count.clamp(1000, 100_000);
105    transpose_and_freeze_caches(&mut caches)?
106        .into_par_iter()
107        .map(|frozen| {
108            let mut facet_field_ids_delta =
109                FacetFieldIdsDelta::new(max_string_count, max_number_count);
110            let rtxn = index.read_txn()?;
111            merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
112                let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
113                match merge_cbo_bitmaps(current, del, add)? {
114                    Operation::Write(bitmap) => {
115                        facet_field_ids_delta.register_from_key(key);
116                        docids_sender.write(key, &bitmap)?;
117                        Ok(())
118                    }
119                    Operation::Delete => {
120                        facet_field_ids_delta.register_from_key(key);
121                        docids_sender.delete(key)?;
122                        Ok(())
123                    }
124                    Operation::Ignore => Ok(()),
125                }
126            })?;
127            Ok(facet_field_ids_delta)
128        })
129        .reduce(
130            || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
131            |lhs, rhs| Ok(lhs?.merge(rhs?)),
132        )
133}
134
135pub struct FacetDatabases<'a> {
136    index: &'a Index,
137}
138
139impl<'a> FacetDatabases<'a> {
140    pub fn new(index: &'a Index) -> Self {
141        Self { index }
142    }
143
144    fn get_cbo_roaring_bytes_value<'t>(
145        &self,
146        rtxn: &'t RoTxn<'_>,
147        key: &[u8],
148    ) -> heed::Result<Option<&'t [u8]>> {
149        let (facet_kind, key) = FacetKind::extract_from_key(key);
150
151        let value =
152            super::channel::Database::from(facet_kind).database(self.index).get(rtxn, key)?;
153        match facet_kind {
154            // skip level group size
155            FacetKind::String | FacetKind::Number => Ok(value.map(|v| &v[1..])),
156            _ => Ok(value),
157        }
158    }
159}
160
161#[derive(Debug)]
162pub enum FacetFieldIdDelta {
163    Bulk,
164    Incremental(Vec<FacetFieldIdChange>),
165}
166
167impl FacetFieldIdDelta {
168    fn push(&mut self, facet_value: &[u8], max_count: usize) {
169        *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
170            FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
171            FacetFieldIdDelta::Incremental(mut v) => {
172                if v.len() >= max_count {
173                    FacetFieldIdDelta::Bulk
174                } else {
175                    v.push(FacetFieldIdChange { facet_value: facet_value.into() });
176                    FacetFieldIdDelta::Incremental(v)
177                }
178            }
179        }
180    }
181
182    fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
183        let Some(rhs) = rhs else {
184            return;
185        };
186        *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
187            (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
188            (
189                FacetFieldIdDelta::Incremental(mut left),
190                FacetFieldIdDelta::Incremental(mut right),
191            ) => {
192                if left.len() + right.len() >= max_count {
193                    FacetFieldIdDelta::Bulk
194                } else {
195                    left.append(&mut right);
196                    FacetFieldIdDelta::Incremental(left)
197                }
198            }
199        };
200    }
201}
202
203#[derive(Debug)]
204pub struct FacetFieldIdsDelta {
205    /// The field ids that have been modified
206    modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
207    modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
208    max_string_count: usize,
209    max_number_count: usize,
210}
211
212impl FacetFieldIdsDelta {
213    pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
214        Self {
215            max_string_count,
216            max_number_count,
217            modified_facet_string_ids: Default::default(),
218            modified_facet_number_ids: Default::default(),
219        }
220    }
221
222    fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
223        self.modified_facet_string_ids
224            .entry(field_id)
225            .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
226            .push(facet_value, self.max_string_count);
227    }
228
229    fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
230        self.modified_facet_number_ids
231            .entry(field_id)
232            .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
233            .push(facet_value, self.max_number_count);
234    }
235
236    fn register_from_key(&mut self, key: &[u8]) {
237        let (facet_kind, field_id, facet_value) = self.extract_key_data(key);
238        match (facet_kind, facet_value) {
239            (FacetKind::Number, Some(facet_value)) => {
240                self.register_facet_number_id(field_id, facet_value)
241            }
242            (FacetKind::String, Some(facet_value)) => {
243                self.register_facet_string_id(field_id, facet_value)
244            }
245            _ => (),
246        }
247    }
248
249    fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) {
250        let facet_kind = FacetKind::from(key[0]);
251        let field_id = FieldId::from_be_bytes([key[1], key[2]]);
252        let facet_value = if key.len() >= 4 {
253            // level is also stored in the key at [3] (always 0)
254            Some(&key[4..])
255        } else {
256            None
257        };
258
259        (facet_kind, field_id, facet_value)
260    }
261
262    pub fn consume_facet_string_delta(
263        &mut self,
264    ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
265        self.modified_facet_string_ids.drain()
266    }
267
268    pub fn consume_facet_number_delta(
269        &mut self,
270    ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
271        self.modified_facet_number_ids.drain()
272    }
273
274    pub fn merge(mut self, rhs: Self) -> Self {
275        // rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused
276        let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
277        modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
278            let old_delta = self.modified_facet_number_ids.remove(&fid);
279            delta.merge(old_delta, self.max_number_count);
280            self.modified_facet_number_ids.insert(fid, delta);
281        });
282        modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
283            let old_delta = self.modified_facet_string_ids.remove(&fid);
284            delta.merge(old_delta, self.max_string_count);
285            self.modified_facet_string_ids.insert(fid, delta);
286        });
287        self
288    }
289}
290
291enum Operation {
292    Write(RoaringBitmap),
293    Delete,
294    Ignore,
295}
296
297/// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap.
298fn merge_cbo_bitmaps(
299    current: Option<&[u8]>,
300    del: Option<RoaringBitmap>,
301    add: Option<RoaringBitmap>,
302) -> Result<Operation> {
303    let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
304    match (current, del, add) {
305        (None, None, None) => Ok(Operation::Ignore), // but it's strange
306        (None, None, Some(add)) => Ok(Operation::Write(add)),
307        (None, Some(_del), None) => Ok(Operation::Ignore), // but it's strange
308        (None, Some(_del), Some(add)) => Ok(Operation::Write(add)),
309        (Some(_current), None, None) => Ok(Operation::Ignore), // but it's strange
310        (Some(current), None, Some(add)) => Ok(Operation::Write(current | add)),
311        (Some(current), Some(del), add) => {
312            debug_assert!(
313                del.is_subset(&current),
314                "del is not a subset of current, which must be impossible."
315            );
316            let output = match add {
317                Some(add) => (&current - (&del - &add)) | (add - del),
318                None => &current - del,
319            };
320            if output.is_empty() {
321                Ok(Operation::Delete)
322            } else if current == output {
323                Ok(Operation::Ignore)
324            } else {
325                Ok(Operation::Write(output))
326            }
327        }
328    }
329}