1use std::cell::RefCell;
2
3use hashbrown::HashMap;
4use heed::types::Bytes;
5use heed::{Database, RoTxn};
6use memmap2::Mmap;
7use rayon::iter::{IntoParallelIterator, ParallelIterator};
8use roaring::RoaringBitmap;
9
10use super::channel::*;
11use super::extract::{
12 merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
13 FacetKind, GeoExtractorData,
14};
15use crate::update::facet::new_incremental::FacetFieldIdChange;
16use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
17
18#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
19pub fn merge_and_send_rtree<'extractor, MSP>(
20 datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
21 rtxn: &RoTxn,
22 index: &Index,
23 geo_sender: GeoSender<'_, '_>,
24 must_stop_processing: &MSP,
25) -> Result<()>
26where
27 MSP: Fn() -> bool + Sync,
28{
29 let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
30 let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
31
32 for data in datastore {
33 if must_stop_processing() {
34 return Err(InternalError::AbortedIndexation.into());
35 }
36
37 let mut frozen = data.into_inner().freeze()?;
38 for result in frozen.iter_and_clear_removed()? {
39 let extracted_geo_point = result?;
40 let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
41 debug_assert!(removed.is_some());
42 let removed = faceted.remove(extracted_geo_point.docid);
43 debug_assert!(removed);
44 }
45
46 for result in frozen.iter_and_clear_inserted()? {
47 let extracted_geo_point = result?;
48 rtree.insert(GeoPoint::from(extracted_geo_point));
49 let inserted = faceted.insert(extracted_geo_point.docid);
50 debug_assert!(inserted);
51 }
52 }
53
54 let mut file = tempfile::tempfile()?;
55 bincode::serialize_into(&mut file, &rtree).map_err(InternalError::BincodeError)?;
56 file.sync_all()?;
57
58 let rtree_mmap = unsafe { Mmap::map(&file)? };
59 geo_sender.set_rtree(rtree_mmap).unwrap();
60 geo_sender.set_geo_faceted(&faceted)?;
61
62 Ok(())
63}
64
65#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
66pub fn merge_and_send_docids<'extractor, MSP, D>(
67 mut caches: Vec<BalancedCaches<'extractor>>,
68 database: Database<Bytes, Bytes>,
69 index: &Index,
70 docids_sender: WordDocidsSender<D>,
71 must_stop_processing: &MSP,
72) -> Result<()>
73where
74 MSP: Fn() -> bool + Sync,
75 D: DatabaseType + Sync,
76{
77 transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
78 let rtxn = index.read_txn()?;
79 if must_stop_processing() {
80 return Err(InternalError::AbortedIndexation.into());
81 }
82 merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
83 let current = database.get(&rtxn, key)?;
84 match merge_cbo_bitmaps(current, del, add)? {
85 Operation::Write(bitmap) => docids_sender.write(key, &bitmap),
86 Operation::Delete => docids_sender.delete(key),
87 Operation::Ignore => Ok(()),
88 }
89 })
90 })
91}
92
93#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
94pub fn merge_and_send_facet_docids<'extractor>(
95 mut caches: Vec<BalancedCaches<'extractor>>,
96 database: FacetDatabases,
97 index: &Index,
98 rtxn: &RoTxn,
99 docids_sender: FacetDocidsSender,
100) -> Result<FacetFieldIdsDelta> {
101 let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
102 let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
103 let max_string_count = max_string_count.clamp(1000, 100_000);
104 let max_number_count = max_number_count.clamp(1000, 100_000);
105 transpose_and_freeze_caches(&mut caches)?
106 .into_par_iter()
107 .map(|frozen| {
108 let mut facet_field_ids_delta =
109 FacetFieldIdsDelta::new(max_string_count, max_number_count);
110 let rtxn = index.read_txn()?;
111 merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
112 let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
113 match merge_cbo_bitmaps(current, del, add)? {
114 Operation::Write(bitmap) => {
115 facet_field_ids_delta.register_from_key(key);
116 docids_sender.write(key, &bitmap)?;
117 Ok(())
118 }
119 Operation::Delete => {
120 facet_field_ids_delta.register_from_key(key);
121 docids_sender.delete(key)?;
122 Ok(())
123 }
124 Operation::Ignore => Ok(()),
125 }
126 })?;
127 Ok(facet_field_ids_delta)
128 })
129 .reduce(
130 || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
131 |lhs, rhs| Ok(lhs?.merge(rhs?)),
132 )
133}
134
135pub struct FacetDatabases<'a> {
136 index: &'a Index,
137}
138
139impl<'a> FacetDatabases<'a> {
140 pub fn new(index: &'a Index) -> Self {
141 Self { index }
142 }
143
144 fn get_cbo_roaring_bytes_value<'t>(
145 &self,
146 rtxn: &'t RoTxn<'_>,
147 key: &[u8],
148 ) -> heed::Result<Option<&'t [u8]>> {
149 let (facet_kind, key) = FacetKind::extract_from_key(key);
150
151 let value =
152 super::channel::Database::from(facet_kind).database(self.index).get(rtxn, key)?;
153 match facet_kind {
154 FacetKind::String | FacetKind::Number => Ok(value.map(|v| &v[1..])),
156 _ => Ok(value),
157 }
158 }
159}
160
161#[derive(Debug)]
162pub enum FacetFieldIdDelta {
163 Bulk,
164 Incremental(Vec<FacetFieldIdChange>),
165}
166
167impl FacetFieldIdDelta {
168 fn push(&mut self, facet_value: &[u8], max_count: usize) {
169 *self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
170 FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
171 FacetFieldIdDelta::Incremental(mut v) => {
172 if v.len() >= max_count {
173 FacetFieldIdDelta::Bulk
174 } else {
175 v.push(FacetFieldIdChange { facet_value: facet_value.into() });
176 FacetFieldIdDelta::Incremental(v)
177 }
178 }
179 }
180 }
181
182 fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
183 let Some(rhs) = rhs else {
184 return;
185 };
186 *self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
187 (FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
188 (
189 FacetFieldIdDelta::Incremental(mut left),
190 FacetFieldIdDelta::Incremental(mut right),
191 ) => {
192 if left.len() + right.len() >= max_count {
193 FacetFieldIdDelta::Bulk
194 } else {
195 left.append(&mut right);
196 FacetFieldIdDelta::Incremental(left)
197 }
198 }
199 };
200 }
201}
202
203#[derive(Debug)]
204pub struct FacetFieldIdsDelta {
205 modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
207 modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
208 max_string_count: usize,
209 max_number_count: usize,
210}
211
212impl FacetFieldIdsDelta {
213 pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
214 Self {
215 max_string_count,
216 max_number_count,
217 modified_facet_string_ids: Default::default(),
218 modified_facet_number_ids: Default::default(),
219 }
220 }
221
222 fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
223 self.modified_facet_string_ids
224 .entry(field_id)
225 .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
226 .push(facet_value, self.max_string_count);
227 }
228
229 fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
230 self.modified_facet_number_ids
231 .entry(field_id)
232 .or_insert(FacetFieldIdDelta::Incremental(Default::default()))
233 .push(facet_value, self.max_number_count);
234 }
235
236 fn register_from_key(&mut self, key: &[u8]) {
237 let (facet_kind, field_id, facet_value) = self.extract_key_data(key);
238 match (facet_kind, facet_value) {
239 (FacetKind::Number, Some(facet_value)) => {
240 self.register_facet_number_id(field_id, facet_value)
241 }
242 (FacetKind::String, Some(facet_value)) => {
243 self.register_facet_string_id(field_id, facet_value)
244 }
245 _ => (),
246 }
247 }
248
249 fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) {
250 let facet_kind = FacetKind::from(key[0]);
251 let field_id = FieldId::from_be_bytes([key[1], key[2]]);
252 let facet_value = if key.len() >= 4 {
253 Some(&key[4..])
255 } else {
256 None
257 };
258
259 (facet_kind, field_id, facet_value)
260 }
261
262 pub fn consume_facet_string_delta(
263 &mut self,
264 ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
265 self.modified_facet_string_ids.drain()
266 }
267
268 pub fn consume_facet_number_delta(
269 &mut self,
270 ) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
271 self.modified_facet_number_ids.drain()
272 }
273
274 pub fn merge(mut self, rhs: Self) -> Self {
275 let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
277 modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
278 let old_delta = self.modified_facet_number_ids.remove(&fid);
279 delta.merge(old_delta, self.max_number_count);
280 self.modified_facet_number_ids.insert(fid, delta);
281 });
282 modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
283 let old_delta = self.modified_facet_string_ids.remove(&fid);
284 delta.merge(old_delta, self.max_string_count);
285 self.modified_facet_string_ids.insert(fid, delta);
286 });
287 self
288 }
289}
290
291enum Operation {
292 Write(RoaringBitmap),
293 Delete,
294 Ignore,
295}
296
297fn merge_cbo_bitmaps(
299 current: Option<&[u8]>,
300 del: Option<RoaringBitmap>,
301 add: Option<RoaringBitmap>,
302) -> Result<Operation> {
303 let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
304 match (current, del, add) {
305 (None, None, None) => Ok(Operation::Ignore), (None, None, Some(add)) => Ok(Operation::Write(add)),
307 (None, Some(_del), None) => Ok(Operation::Ignore), (None, Some(_del), Some(add)) => Ok(Operation::Write(add)),
309 (Some(_current), None, None) => Ok(Operation::Ignore), (Some(current), None, Some(add)) => Ok(Operation::Write(current | add)),
311 (Some(current), Some(del), add) => {
312 debug_assert!(
313 del.is_subset(¤t),
314 "del is not a subset of current, which must be impossible."
315 );
316 let output = match add {
317 Some(add) => (¤t - (&del - &add)) | (add - del),
318 None => ¤t - del,
319 };
320 if output.is_empty() {
321 Ok(Operation::Delete)
322 } else if current == output {
323 Ok(Operation::Ignore)
324 } else {
325 Ok(Operation::Write(output))
326 }
327 }
328 }
329}