milli_core/update/index_documents/
transform.rs

1use std::borrow::Cow;
2use std::collections::btree_map::Entry as BEntry;
3use std::collections::hash_map::Entry as HEntry;
4use std::collections::{BTreeMap, HashMap};
5use std::fs::File;
6use std::io::{Read, Seek};
7
8use either::Either;
9use fxhash::FxHashMap;
10use itertools::Itertools;
11use obkv::{KvReader, KvReaderU16, KvWriter};
12use roaring::RoaringBitmap;
13use serde_json::Value;
14use smartstring::SmartString;
15
16use super::helpers::{
17    create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions,
18    ObkvsMergeAdditionsAndDeletions,
19};
20use super::{create_writer, IndexDocumentsMethod, IndexerConfig, KeepFirst};
21use crate::attribute_patterns::PatternMatch;
22use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
23use crate::error::{Error, InternalError, UserError};
24use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
25use crate::flatten_serde_json;
26use crate::index::{db_name, main_key};
27use crate::json_depth_checker;
28use crate::update::del_add::{
29    into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation,
30    KvReaderDelAdd,
31};
32use crate::update::index_documents::GrenadParameters;
33use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
34use crate::update::{AvailableIds, UpdateIndexingStep};
35use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
36use crate::vector::settings::WriteBackToDocuments;
37use crate::vector::ArroyWrapper;
38use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, Index, Result};
39
/// Everything produced by a finished [`Transform`], as assembled by
/// `Transform::output_from_sorter`.
pub struct TransformOutput {
    /// Name of the primary key, as read back from the index.
    pub primary_key: String,
    /// Difference between the settings read from the index and the same settings carrying
    /// the fields ids map extended by the transform.
    pub settings_diff: InnerIndexSettingsDiff,
    /// Field distribution of the index, updated with the fields added and removed by the
    /// transformed documents.
    pub field_distribution: FieldDistribution,
    /// Total number of documents read by the transform.
    pub documents_count: usize,
    /// Temporary file containing the merged documents in their original (nested) shape.
    /// `Transform::output_from_sorter` always sets it to `Some`.
    pub original_documents: Option<File>,
    /// Temporary file containing the merged documents in their flattened shape.
    /// `Transform::output_from_sorter` always sets it to `Some`.
    pub flattened_documents: Option<File>,
}
48
/// Remaps the documents of incoming batches to the fields ids and the internal documents
/// ids of the index, deduplicating them by external id along the way.
///
/// `read_documents` accumulates the remapped documents into two sorters (original and
/// flattened shape). `output_from_sorter` then merges the sorters into temporary files and
/// computes the field distribution and the settings diff returned in a [`TransformOutput`].
pub struct Transform<'a, 'i> {
    /// The index the documents are transformed for.
    pub index: &'i Index,
    // Fields ids map of the index, extended with every new field found in the batches
    // (including the fields generated by flattening).
    fields_ids_map: FieldIdMapWithMetadata,

    // User indexing settings; used to configure the sorters and the final writers.
    indexer_settings: &'a IndexerConfig,
    /// Selects how documents sharing an external id are combined: kept-last (replace) or
    /// merged (update).
    pub index_documents_method: IndexDocumentsMethod,
    // Provider of fresh internal ids, initialized from the ids already used by the index.
    available_documents_ids: AvailableIds,

    // Both sorters store their values in the same format:
    //   1 byte for the `Operation`, followed by the del/add obkv of the document.
    // Their keys differ:
    //   original_sorter:  big-endian u32 internal docid, followed by the external id bytes
    //   flattened_sorter: big-endian u32 internal docid only
    original_sorter: grenad::Sorter<EitherObkvMerge>,
    flattened_sorter: grenad::Sorter<EitherObkvMerge>,

    // Internal ids of documents that already exist in the index and are touched by a batch.
    replaced_documents_ids: RoaringBitmap,
    // Internal ids of every document pushed into the sorters as an addition.
    new_documents_ids: RoaringBitmap,
    // Maps the external ids seen so far to their internal id (stored as a u64).
    // To increase the cache locality and decrease the heap usage we use compact smartstring.
    new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>,
    // Number of documents read so far, accumulated over every call to `read_documents`.
    documents_count: usize,
}
75
/// Tag stored as the very first byte of every value pushed in the transform sorters.
///
/// It tells whether the entry describes the addition or the deletion of a document.
/// The discriminants are part of the on-disk format of the sorters and must stay stable.
#[repr(u8)]
pub enum Operation {
    /// The entry adds (or updates) a document.
    Addition = 0,
    /// The entry deletes a document.
    Deletion = 1,
}
83
84/// Create a mapping between the field ids found in the document batch and the one that were
85/// already present in the index.
86///
87/// If new fields are present in the addition, they are added to the index field ids map.
88fn create_fields_mapping(
89    index_field_map: &mut FieldIdMapWithMetadata,
90    batch_field_map: &DocumentsBatchIndex,
91) -> Result<HashMap<FieldId, FieldId>> {
92    batch_field_map
93        .iter()
94        // we sort by id here to ensure a deterministic mapping of the fields, that preserves
95        // the original ordering.
96        .sorted_by_key(|(&id, _)| id)
97        .map(|(field, name)| match index_field_map.id(name) {
98            Some(id) => Ok((*field, id)),
99            None => index_field_map
100                .insert(name)
101                .ok_or(Error::UserError(UserError::AttributeLimitReached))
102                .map(|id| (*field, id)),
103        })
104        .collect()
105}
106
107impl<'a, 'i> Transform<'a, 'i> {
108    pub fn new(
109        wtxn: &mut heed::RwTxn<'_>,
110        index: &'i Index,
111        indexer_settings: &'a IndexerConfig,
112        index_documents_method: IndexDocumentsMethod,
113        _autogenerate_docids: bool,
114    ) -> Result<Self> {
115        use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
116
117        // We must choose the appropriate merge function for when two or more documents
118        // with the same user id must be merged or fully replaced in the same batch.
119        let merge_function = match index_documents_method {
120            ReplaceDocuments => Either::Left(ObkvsKeepLastAdditionMergeDeletions),
121            UpdateDocuments => Either::Right(ObkvsMergeAdditionsAndDeletions),
122        };
123
124        // We initialize the sorter with the user indexing settings.
125        let original_sorter = create_sorter(
126            grenad::SortAlgorithm::Stable,
127            merge_function,
128            indexer_settings.chunk_compression_type,
129            indexer_settings.chunk_compression_level,
130            indexer_settings.max_nb_chunks,
131            indexer_settings.max_memory.map(|mem| mem / 2),
132            true,
133        );
134
135        // We initialize the sorter with the user indexing settings.
136        let flattened_sorter = create_sorter(
137            grenad::SortAlgorithm::Stable,
138            merge_function,
139            indexer_settings.chunk_compression_type,
140            indexer_settings.chunk_compression_level,
141            indexer_settings.max_nb_chunks,
142            indexer_settings.max_memory.map(|mem| mem / 2),
143            true,
144        );
145        let documents_ids = index.documents_ids(wtxn)?;
146        let fields_ids_map = index.fields_ids_map(wtxn)?;
147        let builder = MetadataBuilder::from_index(index, wtxn)?;
148        let fields_ids_map = FieldIdMapWithMetadata::new(fields_ids_map, builder);
149
150        Ok(Transform {
151            index,
152            fields_ids_map,
153            indexer_settings,
154            available_documents_ids: AvailableIds::new(&documents_ids),
155            original_sorter,
156            flattened_sorter,
157            index_documents_method,
158            replaced_documents_ids: RoaringBitmap::new(),
159            new_documents_ids: RoaringBitmap::new(),
160            new_external_documents_ids_builder: FxHashMap::default(),
161            documents_count: 0,
162        })
163    }
164
    /// Read every document of a batch, remap its fields to the field ids of the index and
    /// push it into the original and flattened sorters.
    ///
    /// When the external id of a document already exists in the index, the stored version
    /// is pushed into the sorters as well so that both get merged later on. A document that
    /// is byte-for-byte identical to the stored version is skipped entirely.
    ///
    /// Once the batch is exhausted, the fields ids map and the primary key are written back
    /// to the index through `wtxn`.
    ///
    /// Returns the number of documents read from this batch, skipped documents included.
    ///
    /// # Errors
    ///
    /// - `InternalError::AbortedIndexation` when `should_abort` returns `true`.
    /// - `UserError::AttributeLimitReached` when a field cannot be added to the fields ids map.
    /// - `UserError::DocumentLimitReached` when no internal document id is available anymore.
    /// - `InternalError::FieldIdMappingMissingEntry` when a field of a document is missing
    ///   from the batch fields mapping.
    /// - `InternalError::DatabaseMissingEntry` when a document known by its external id
    ///   cannot be found in the documents database.
    #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
    pub fn read_documents<R, FP, FA>(
        &mut self,
        reader: EnrichedDocumentsBatchReader<R>,
        wtxn: &mut heed::RwTxn<'_>,
        progress_callback: FP,
        should_abort: FA,
    ) -> Result<usize>
    where
        R: Read + Seek,
        FP: Fn(UpdateIndexingStep) + Sync,
        FA: Fn() -> bool + Sync,
    {
        let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
        let external_documents_ids = self.index.external_documents_ids();
        // Translation table from the field ids of the batch to the field ids of the index.
        let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;

        let primary_key = cursor.primary_key().to_string();
        let primary_key_id =
            self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;

        // All these buffers are reused from one document to the next.
        let mut obkv_buffer = Vec::new();
        let mut document_sorter_value_buffer = Vec::new();
        let mut document_sorter_key_buffer = Vec::new();
        let mut documents_count = 0;
        let mut docid_buffer: Vec<u8> = Vec::new();
        let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new();
        while let Some(enriched_document) = cursor.next_enriched_document()? {
            let EnrichedDocument { document, document_id } = enriched_document;

            if should_abort() {
                return Err(Error::InternalError(InternalError::AbortedIndexation));
            }

            // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer
            // does not keep references from the cursor between loop iterations
            let mut field_buffer_cache = drop_and_reuse(field_buffer);
            if self.indexer_settings.log_every_n.is_some_and(|len| documents_count % len == 0) {
                progress_callback(UpdateIndexingStep::RemapDocumentAddition {
                    documents_seen: documents_count,
                });
            }

            // When the document id has been auto-generated by the `enrich_documents_batch`
            // we must insert this document id into the remapped document.
            let external_id = document_id.value();
            if document_id.is_generated() {
                serde_json::to_writer(&mut docid_buffer, external_id)
                    .map_err(InternalError::SerdeJson)?;
                field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer)));
            }

            // Translate every field id of the batch into the matching index field id.
            for (k, v) in document.iter() {
                let mapped_id =
                    *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
                field_buffer_cache.push((mapped_id, Cow::from(v)));
            }

            // Insertion in a obkv need to be done with keys ordered. For now they are ordered
            // according to the document addition key order, so we sort it according to the
            // fieldids map keys order.
            field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));

            // Build the new obkv document.
            let mut writer = KvWriter::new(&mut obkv_buffer);
            for (k, v) in field_buffer_cache.iter() {
                writer.insert(*k, v)?;
            }

            // Resolve the internal id of the document: reuse the one assigned earlier in this
            // transform, else the one stored in the index, else allocate a fresh one.
            // `original_docid` is only set when the stored document must be fetched and merged.
            let mut original_docid = None;
            let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
                HEntry::Occupied(entry) => *entry.get() as u32,
                HEntry::Vacant(entry) => {
                    let docid = match external_documents_ids.get(wtxn, entry.key())? {
                        Some(docid) => {
                            // If it was already in the list of replaced documents it means it was deleted
                            // by the remove_document method. We should start as if it never existed.
                            if self.replaced_documents_ids.insert(docid) {
                                original_docid = Some(docid);
                            }

                            docid
                        }
                        None => self
                            .available_documents_ids
                            .next()
                            .ok_or(UserError::DocumentLimitReached)?,
                    };
                    entry.insert(docid as u64);
                    docid
                }
            };

            let mut skip_insertion = false;
            if let Some(original_docid) = original_docid {
                let original_key = original_docid;
                // Fetch the raw bytes of the version of the document stored in the index.
                let base_obkv = self
                    .index
                    .documents
                    .remap_data_type::<heed::types::Bytes>()
                    .get(wtxn, &original_key)?
                    .ok_or(InternalError::DatabaseMissingEntry {
                        db_name: db_name::DOCUMENTS,
                        key: None,
                    })?;

                // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
                if base_obkv == obkv_buffer {
                    // we're not replacing anything
                    self.replaced_documents_ids.remove(original_docid);
                    // and we need to put back the original id as it was before
                    self.new_external_documents_ids_builder.remove(external_id);
                    skip_insertion = true;
                } else {
                    // we associate the base document with the new key, everything will get merged later.
                    // When updating, the stored fields are kept as both deletion and addition;
                    // when replacing, they are only marked as deleted.
                    let deladd_operation = match self.index_documents_method {
                        IndexDocumentsMethod::UpdateDocuments => {
                            DelAddOperation::DeletionAndAddition
                        }
                        IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion,
                    };
                    // Key of the original sorter: big-endian docid followed by the external id.
                    document_sorter_key_buffer.clear();
                    document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
                    document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
                    // Value: the operation byte followed by the del/add obkv of the stored document.
                    document_sorter_value_buffer.clear();
                    document_sorter_value_buffer.push(Operation::Addition as u8);
                    into_del_add_obkv(
                        KvReaderU16::from_slice(base_obkv),
                        deladd_operation,
                        &mut document_sorter_value_buffer,
                    )?;
                    self.original_sorter
                        .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
                    let base_obkv = KvReader::from_slice(base_obkv);
                    if let Some(flattened_obkv) =
                        Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)?
                    {
                        // we recreate our buffer with the flattened documents
                        document_sorter_value_buffer.clear();
                        document_sorter_value_buffer.push(Operation::Addition as u8);
                        into_del_add_obkv(
                            KvReaderU16::from_slice(&flattened_obkv),
                            deladd_operation,
                            &mut document_sorter_value_buffer,
                        )?;
                    }
                    // When nothing had to be flattened the value buffer still holds the
                    // original version, which is pushed as is. The key is the docid alone.
                    self.flattened_sorter
                        .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
                }
            }

            if !skip_insertion {
                self.new_documents_ids.insert(docid);

                document_sorter_key_buffer.clear();
                document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
                document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
                document_sorter_value_buffer.clear();
                document_sorter_value_buffer.push(Operation::Addition as u8);
                into_del_add_obkv(
                    KvReaderU16::from_slice(&obkv_buffer),
                    DelAddOperation::Addition,
                    &mut document_sorter_value_buffer,
                )?;
                // The key combines the internal docid and the extracted/generated user id.
                self.original_sorter
                    .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

                let flattened_obkv = KvReader::from_slice(&obkv_buffer);
                if let Some(obkv) =
                    Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)?
                {
                    // Rebuild the value from the flattened version of the new document.
                    document_sorter_value_buffer.clear();
                    document_sorter_value_buffer.push(Operation::Addition as u8);
                    into_del_add_obkv(
                        KvReaderU16::from_slice(&obkv),
                        DelAddOperation::Addition,
                        &mut document_sorter_value_buffer,
                    )?
                }
                self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
            }
            // Skipped documents are counted too.
            documents_count += 1;

            // NOTE(review): this callback fires after every document, in addition to the
            // `log_every_n` throttled one above.
            progress_callback(UpdateIndexingStep::RemapDocumentAddition {
                documents_seen: documents_count,
            });

            // Hand the buffers back for the next iteration.
            field_buffer = drop_and_reuse(field_buffer_cache);
            docid_buffer.clear();
            obkv_buffer.clear();
        }

        progress_callback(UpdateIndexingStep::RemapDocumentAddition {
            documents_seen: documents_count,
        });

        // Persist the (possibly extended) fields ids map and the primary key of the batch.
        self.index.put_fields_ids_map(wtxn, self.fields_ids_map.as_fields_ids_map())?;
        self.index.put_primary_key(wtxn, &primary_key)?;
        self.documents_count += documents_count;
        // The sorters now contain the documents of this batch; `output_from_sorter` is the
        // last transforming function and turns them into the TransformOutput.
        Ok(documents_count)
    }
369
370    // Flatten a document from the fields ids map contained in self and insert the new
371    // created fields. Returns `None` if the document doesn't need to be flattened.
372    #[tracing::instrument(
373        level = "trace",
374        skip(obkv, fields_ids_map),
375        target = "indexing::transform"
376    )]
377    fn flatten_from_fields_ids_map(
378        obkv: &KvReader<FieldId>,
379        fields_ids_map: &mut FieldIdMapWithMetadata,
380    ) -> Result<Option<Vec<u8>>> {
381        if obkv
382            .iter()
383            .all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value))
384        {
385            return Ok(None);
386        }
387
388        // store the keys and values the original obkv + the flattened json
389        // We first extract all the key+value out of the obkv. If a value is not nested
390        // we keep a reference on its value. If the value is nested we'll get its value
391        // as an owned `Vec<u8>` after flattening it.
392        let mut key_value: Vec<(FieldId, Cow<'_, [u8]>)> = Vec::new();
393
394        // the object we're going to use to store the fields that need to be flattened.
395        let mut doc = serde_json::Map::new();
396
397        // we recreate a json containing only the fields that needs to be flattened.
398        // all the raw values get inserted directly in the `key_value` vec.
399        for (key, value) in obkv.iter() {
400            if json_depth_checker::should_flatten_from_unchecked_slice(value) {
401                let key = fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
402                    field_id: key,
403                    process: "Flatten from fields ids map.",
404                })?;
405
406                let value = serde_json::from_slice::<Value>(value)
407                    .map_err(crate::error::InternalError::SerdeJson)?;
408                doc.insert(key.to_string(), value);
409            } else {
410                key_value.push((key, value.into()));
411            }
412        }
413
414        let flattened = flatten_serde_json::flatten(&doc);
415
416        // Once we have the flattened version we insert all the new generated fields_ids
417        // (if any) in the fields ids map and serialize the value.
418        for (key, value) in flattened.into_iter() {
419            let fid = fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
420            let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
421            key_value.push((fid, value.into()));
422        }
423
424        // we sort the key. If there was a conflict between the obkv and the new generated value the
425        // keys will be consecutive.
426        key_value.sort_unstable_by_key(|(key, _)| *key);
427
428        let mut buffer = Vec::new();
429        Self::create_obkv_from_key_value(&mut key_value, &mut buffer)?;
430        Ok(Some(buffer))
431    }
432
433    /// Generate an obkv from a slice of key / value sorted by key.
434    fn create_obkv_from_key_value(
435        key_value: &mut [(FieldId, Cow<'_, [u8]>)],
436        output_buffer: &mut Vec<u8>,
437    ) -> Result<()> {
438        debug_assert!(
439            key_value.windows(2).all(|vec| vec[0].0 <= vec[1].0),
440            "The slice of key / value pair must be sorted."
441        );
442
443        output_buffer.clear();
444        let mut writer = KvWriter::new(output_buffer);
445
446        let mut skip_next_value = false;
447        for things in key_value.windows(2) {
448            if skip_next_value {
449                skip_next_value = false;
450                continue;
451            }
452            let (key1, value1) = &things[0];
453            let (key2, value2) = &things[1];
454
455            // now we're going to look for conflicts between the keys. For example the following documents would cause a conflict:
456            // { "doggo.name": "jean", "doggo": { "name": "paul" } }
457            // we should find a first "doggo.name" from the obkv and a second one from the flattening.
458            // but we must generate the following document:
459            // { "doggo.name": ["jean", "paul"] }
460            // thus we're going to merge the value from the obkv and the flattened document in a single array and skip the next
461            // iteration.
462            if key1 == key2 {
463                skip_next_value = true;
464
465                let value1 = serde_json::from_slice(value1)
466                    .map_err(crate::error::InternalError::SerdeJson)?;
467                let value2 = serde_json::from_slice(value2)
468                    .map_err(crate::error::InternalError::SerdeJson)?;
469                let value = match (value1, value2) {
470                    (Value::Array(mut left), Value::Array(mut right)) => {
471                        left.append(&mut right);
472                        Value::Array(left)
473                    }
474                    (Value::Array(mut array), value) | (value, Value::Array(mut array)) => {
475                        array.push(value);
476                        Value::Array(array)
477                    }
478                    (left, right) => Value::Array(vec![left, right]),
479                };
480
481                let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
482                writer.insert(*key1, value)?;
483            } else {
484                writer.insert(*key1, value1)?;
485            }
486        }
487
488        if !skip_next_value {
489            // the unwrap is safe here, we know there was at least one value in the document
490            let (key, value) = key_value.last().unwrap();
491            writer.insert(*key, value)?;
492        }
493
494        Ok(())
495    }
496
497    /// Generate the `TransformOutput` based on the given sorter that can be generated from any
498    /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
499    /// id for the user side and the value must be an obkv where keys are valid fields ids.
500    #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")]
501    pub(crate) fn output_from_sorter<F>(
502        self,
503        wtxn: &mut heed::RwTxn<'_>,
504        progress_callback: F,
505    ) -> Result<TransformOutput>
506    where
507        F: Fn(UpdateIndexingStep) + Sync,
508    {
509        let primary_key = self
510            .index
511            .primary_key(wtxn)?
512            .ok_or(Error::InternalError(InternalError::DatabaseMissingEntry {
513                db_name: db_name::MAIN,
514                key: Some(main_key::PRIMARY_KEY_KEY),
515            }))?
516            .to_string();
517
518        // We create a final writer to write the new documents in order from the sorter.
519        let mut writer = create_writer(
520            self.indexer_settings.chunk_compression_type,
521            self.indexer_settings.chunk_compression_level,
522            tempfile::tempfile()?,
523        );
524
525        // To compute the field distribution we need to;
526        // 1. Remove all the deleted documents from the field distribution
527        // 2. Add all the new documents to the field distribution
528        let mut field_distribution = self.index.field_distribution(wtxn)?;
529
530        // Here we are going to do the document count + field distribution + `write_into_stream_writer`
531        let mut iter = self.original_sorter.into_stream_merger_iter()?;
532        // used only for the callback
533        let mut documents_count = 0;
534
535        while let Some((key, val)) = iter.next()? {
536            // skip first byte corresponding to the operation type (Deletion or Addition).
537            let val = &val[1..];
538
539            // send a callback to show at which step we are
540            documents_count += 1;
541            progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
542                documents_seen: documents_count,
543                total_documents: self.documents_count,
544            });
545
546            for (key, value) in KvReader::from_slice(val) {
547                let reader = KvReaderDelAdd::from_slice(value);
548                match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
549                    (None, None) => (),
550                    (None, Some(_)) => {
551                        // New field
552                        let name = self.fields_ids_map.name(key).ok_or(
553                            FieldIdMapMissingEntry::FieldId {
554                                field_id: key,
555                                process: "Computing field distribution in transform.",
556                            },
557                        )?;
558                        *field_distribution.entry(name.to_string()).or_insert(0) += 1;
559                    }
560                    (Some(_), None) => {
561                        // Field removed
562                        let name = self.fields_ids_map.name(key).ok_or(
563                            FieldIdMapMissingEntry::FieldId {
564                                field_id: key,
565                                process: "Computing field distribution in transform.",
566                            },
567                        )?;
568                        match field_distribution.entry(name.to_string()) {
569                            BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */
570                            }
571                            BEntry::Occupied(mut entry) => {
572                                // attempt to remove one
573                                match entry.get_mut().checked_sub(1) {
574                                    Some(0) => {
575                                        entry.remove();
576                                    }
577                                    Some(new_val) => {
578                                        *entry.get_mut() = new_val;
579                                    }
580                                    None => {
581                                        unreachable!("Attempting to remove a field that wasn't in the field distribution")
582                                    }
583                                }
584                            }
585                        }
586                    }
587                    (Some(_), Some(_)) => {
588                        // Value change, no field distribution change
589                    }
590                }
591            }
592            writer.insert(key, val)?;
593        }
594
595        let mut original_documents = writer.into_inner()?;
596        // We then extract the file and reset the seek to be able to read it again.
597        original_documents.rewind()?;
598
599        // We create a final writer to write the new documents in order from the sorter.
600        let mut writer = create_writer(
601            self.indexer_settings.chunk_compression_type,
602            self.indexer_settings.chunk_compression_level,
603            tempfile::tempfile()?,
604        );
605
606        // Once we have written all the documents into the final sorter, we write the nested documents
607        // into this writer.
608        // We get rids of the `Operation` byte and skip the deleted documents as well.
609        let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
610        while let Some((key, val)) = iter.next()? {
611            // skip first byte corresponding to the operation type (Deletion or Addition).
612            let val = &val[1..];
613            writer.insert(key, val)?;
614        }
615        let mut flattened_documents = writer.into_inner()?;
616        flattened_documents.rewind()?;
617
618        let mut new_external_documents_ids_builder: Vec<_> =
619            self.new_external_documents_ids_builder.into_iter().collect();
620
621        new_external_documents_ids_builder
622            .sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
623        let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory();
624        new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| {
625            fst_new_external_documents_ids_builder.insert(key, value)
626        })?;
627
628        let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn, None)?;
629        let fields_ids_map = self.fields_ids_map;
630        let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name));
631        let mut new_inner_settings = old_inner_settings.clone();
632        new_inner_settings.fields_ids_map = fields_ids_map;
633
634        let embedding_config_updates = Default::default();
635        let settings_update_only = false;
636        let settings_diff = InnerIndexSettingsDiff::new(
637            old_inner_settings,
638            new_inner_settings,
639            primary_key_id,
640            embedding_config_updates,
641            settings_update_only,
642        );
643
644        Ok(TransformOutput {
645            primary_key,
646            settings_diff,
647            field_distribution,
648            documents_count: self.documents_count,
649            original_documents: Some(
650                original_documents.into_inner().map_err(|err| err.into_error())?,
651            ),
652            flattened_documents: Some(
653                flattened_documents.into_inner().map_err(|err| err.into_error())?,
654            ),
655        })
656    }
657
658    /// Rebind the field_ids of the provided document to their values
659    /// based on the field_ids_maps difference between the old and the new settings,
660    /// then fill the provided buffers with delta documents using KvWritterDelAdd.
661    #[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo
662    fn rebind_existing_document(
663        old_obkv: &KvReader<FieldId>,
664        settings_diff: &InnerIndexSettingsDiff,
665        mut injected_vectors: serde_json::Map<String, serde_json::Value>,
666        old_vectors_fid: Option<FieldId>,
667        original_obkv_buffer: Option<&mut Vec<u8>>,
668        flattened_obkv_buffer: Option<&mut Vec<u8>>,
669    ) -> Result<()> {
670        // Always keep the primary key.
671        let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
672
673        // If only a faceted field has been added, keep only this field.
674        let facet_fids_changed = settings_diff.facet_fids_changed();
675
676        let necessary_faceted_field = |id: FieldId| -> Option<DelAddOperation> {
677            if facet_fids_changed {
678                let field_name = settings_diff.new.fields_ids_map.name(id).unwrap();
679                // if the faceted fields changed, we need to keep all the field that are
680                // faceted in the old or new settings.
681                match (
682                    settings_diff.old.match_faceted_field(field_name),
683                    settings_diff.new.match_faceted_field(field_name),
684                ) {
685                    (PatternMatch::NoMatch, PatternMatch::NoMatch) => None,
686                    (PatternMatch::NoMatch, _) => Some(DelAddOperation::Addition),
687                    (_, PatternMatch::NoMatch) => Some(DelAddOperation::Deletion),
688                    (_, _) => Some(DelAddOperation::DeletionAndAddition),
689                }
690            } else {
691                None
692            }
693        };
694
695        // Alway provide all fields when vectors are involved because
696        // we need the fields for the prompt/templating.
697        let reindex_vectors = settings_diff.reindex_vectors();
698
699        // The operations that we must perform on the different fields.
700        let mut operations = HashMap::new();
701        let mut error_seen = false;
702
703        let mut obkv_writer = KvWriter::<_, FieldId>::memory();
704        'write_fid: for (id, val) in old_obkv.iter() {
705            if !injected_vectors.is_empty() {
706                'inject_vectors: {
707                    let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
708
709                    if id < vectors_fid {
710                        break 'inject_vectors;
711                    }
712
713                    let mut existing_vectors = if id == vectors_fid {
714                        let existing_vectors: std::result::Result<
715                            serde_json::Map<String, serde_json::Value>,
716                            serde_json::Error,
717                        > = serde_json::from_slice(val);
718
719                        match existing_vectors {
720                            Ok(existing_vectors) => existing_vectors,
721                            Err(error) => {
722                                if !error_seen {
723                                    tracing::error!(%error, "Unexpected `_vectors` field that is not a map. Treating as an empty map");
724                                    error_seen = true;
725                                }
726                                Default::default()
727                            }
728                        }
729                    } else {
730                        Default::default()
731                    };
732
733                    existing_vectors.append(&mut injected_vectors);
734
735                    operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
736                    obkv_writer
737                        .insert(vectors_fid, serde_json::to_vec(&existing_vectors).unwrap())?;
738                    if id == vectors_fid {
739                        continue 'write_fid;
740                    }
741                }
742            }
743
744            if is_primary_key(id) || reindex_vectors {
745                operations.insert(id, DelAddOperation::DeletionAndAddition);
746                obkv_writer.insert(id, val)?;
747            } else {
748                let facet_operation = necessary_faceted_field(id);
749                let searchable_operation = settings_diff.reindex_searchable_id(id);
750                let operation = match (facet_operation, searchable_operation) {
751                    (Some(facet_operation), Some(searchable_operation)) => {
752                        Some(facet_operation.merge(searchable_operation))
753                    }
754                    (Some(operation), None) | (None, Some(operation)) => Some(operation),
755                    (None, None) => None,
756                };
757
758                if let Some(operation) = operation {
759                    operations.insert(id, operation);
760                    obkv_writer.insert(id, val)?;
761                }
762            }
763        }
764        if !injected_vectors.is_empty() {
765            'inject_vectors: {
766                let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
767
768                operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
769                obkv_writer.insert(vectors_fid, serde_json::to_vec(&injected_vectors).unwrap())?;
770            }
771        }
772
773        let data = obkv_writer.into_inner()?;
774        let obkv = KvReader::<FieldId>::from_slice(&data);
775
776        if let Some(original_obkv_buffer) = original_obkv_buffer {
777            original_obkv_buffer.clear();
778            into_del_add_obkv(obkv, DelAddOperation::DeletionAndAddition, original_obkv_buffer)?;
779        }
780
781        if let Some(flattened_obkv_buffer) = flattened_obkv_buffer {
782            // take the non-flattened version if flatten_from_fields_ids_map returns None.
783            let mut fields_ids_map = settings_diff.new.fields_ids_map.clone();
784            let flattened = Self::flatten_from_fields_ids_map(obkv, &mut fields_ids_map)?;
785            let flattened = flattened.as_deref().map_or(obkv, KvReader::from_slice);
786
787            flattened_obkv_buffer.clear();
788            into_del_add_obkv_conditional_operation(flattened, flattened_obkv_buffer, |id| {
789                operations.get(&id).copied().unwrap_or(DelAddOperation::DeletionAndAddition)
790            })?;
791        }
792
793        Ok(())
794    }
795
796    /// Clear all databases. Returns a `TransformOutput` with a file that contains the documents
797    /// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
798    ///
799    // TODO this can be done in parallel by using the rayon `ThreadPool`.
800    #[tracing::instrument(
801        level = "trace"
802        skip(self, wtxn, settings_diff),
803        target = "indexing::documents"
804    )]
805    pub fn prepare_for_documents_reindexing(
806        self,
807        wtxn: &mut heed::RwTxn<'i>,
808        settings_diff: InnerIndexSettingsDiff,
809    ) -> Result<TransformOutput> {
810        // There already has been a document addition, the primary key should be set by now.
811        let primary_key = self
812            .index
813            .primary_key(wtxn)?
814            .ok_or(InternalError::DatabaseMissingEntry {
815                db_name: db_name::MAIN,
816                key: Some(main_key::PRIMARY_KEY_KEY),
817            })?
818            .to_string();
819        let field_distribution = self.index.field_distribution(wtxn)?;
820
821        let documents_ids = self.index.documents_ids(wtxn)?;
822        let documents_count = documents_ids.len() as usize;
823
824        // We initialize the sorter with the user indexing settings.
825        let mut original_sorter = if settings_diff.reindex_vectors() {
826            Some(create_sorter(
827                grenad::SortAlgorithm::Stable,
828                KeepFirst,
829                self.indexer_settings.chunk_compression_type,
830                self.indexer_settings.chunk_compression_level,
831                self.indexer_settings.max_nb_chunks,
832                self.indexer_settings.max_memory.map(|mem| mem / 2),
833                true,
834            ))
835        } else {
836            None
837        };
838
839        let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
840            .embedding_config_updates
841            .iter()
842            .filter_map(|(name, action)| {
843                if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
844                    action.write_back()
845                {
846                    let reader = ArroyWrapper::new(
847                        self.index.vector_arroy,
848                        *embedder_id,
849                        action.was_quantized,
850                    );
851                    Some((name.as_str(), (reader, user_provided)))
852                } else {
853                    None
854                }
855            })
856            .collect();
857
858        let old_vectors_fid =
859            settings_diff.old.fields_ids_map.id(crate::constants::RESERVED_VECTORS_FIELD_NAME);
860
861        // We initialize the sorter with the user indexing settings.
862        let mut flattened_sorter =
863            if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
864                Some(create_sorter(
865                    grenad::SortAlgorithm::Stable,
866                    KeepFirst,
867                    self.indexer_settings.chunk_compression_type,
868                    self.indexer_settings.chunk_compression_level,
869                    self.indexer_settings.max_nb_chunks,
870                    self.indexer_settings.max_memory.map(|mem| mem / 2),
871                    true,
872                ))
873            } else {
874                None
875            };
876
877        if original_sorter.is_some() || flattened_sorter.is_some() {
878            let mut original_obkv_buffer = Vec::new();
879            let mut flattened_obkv_buffer = Vec::new();
880            let mut document_sorter_key_buffer = Vec::new();
881            for result in self.index.external_documents_ids().iter(wtxn)? {
882                let (external_id, docid) = result?;
883                let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
884                    InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
885                )?;
886
887                let injected_vectors: std::result::Result<
888                    serde_json::Map<String, serde_json::Value>,
889                    arroy::Error,
890                > = readers
891                    .iter()
892                    .filter_map(|(name, (reader, user_provided))| {
893                        if !user_provided.contains(docid) {
894                            return None;
895                        }
896                        match reader.item_vectors(wtxn, docid) {
897                            Ok(vectors) if vectors.is_empty() => None,
898                            Ok(vectors) => Some(Ok((
899                                name.to_string(),
900                                serde_json::to_value(ExplicitVectors {
901                                    embeddings: Some(
902                                        VectorOrArrayOfVectors::from_array_of_vectors(vectors),
903                                    ),
904                                    regenerate: false,
905                                })
906                                .unwrap(),
907                            ))),
908                            Err(e) => Some(Err(e)),
909                        }
910                    })
911                    .collect();
912
913                let injected_vectors = injected_vectors?;
914
915                Self::rebind_existing_document(
916                    old_obkv,
917                    &settings_diff,
918                    injected_vectors,
919                    old_vectors_fid,
920                    Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
921                    Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
922                )?;
923
924                if let Some(original_sorter) = original_sorter.as_mut() {
925                    document_sorter_key_buffer.clear();
926                    document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
927                    document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
928                    original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
929                }
930                if let Some(flattened_sorter) = flattened_sorter.as_mut() {
931                    flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
932                }
933            }
934        }
935
936        // delete all vectors from the embedders that need removal
937        for (_, (reader, _)) in readers {
938            let dimensions = reader.dimensions(wtxn)?;
939            reader.clear(wtxn, dimensions)?;
940        }
941
942        let grenad_params = GrenadParameters {
943            chunk_compression_type: self.indexer_settings.chunk_compression_type,
944            chunk_compression_level: self.indexer_settings.chunk_compression_level,
945            max_memory: self.indexer_settings.max_memory,
946            max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen.
947        };
948
949        // Once we have written all the documents, we merge everything into a Reader.
950        let flattened_documents = match flattened_sorter {
951            Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?),
952            None => None,
953        };
954        let original_documents = match original_sorter {
955            Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
956            None => None,
957        };
958
959        Ok(TransformOutput {
960            primary_key,
961            field_distribution,
962            settings_diff,
963            documents_count,
964            original_documents: original_documents.map(|od| od.into_inner().into_inner()),
965            flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()),
966        })
967    }
968}
969
/// Empties `vec`, dropping every `U` it holds, and hands back an empty `Vec<T>`.
///
/// The intent is to recycle the allocation of `vec` for the returned vector, which is why
/// `T` and `U` are expected to have the same size and alignment. That expectation is only
/// checked by debug assertions.
fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
    use std::mem;

    debug_assert_eq!(mem::align_of::<U>(), mem::align_of::<T>());
    debug_assert_eq!(mem::size_of::<U>(), mem::size_of::<T>());

    // Drops the `U`s; nothing is left to convert afterwards.
    vec.clear();
    debug_assert!(vec.is_empty());

    // The vector is empty, so the mapping closure can never be called.
    let never_called = |_: U| -> T { unreachable!() };
    vec.into_iter().map(never_called).collect()
}
980
#[cfg(test)]
mod test {
    use grenad::MergeFunction;
    use obkv::KvReaderU16;

    use super::*;

    /// Builds a sorter entry for a document: the del/add obkv of `fields` produced with
    /// `del_add`, prefixed by the `tag` byte.
    ///
    /// `fields` are `(field id, single value byte)` pairs and must be sorted by field id.
    /// The obkv is written with `FieldId` keys so that it is laid out the way the
    /// `KvReaderU16` used to read it back expects; writing it with `u8` keys would make the
    /// reader frame the bytes incorrectly.
    fn tagged_del_add_doc(
        fields: &[(FieldId, u8)],
        del_add: DelAddOperation,
        tag: Operation,
    ) -> Vec<u8> {
        let mut kv_writer = KvWriter::<_, FieldId>::memory();
        for &(fid, value) in fields {
            kv_writer.insert(fid, [value]).unwrap();
        }
        let buffer = kv_writer.into_inner().unwrap();

        let mut doc = Vec::new();
        into_del_add_obkv(KvReaderU16::from_slice(&buffer), del_add, &mut doc).unwrap();
        doc.insert(0, tag as u8);
        doc
    }

    /// Checks how the two merge functions combine additions and deletions of documents.
    #[test]
    fn merge_obkvs() {
        let additive_doc_0 =
            tagged_del_add_doc(&[(0, 0)], DelAddOperation::Addition, Operation::Addition);
        let deletive_doc_0 =
            tagged_del_add_doc(&[(0, 0)], DelAddOperation::Deletion, Operation::Deletion);
        let del_add_doc_0 = tagged_del_add_doc(
            &[(0, 0)],
            DelAddOperation::DeletionAndAddition,
            Operation::Addition,
        );
        let additive_doc_1 =
            tagged_del_add_doc(&[(1, 1)], DelAddOperation::Addition, Operation::Addition);
        let additive_doc_0_1 =
            tagged_del_add_doc(&[(0, 0), (1, 1)], DelAddOperation::Addition, Operation::Addition);

        // A single addition is returned as is.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0);

        // A deletion followed by an addition of the same field.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);

        // An addition followed by a deletion.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, deletive_doc_0);

        // An addition that precedes a deletion does not show up in the result.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[
                Cow::from(additive_doc_1.as_slice()),
                Cow::from(deletive_doc_0.as_slice()),
                Cow::from(additive_doc_0.as_slice()),
            ],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);

        // Two additions of different fields are merged together.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0_1);

        // With the keep-last strategy only the last addition remains.
        let ret = MergeFunction::merge(
            &ObkvsKeepLastAdditionMergeDeletions,
            &[],
            &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0);

        // With the keep-last strategy the deletion is merged with the last addition.
        let ret = MergeFunction::merge(
            &ObkvsKeepLastAdditionMergeDeletions,
            &[],
            &[
                Cow::from(deletive_doc_0.as_slice()),
                Cow::from(additive_doc_1.as_slice()),
                Cow::from(additive_doc_0.as_slice()),
            ],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);
    }
}