1use std::borrow::Cow;
2use std::collections::btree_map::Entry as BEntry;
3use std::collections::hash_map::Entry as HEntry;
4use std::collections::{BTreeMap, HashMap};
5use std::fs::File;
6use std::io::{Read, Seek};
7
8use either::Either;
9use fxhash::FxHashMap;
10use itertools::Itertools;
11use obkv::{KvReader, KvReaderU16, KvWriter};
12use roaring::RoaringBitmap;
13use serde_json::Value;
14use smartstring::SmartString;
15
16use super::helpers::{
17 create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions,
18 ObkvsMergeAdditionsAndDeletions,
19};
20use super::{create_writer, IndexDocumentsMethod, IndexerConfig, KeepFirst};
21use crate::attribute_patterns::PatternMatch;
22use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
23use crate::error::{Error, InternalError, UserError};
24use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
25use crate::flatten_serde_json;
26use crate::index::{db_name, main_key};
27use crate::json_depth_checker;
28use crate::update::del_add::{
29 into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation,
30 KvReaderDelAdd,
31};
32use crate::update::index_documents::GrenadParameters;
33use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
34use crate::update::{AvailableIds, UpdateIndexingStep};
35use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
36use crate::vector::settings::WriteBackToDocuments;
37use crate::vector::ArroyWrapper;
38use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, Index, Result};
39
40pub struct TransformOutput {
41 pub primary_key: String,
42 pub settings_diff: InnerIndexSettingsDiff,
43 pub field_distribution: FieldDistribution,
44 pub documents_count: usize,
45 pub original_documents: Option<File>,
46 pub flattened_documents: Option<File>,
47}
48
49pub struct Transform<'a, 'i> {
56 pub index: &'i Index,
57 fields_ids_map: FieldIdMapWithMetadata,
58
59 indexer_settings: &'a IndexerConfig,
60 pub index_documents_method: IndexDocumentsMethod,
61 available_documents_ids: AvailableIds,
62
63 original_sorter: grenad::Sorter<EitherObkvMerge>,
67 flattened_sorter: grenad::Sorter<EitherObkvMerge>,
68
69 replaced_documents_ids: RoaringBitmap,
70 new_documents_ids: RoaringBitmap,
71 new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>,
73 documents_count: usize,
74}
75
/// Tag stored as the first byte of every value pushed into the transform sorters.
///
/// The discriminants are spelled out because the byte value is what gets written.
#[repr(u8)]
pub enum Operation {
    /// The value carries an added document.
    Addition = 0,
    /// The value carries a deleted document.
    Deletion = 1,
}
83
84fn create_fields_mapping(
89 index_field_map: &mut FieldIdMapWithMetadata,
90 batch_field_map: &DocumentsBatchIndex,
91) -> Result<HashMap<FieldId, FieldId>> {
92 batch_field_map
93 .iter()
94 .sorted_by_key(|(&id, _)| id)
97 .map(|(field, name)| match index_field_map.id(name) {
98 Some(id) => Ok((*field, id)),
99 None => index_field_map
100 .insert(name)
101 .ok_or(Error::UserError(UserError::AttributeLimitReached))
102 .map(|id| (*field, id)),
103 })
104 .collect()
105}
106
107impl<'a, 'i> Transform<'a, 'i> {
108 pub fn new(
109 wtxn: &mut heed::RwTxn<'_>,
110 index: &'i Index,
111 indexer_settings: &'a IndexerConfig,
112 index_documents_method: IndexDocumentsMethod,
113 _autogenerate_docids: bool,
114 ) -> Result<Self> {
115 use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
116
117 let merge_function = match index_documents_method {
120 ReplaceDocuments => Either::Left(ObkvsKeepLastAdditionMergeDeletions),
121 UpdateDocuments => Either::Right(ObkvsMergeAdditionsAndDeletions),
122 };
123
124 let original_sorter = create_sorter(
126 grenad::SortAlgorithm::Stable,
127 merge_function,
128 indexer_settings.chunk_compression_type,
129 indexer_settings.chunk_compression_level,
130 indexer_settings.max_nb_chunks,
131 indexer_settings.max_memory.map(|mem| mem / 2),
132 true,
133 );
134
135 let flattened_sorter = create_sorter(
137 grenad::SortAlgorithm::Stable,
138 merge_function,
139 indexer_settings.chunk_compression_type,
140 indexer_settings.chunk_compression_level,
141 indexer_settings.max_nb_chunks,
142 indexer_settings.max_memory.map(|mem| mem / 2),
143 true,
144 );
145 let documents_ids = index.documents_ids(wtxn)?;
146 let fields_ids_map = index.fields_ids_map(wtxn)?;
147 let builder = MetadataBuilder::from_index(index, wtxn)?;
148 let fields_ids_map = FieldIdMapWithMetadata::new(fields_ids_map, builder);
149
150 Ok(Transform {
151 index,
152 fields_ids_map,
153 indexer_settings,
154 available_documents_ids: AvailableIds::new(&documents_ids),
155 original_sorter,
156 flattened_sorter,
157 index_documents_method,
158 replaced_documents_ids: RoaringBitmap::new(),
159 new_documents_ids: RoaringBitmap::new(),
160 new_external_documents_ids_builder: FxHashMap::default(),
161 documents_count: 0,
162 })
163 }
164
165 #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
166 pub fn read_documents<R, FP, FA>(
167 &mut self,
168 reader: EnrichedDocumentsBatchReader<R>,
169 wtxn: &mut heed::RwTxn<'_>,
170 progress_callback: FP,
171 should_abort: FA,
172 ) -> Result<usize>
173 where
174 R: Read + Seek,
175 FP: Fn(UpdateIndexingStep) + Sync,
176 FA: Fn() -> bool + Sync,
177 {
178 let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
179 let external_documents_ids = self.index.external_documents_ids();
180 let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
181
182 let primary_key = cursor.primary_key().to_string();
183 let primary_key_id =
184 self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
185
186 let mut obkv_buffer = Vec::new();
187 let mut document_sorter_value_buffer = Vec::new();
188 let mut document_sorter_key_buffer = Vec::new();
189 let mut documents_count = 0;
190 let mut docid_buffer: Vec<u8> = Vec::new();
191 let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new();
192 while let Some(enriched_document) = cursor.next_enriched_document()? {
193 let EnrichedDocument { document, document_id } = enriched_document;
194
195 if should_abort() {
196 return Err(Error::InternalError(InternalError::AbortedIndexation));
197 }
198
199 let mut field_buffer_cache = drop_and_reuse(field_buffer);
202 if self.indexer_settings.log_every_n.is_some_and(|len| documents_count % len == 0) {
203 progress_callback(UpdateIndexingStep::RemapDocumentAddition {
204 documents_seen: documents_count,
205 });
206 }
207
208 let external_id = document_id.value();
211 if document_id.is_generated() {
212 serde_json::to_writer(&mut docid_buffer, external_id)
213 .map_err(InternalError::SerdeJson)?;
214 field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer)));
215 }
216
217 for (k, v) in document.iter() {
218 let mapped_id =
219 *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
220 field_buffer_cache.push((mapped_id, Cow::from(v)));
221 }
222
223 field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));
227
228 let mut writer = KvWriter::new(&mut obkv_buffer);
230 for (k, v) in field_buffer_cache.iter() {
231 writer.insert(*k, v)?;
232 }
233
234 let mut original_docid = None;
235 let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
236 HEntry::Occupied(entry) => *entry.get() as u32,
237 HEntry::Vacant(entry) => {
238 let docid = match external_documents_ids.get(wtxn, entry.key())? {
239 Some(docid) => {
240 if self.replaced_documents_ids.insert(docid) {
243 original_docid = Some(docid);
244 }
245
246 docid
247 }
248 None => self
249 .available_documents_ids
250 .next()
251 .ok_or(UserError::DocumentLimitReached)?,
252 };
253 entry.insert(docid as u64);
254 docid
255 }
256 };
257
258 let mut skip_insertion = false;
259 if let Some(original_docid) = original_docid {
260 let original_key = original_docid;
261 let base_obkv = self
262 .index
263 .documents
264 .remap_data_type::<heed::types::Bytes>()
265 .get(wtxn, &original_key)?
266 .ok_or(InternalError::DatabaseMissingEntry {
267 db_name: db_name::DOCUMENTS,
268 key: None,
269 })?;
270
271 if base_obkv == obkv_buffer {
273 self.replaced_documents_ids.remove(original_docid);
275 self.new_external_documents_ids_builder.remove(external_id);
277 skip_insertion = true;
278 } else {
279 let deladd_operation = match self.index_documents_method {
281 IndexDocumentsMethod::UpdateDocuments => {
282 DelAddOperation::DeletionAndAddition
283 }
284 IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion,
285 };
286 document_sorter_key_buffer.clear();
287 document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
288 document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
289 document_sorter_value_buffer.clear();
290 document_sorter_value_buffer.push(Operation::Addition as u8);
291 into_del_add_obkv(
292 KvReaderU16::from_slice(base_obkv),
293 deladd_operation,
294 &mut document_sorter_value_buffer,
295 )?;
296 self.original_sorter
297 .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
298 let base_obkv = KvReader::from_slice(base_obkv);
299 if let Some(flattened_obkv) =
300 Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)?
301 {
302 document_sorter_value_buffer.clear();
304 document_sorter_value_buffer.push(Operation::Addition as u8);
305 into_del_add_obkv(
306 KvReaderU16::from_slice(&flattened_obkv),
307 deladd_operation,
308 &mut document_sorter_value_buffer,
309 )?;
310 }
311 self.flattened_sorter
312 .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
313 }
314 }
315
316 if !skip_insertion {
317 self.new_documents_ids.insert(docid);
318
319 document_sorter_key_buffer.clear();
320 document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
321 document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
322 document_sorter_value_buffer.clear();
323 document_sorter_value_buffer.push(Operation::Addition as u8);
324 into_del_add_obkv(
325 KvReaderU16::from_slice(&obkv_buffer),
326 DelAddOperation::Addition,
327 &mut document_sorter_value_buffer,
328 )?;
329 self.original_sorter
331 .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
332
333 let flattened_obkv = KvReader::from_slice(&obkv_buffer);
334 if let Some(obkv) =
335 Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)?
336 {
337 document_sorter_value_buffer.clear();
338 document_sorter_value_buffer.push(Operation::Addition as u8);
339 into_del_add_obkv(
340 KvReaderU16::from_slice(&obkv),
341 DelAddOperation::Addition,
342 &mut document_sorter_value_buffer,
343 )?
344 }
345 self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
346 }
347 documents_count += 1;
348
349 progress_callback(UpdateIndexingStep::RemapDocumentAddition {
350 documents_seen: documents_count,
351 });
352
353 field_buffer = drop_and_reuse(field_buffer_cache);
354 docid_buffer.clear();
355 obkv_buffer.clear();
356 }
357
358 progress_callback(UpdateIndexingStep::RemapDocumentAddition {
359 documents_seen: documents_count,
360 });
361
362 self.index.put_fields_ids_map(wtxn, self.fields_ids_map.as_fields_ids_map())?;
363 self.index.put_primary_key(wtxn, &primary_key)?;
364 self.documents_count += documents_count;
365 Ok(documents_count)
368 }
369
370 #[tracing::instrument(
373 level = "trace",
374 skip(obkv, fields_ids_map),
375 target = "indexing::transform"
376 )]
377 fn flatten_from_fields_ids_map(
378 obkv: &KvReader<FieldId>,
379 fields_ids_map: &mut FieldIdMapWithMetadata,
380 ) -> Result<Option<Vec<u8>>> {
381 if obkv
382 .iter()
383 .all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value))
384 {
385 return Ok(None);
386 }
387
388 let mut key_value: Vec<(FieldId, Cow<'_, [u8]>)> = Vec::new();
393
394 let mut doc = serde_json::Map::new();
396
397 for (key, value) in obkv.iter() {
400 if json_depth_checker::should_flatten_from_unchecked_slice(value) {
401 let key = fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
402 field_id: key,
403 process: "Flatten from fields ids map.",
404 })?;
405
406 let value = serde_json::from_slice::<Value>(value)
407 .map_err(crate::error::InternalError::SerdeJson)?;
408 doc.insert(key.to_string(), value);
409 } else {
410 key_value.push((key, value.into()));
411 }
412 }
413
414 let flattened = flatten_serde_json::flatten(&doc);
415
416 for (key, value) in flattened.into_iter() {
419 let fid = fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
420 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
421 key_value.push((fid, value.into()));
422 }
423
424 key_value.sort_unstable_by_key(|(key, _)| *key);
427
428 let mut buffer = Vec::new();
429 Self::create_obkv_from_key_value(&mut key_value, &mut buffer)?;
430 Ok(Some(buffer))
431 }
432
433 fn create_obkv_from_key_value(
435 key_value: &mut [(FieldId, Cow<'_, [u8]>)],
436 output_buffer: &mut Vec<u8>,
437 ) -> Result<()> {
438 debug_assert!(
439 key_value.windows(2).all(|vec| vec[0].0 <= vec[1].0),
440 "The slice of key / value pair must be sorted."
441 );
442
443 output_buffer.clear();
444 let mut writer = KvWriter::new(output_buffer);
445
446 let mut skip_next_value = false;
447 for things in key_value.windows(2) {
448 if skip_next_value {
449 skip_next_value = false;
450 continue;
451 }
452 let (key1, value1) = &things[0];
453 let (key2, value2) = &things[1];
454
455 if key1 == key2 {
463 skip_next_value = true;
464
465 let value1 = serde_json::from_slice(value1)
466 .map_err(crate::error::InternalError::SerdeJson)?;
467 let value2 = serde_json::from_slice(value2)
468 .map_err(crate::error::InternalError::SerdeJson)?;
469 let value = match (value1, value2) {
470 (Value::Array(mut left), Value::Array(mut right)) => {
471 left.append(&mut right);
472 Value::Array(left)
473 }
474 (Value::Array(mut array), value) | (value, Value::Array(mut array)) => {
475 array.push(value);
476 Value::Array(array)
477 }
478 (left, right) => Value::Array(vec![left, right]),
479 };
480
481 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
482 writer.insert(*key1, value)?;
483 } else {
484 writer.insert(*key1, value1)?;
485 }
486 }
487
488 if !skip_next_value {
489 let (key, value) = key_value.last().unwrap();
491 writer.insert(*key, value)?;
492 }
493
494 Ok(())
495 }
496
497 #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")]
501 pub(crate) fn output_from_sorter<F>(
502 self,
503 wtxn: &mut heed::RwTxn<'_>,
504 progress_callback: F,
505 ) -> Result<TransformOutput>
506 where
507 F: Fn(UpdateIndexingStep) + Sync,
508 {
509 let primary_key = self
510 .index
511 .primary_key(wtxn)?
512 .ok_or(Error::InternalError(InternalError::DatabaseMissingEntry {
513 db_name: db_name::MAIN,
514 key: Some(main_key::PRIMARY_KEY_KEY),
515 }))?
516 .to_string();
517
518 let mut writer = create_writer(
520 self.indexer_settings.chunk_compression_type,
521 self.indexer_settings.chunk_compression_level,
522 tempfile::tempfile()?,
523 );
524
525 let mut field_distribution = self.index.field_distribution(wtxn)?;
529
530 let mut iter = self.original_sorter.into_stream_merger_iter()?;
532 let mut documents_count = 0;
534
535 while let Some((key, val)) = iter.next()? {
536 let val = &val[1..];
538
539 documents_count += 1;
541 progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
542 documents_seen: documents_count,
543 total_documents: self.documents_count,
544 });
545
546 for (key, value) in KvReader::from_slice(val) {
547 let reader = KvReaderDelAdd::from_slice(value);
548 match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
549 (None, None) => (),
550 (None, Some(_)) => {
551 let name = self.fields_ids_map.name(key).ok_or(
553 FieldIdMapMissingEntry::FieldId {
554 field_id: key,
555 process: "Computing field distribution in transform.",
556 },
557 )?;
558 *field_distribution.entry(name.to_string()).or_insert(0) += 1;
559 }
560 (Some(_), None) => {
561 let name = self.fields_ids_map.name(key).ok_or(
563 FieldIdMapMissingEntry::FieldId {
564 field_id: key,
565 process: "Computing field distribution in transform.",
566 },
567 )?;
568 match field_distribution.entry(name.to_string()) {
569 BEntry::Vacant(_) => { }
571 BEntry::Occupied(mut entry) => {
572 match entry.get_mut().checked_sub(1) {
574 Some(0) => {
575 entry.remove();
576 }
577 Some(new_val) => {
578 *entry.get_mut() = new_val;
579 }
580 None => {
581 unreachable!("Attempting to remove a field that wasn't in the field distribution")
582 }
583 }
584 }
585 }
586 }
587 (Some(_), Some(_)) => {
588 }
590 }
591 }
592 writer.insert(key, val)?;
593 }
594
595 let mut original_documents = writer.into_inner()?;
596 original_documents.rewind()?;
598
599 let mut writer = create_writer(
601 self.indexer_settings.chunk_compression_type,
602 self.indexer_settings.chunk_compression_level,
603 tempfile::tempfile()?,
604 );
605
606 let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
610 while let Some((key, val)) = iter.next()? {
611 let val = &val[1..];
613 writer.insert(key, val)?;
614 }
615 let mut flattened_documents = writer.into_inner()?;
616 flattened_documents.rewind()?;
617
618 let mut new_external_documents_ids_builder: Vec<_> =
619 self.new_external_documents_ids_builder.into_iter().collect();
620
621 new_external_documents_ids_builder
622 .sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
623 let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory();
624 new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| {
625 fst_new_external_documents_ids_builder.insert(key, value)
626 })?;
627
628 let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn, None)?;
629 let fields_ids_map = self.fields_ids_map;
630 let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name));
631 let mut new_inner_settings = old_inner_settings.clone();
632 new_inner_settings.fields_ids_map = fields_ids_map;
633
634 let embedding_config_updates = Default::default();
635 let settings_update_only = false;
636 let settings_diff = InnerIndexSettingsDiff::new(
637 old_inner_settings,
638 new_inner_settings,
639 primary_key_id,
640 embedding_config_updates,
641 settings_update_only,
642 );
643
644 Ok(TransformOutput {
645 primary_key,
646 settings_diff,
647 field_distribution,
648 documents_count: self.documents_count,
649 original_documents: Some(
650 original_documents.into_inner().map_err(|err| err.into_error())?,
651 ),
652 flattened_documents: Some(
653 flattened_documents.into_inner().map_err(|err| err.into_error())?,
654 ),
655 })
656 }
657
658 #[allow(clippy::too_many_arguments)] fn rebind_existing_document(
663 old_obkv: &KvReader<FieldId>,
664 settings_diff: &InnerIndexSettingsDiff,
665 mut injected_vectors: serde_json::Map<String, serde_json::Value>,
666 old_vectors_fid: Option<FieldId>,
667 original_obkv_buffer: Option<&mut Vec<u8>>,
668 flattened_obkv_buffer: Option<&mut Vec<u8>>,
669 ) -> Result<()> {
670 let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
672
673 let facet_fids_changed = settings_diff.facet_fids_changed();
675
676 let necessary_faceted_field = |id: FieldId| -> Option<DelAddOperation> {
677 if facet_fids_changed {
678 let field_name = settings_diff.new.fields_ids_map.name(id).unwrap();
679 match (
682 settings_diff.old.match_faceted_field(field_name),
683 settings_diff.new.match_faceted_field(field_name),
684 ) {
685 (PatternMatch::NoMatch, PatternMatch::NoMatch) => None,
686 (PatternMatch::NoMatch, _) => Some(DelAddOperation::Addition),
687 (_, PatternMatch::NoMatch) => Some(DelAddOperation::Deletion),
688 (_, _) => Some(DelAddOperation::DeletionAndAddition),
689 }
690 } else {
691 None
692 }
693 };
694
695 let reindex_vectors = settings_diff.reindex_vectors();
698
699 let mut operations = HashMap::new();
701 let mut error_seen = false;
702
703 let mut obkv_writer = KvWriter::<_, FieldId>::memory();
704 'write_fid: for (id, val) in old_obkv.iter() {
705 if !injected_vectors.is_empty() {
706 'inject_vectors: {
707 let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
708
709 if id < vectors_fid {
710 break 'inject_vectors;
711 }
712
713 let mut existing_vectors = if id == vectors_fid {
714 let existing_vectors: std::result::Result<
715 serde_json::Map<String, serde_json::Value>,
716 serde_json::Error,
717 > = serde_json::from_slice(val);
718
719 match existing_vectors {
720 Ok(existing_vectors) => existing_vectors,
721 Err(error) => {
722 if !error_seen {
723 tracing::error!(%error, "Unexpected `_vectors` field that is not a map. Treating as an empty map");
724 error_seen = true;
725 }
726 Default::default()
727 }
728 }
729 } else {
730 Default::default()
731 };
732
733 existing_vectors.append(&mut injected_vectors);
734
735 operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
736 obkv_writer
737 .insert(vectors_fid, serde_json::to_vec(&existing_vectors).unwrap())?;
738 if id == vectors_fid {
739 continue 'write_fid;
740 }
741 }
742 }
743
744 if is_primary_key(id) || reindex_vectors {
745 operations.insert(id, DelAddOperation::DeletionAndAddition);
746 obkv_writer.insert(id, val)?;
747 } else {
748 let facet_operation = necessary_faceted_field(id);
749 let searchable_operation = settings_diff.reindex_searchable_id(id);
750 let operation = match (facet_operation, searchable_operation) {
751 (Some(facet_operation), Some(searchable_operation)) => {
752 Some(facet_operation.merge(searchable_operation))
753 }
754 (Some(operation), None) | (None, Some(operation)) => Some(operation),
755 (None, None) => None,
756 };
757
758 if let Some(operation) = operation {
759 operations.insert(id, operation);
760 obkv_writer.insert(id, val)?;
761 }
762 }
763 }
764 if !injected_vectors.is_empty() {
765 'inject_vectors: {
766 let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
767
768 operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
769 obkv_writer.insert(vectors_fid, serde_json::to_vec(&injected_vectors).unwrap())?;
770 }
771 }
772
773 let data = obkv_writer.into_inner()?;
774 let obkv = KvReader::<FieldId>::from_slice(&data);
775
776 if let Some(original_obkv_buffer) = original_obkv_buffer {
777 original_obkv_buffer.clear();
778 into_del_add_obkv(obkv, DelAddOperation::DeletionAndAddition, original_obkv_buffer)?;
779 }
780
781 if let Some(flattened_obkv_buffer) = flattened_obkv_buffer {
782 let mut fields_ids_map = settings_diff.new.fields_ids_map.clone();
784 let flattened = Self::flatten_from_fields_ids_map(obkv, &mut fields_ids_map)?;
785 let flattened = flattened.as_deref().map_or(obkv, KvReader::from_slice);
786
787 flattened_obkv_buffer.clear();
788 into_del_add_obkv_conditional_operation(flattened, flattened_obkv_buffer, |id| {
789 operations.get(&id).copied().unwrap_or(DelAddOperation::DeletionAndAddition)
790 })?;
791 }
792
793 Ok(())
794 }
795
796 #[tracing::instrument(
801 level = "trace"
802 skip(self, wtxn, settings_diff),
803 target = "indexing::documents"
804 )]
805 pub fn prepare_for_documents_reindexing(
806 self,
807 wtxn: &mut heed::RwTxn<'i>,
808 settings_diff: InnerIndexSettingsDiff,
809 ) -> Result<TransformOutput> {
810 let primary_key = self
812 .index
813 .primary_key(wtxn)?
814 .ok_or(InternalError::DatabaseMissingEntry {
815 db_name: db_name::MAIN,
816 key: Some(main_key::PRIMARY_KEY_KEY),
817 })?
818 .to_string();
819 let field_distribution = self.index.field_distribution(wtxn)?;
820
821 let documents_ids = self.index.documents_ids(wtxn)?;
822 let documents_count = documents_ids.len() as usize;
823
824 let mut original_sorter = if settings_diff.reindex_vectors() {
826 Some(create_sorter(
827 grenad::SortAlgorithm::Stable,
828 KeepFirst,
829 self.indexer_settings.chunk_compression_type,
830 self.indexer_settings.chunk_compression_level,
831 self.indexer_settings.max_nb_chunks,
832 self.indexer_settings.max_memory.map(|mem| mem / 2),
833 true,
834 ))
835 } else {
836 None
837 };
838
839 let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
840 .embedding_config_updates
841 .iter()
842 .filter_map(|(name, action)| {
843 if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
844 action.write_back()
845 {
846 let reader = ArroyWrapper::new(
847 self.index.vector_arroy,
848 *embedder_id,
849 action.was_quantized,
850 );
851 Some((name.as_str(), (reader, user_provided)))
852 } else {
853 None
854 }
855 })
856 .collect();
857
858 let old_vectors_fid =
859 settings_diff.old.fields_ids_map.id(crate::constants::RESERVED_VECTORS_FIELD_NAME);
860
861 let mut flattened_sorter =
863 if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
864 Some(create_sorter(
865 grenad::SortAlgorithm::Stable,
866 KeepFirst,
867 self.indexer_settings.chunk_compression_type,
868 self.indexer_settings.chunk_compression_level,
869 self.indexer_settings.max_nb_chunks,
870 self.indexer_settings.max_memory.map(|mem| mem / 2),
871 true,
872 ))
873 } else {
874 None
875 };
876
877 if original_sorter.is_some() || flattened_sorter.is_some() {
878 let mut original_obkv_buffer = Vec::new();
879 let mut flattened_obkv_buffer = Vec::new();
880 let mut document_sorter_key_buffer = Vec::new();
881 for result in self.index.external_documents_ids().iter(wtxn)? {
882 let (external_id, docid) = result?;
883 let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
884 InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
885 )?;
886
887 let injected_vectors: std::result::Result<
888 serde_json::Map<String, serde_json::Value>,
889 arroy::Error,
890 > = readers
891 .iter()
892 .filter_map(|(name, (reader, user_provided))| {
893 if !user_provided.contains(docid) {
894 return None;
895 }
896 match reader.item_vectors(wtxn, docid) {
897 Ok(vectors) if vectors.is_empty() => None,
898 Ok(vectors) => Some(Ok((
899 name.to_string(),
900 serde_json::to_value(ExplicitVectors {
901 embeddings: Some(
902 VectorOrArrayOfVectors::from_array_of_vectors(vectors),
903 ),
904 regenerate: false,
905 })
906 .unwrap(),
907 ))),
908 Err(e) => Some(Err(e)),
909 }
910 })
911 .collect();
912
913 let injected_vectors = injected_vectors?;
914
915 Self::rebind_existing_document(
916 old_obkv,
917 &settings_diff,
918 injected_vectors,
919 old_vectors_fid,
920 Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
921 Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
922 )?;
923
924 if let Some(original_sorter) = original_sorter.as_mut() {
925 document_sorter_key_buffer.clear();
926 document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
927 document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
928 original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
929 }
930 if let Some(flattened_sorter) = flattened_sorter.as_mut() {
931 flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
932 }
933 }
934 }
935
936 for (_, (reader, _)) in readers {
938 let dimensions = reader.dimensions(wtxn)?;
939 reader.clear(wtxn, dimensions)?;
940 }
941
942 let grenad_params = GrenadParameters {
943 chunk_compression_type: self.indexer_settings.chunk_compression_type,
944 chunk_compression_level: self.indexer_settings.chunk_compression_level,
945 max_memory: self.indexer_settings.max_memory,
946 max_nb_chunks: self.indexer_settings.max_nb_chunks, };
948
949 let flattened_documents = match flattened_sorter {
951 Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?),
952 None => None,
953 };
954 let original_documents = match original_sorter {
955 Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
956 None => None,
957 };
958
959 Ok(TransformOutput {
960 primary_key,
961 field_distribution,
962 settings_diff,
963 documents_count,
964 original_documents: original_documents.map(|od| od.into_inner().into_inner()),
965 flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()),
966 })
967 }
968}
969
/// Drops every element of `vec` and returns an empty `Vec<T>` built from it.
///
/// `U` and `T` are expected to share the same size and alignment; this is only checked in
/// builds with debug assertions enabled. The returned vector is always empty.
fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
    use std::mem::{align_of, size_of};

    debug_assert_eq!(align_of::<U>(), align_of::<T>());
    debug_assert_eq!(size_of::<U>(), size_of::<T>());

    vec.clear();
    debug_assert!(vec.is_empty());

    // The vector is empty, so the mapping closure can never run.
    vec.into_iter().map(|_| -> T { unreachable!() }).collect::<Vec<T>>()
}
980
#[cfg(test)]
mod test {
    use grenad::MergeFunction;
    use obkv::KvReaderU16;

    use super::*;

    /// Builds a sorter value: an obkv made of `fields` (key, single-byte value), converted
    /// to a del/add obkv with `del_add`, and prefixed with the `tag` operation byte.
    fn tagged_doc(fields: &[(u8, u8)], del_add: DelAddOperation, tag: Operation) -> Vec<u8> {
        let mut kv_writer = KvWriter::memory();
        for &(key, byte) in fields {
            kv_writer.insert(key, [byte]).unwrap();
        }
        let buffer = kv_writer.into_inner().unwrap();

        let mut doc = Vec::new();
        into_del_add_obkv(KvReaderU16::from_slice(&buffer), del_add, &mut doc).unwrap();
        doc.insert(0, tag as u8);
        doc
    }

    /// Checks the two sorter merge functions on small hand-built documents.
    #[test]
    fn merge_obkvs() {
        let additive_doc_0 = tagged_doc(&[(0, 0)], DelAddOperation::Addition, Operation::Addition);
        let deletive_doc_0 = tagged_doc(&[(0, 0)], DelAddOperation::Deletion, Operation::Deletion);
        let del_add_doc_0 =
            tagged_doc(&[(0, 0)], DelAddOperation::DeletionAndAddition, Operation::Addition);
        let additive_doc_1 = tagged_doc(&[(1, 1)], DelAddOperation::Addition, Operation::Addition);
        let additive_doc_0_1 =
            tagged_doc(&[(0, 0), (1, 1)], DelAddOperation::Addition, Operation::Addition);

        // Merging additions and deletions.
        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0);

        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);

        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, deletive_doc_0);

        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[
                Cow::from(additive_doc_1.as_slice()),
                Cow::from(deletive_doc_0.as_slice()),
                Cow::from(additive_doc_0.as_slice()),
            ],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);

        let ret = MergeFunction::merge(
            &ObkvsMergeAdditionsAndDeletions,
            &[],
            &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0_1);

        // Keeping the last addition while merging deletions.
        let ret = MergeFunction::merge(
            &ObkvsKeepLastAdditionMergeDeletions,
            &[],
            &[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
        )
        .unwrap();
        assert_eq!(*ret, additive_doc_0);

        let ret = MergeFunction::merge(
            &ObkvsKeepLastAdditionMergeDeletions,
            &[],
            &[
                Cow::from(deletive_doc_0.as_slice()),
                Cow::from(additive_doc_1.as_slice()),
                Cow::from(additive_doc_0.as_slice()),
            ],
        )
        .unwrap();
        assert_eq!(*ret, del_add_doc_0);
    }
}
1107}