// milli_core/update/new/indexer/document_operation.rs

1use std::sync::atomic::Ordering;
2
3use bumpalo::collections::CollectIn;
4use bumpalo::Bump;
5use bumparaw_collections::RawMap;
6use hashbrown::hash_map::Entry;
7use heed::RoTxn;
8use memmap2::Mmap;
9use rayon::slice::ParallelSlice;
10use rustc_hash::FxBuildHasher;
11use serde_json::value::RawValue;
12use serde_json::Deserializer;
13
14use super::super::document_change::DocumentChange;
15use super::document_changes::{DocumentChangeContext, DocumentChanges};
16use super::guess_primary_key::retrieve_or_guess_primary_key;
17use crate::documents::PrimaryKey;
18use crate::progress::{AtomicPayloadStep, Progress};
19use crate::update::new::document::Versions;
20use crate::update::new::steps::IndexingStep;
21use crate::update::new::thread_local::MostlySend;
22use crate::update::new::{Deletion, Insertion, Update};
23use crate::update::{AvailableIds, IndexDocumentsMethod};
24use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
25
26#[derive(Default)]
27pub struct DocumentOperation<'pl> {
28    operations: Vec<Payload<'pl>>,
29}
30
31impl<'pl> DocumentOperation<'pl> {
32    pub fn new() -> Self {
33        Self { operations: Default::default() }
34    }
35
36    /// Append a replacement of documents.
37    ///
38    /// The payload is expected to be in the NDJSON format
39    pub fn replace_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
40        #[cfg(unix)]
41        payload.advise(memmap2::Advice::Sequential)?;
42        self.operations.push(Payload::Replace(&payload[..]));
43        Ok(())
44    }
45
46    /// Append an update of documents.
47    ///
48    /// The payload is expected to be in the NDJSON format
49    pub fn update_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
50        #[cfg(unix)]
51        payload.advise(memmap2::Advice::Sequential)?;
52        self.operations.push(Payload::Update(&payload[..]));
53        Ok(())
54    }
55
56    /// Append a deletion of documents IDs.
57    ///
58    /// The list is a set of external documents IDs.
59    pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
60        self.operations.push(Payload::Deletion(to_delete))
61    }
62
63    #[allow(clippy::too_many_arguments)]
64    #[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")]
65    pub fn into_changes<MSP>(
66        self,
67        indexer: &'pl Bump,
68        index: &Index,
69        rtxn: &'pl RoTxn<'pl>,
70        primary_key_from_op: Option<&'pl str>,
71        new_fields_ids_map: &mut FieldsIdsMap,
72        must_stop_processing: &MSP,
73        progress: Progress,
74    ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
75    where
76        MSP: Fn() -> bool,
77    {
78        progress.update_progress(IndexingStep::PreparingPayloads);
79        let Self { operations } = self;
80
81        let documents_ids = index.documents_ids(rtxn)?;
82        let mut operations_stats = Vec::new();
83        let mut available_docids = AvailableIds::new(&documents_ids);
84        let mut docids_version_offsets = hashbrown::HashMap::new();
85        let mut primary_key = None;
86
87        let payload_count = operations.len();
88        let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
89        progress.update_progress(progress_step);
90
91        for (payload_index, operation) in operations.into_iter().enumerate() {
92            if must_stop_processing() {
93                return Err(InternalError::AbortedIndexation.into());
94            }
95            step.store(payload_index as u32, Ordering::Relaxed);
96
97            let mut bytes = 0;
98            let result = match operation {
99                Payload::Replace(payload) => extract_addition_payload_changes(
100                    indexer,
101                    index,
102                    rtxn,
103                    primary_key_from_op,
104                    &mut primary_key,
105                    new_fields_ids_map,
106                    &mut available_docids,
107                    &mut bytes,
108                    &docids_version_offsets,
109                    IndexDocumentsMethod::ReplaceDocuments,
110                    payload,
111                ),
112                Payload::Update(payload) => extract_addition_payload_changes(
113                    indexer,
114                    index,
115                    rtxn,
116                    primary_key_from_op,
117                    &mut primary_key,
118                    new_fields_ids_map,
119                    &mut available_docids,
120                    &mut bytes,
121                    &docids_version_offsets,
122                    IndexDocumentsMethod::UpdateDocuments,
123                    payload,
124                ),
125                Payload::Deletion(to_delete) => extract_deletion_payload_changes(
126                    index,
127                    rtxn,
128                    &mut available_docids,
129                    &docids_version_offsets,
130                    to_delete,
131                ),
132            };
133
134            let mut document_count = 0;
135            let error = match result {
136                Ok(new_docids_version_offsets) => {
137                    document_count = new_docids_version_offsets.len() as u64;
138                    // If we don't have any error then we can merge the content of this payload
139                    // into to main payload. Else we just drop this payload extraction.
140                    merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
141                    None
142                }
143                Err(Error::UserError(user_error)) => Some(user_error),
144                Err(e) => return Err(e),
145            };
146            operations_stats.push(PayloadStats { document_count, bytes, error });
147        }
148        step.store(payload_count as u32, Ordering::Relaxed);
149
150        // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
151        let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
152            docids_version_offsets.drain().collect_in(indexer);
153
154        // Reorder the offsets to make sure we iterate on the file sequentially
155        // And finally sort them. This clearly speeds up reading the update files.
156        docids_version_offsets
157            .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
158
159        let docids_version_offsets = docids_version_offsets.into_bump_slice();
160        Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
161    }
162}
163
164#[allow(clippy::too_many_arguments)]
165fn extract_addition_payload_changes<'r, 'pl: 'r>(
166    indexer: &'pl Bump,
167    index: &Index,
168    rtxn: &'r RoTxn<'r>,
169    primary_key_from_op: Option<&'r str>,
170    primary_key: &mut Option<PrimaryKey<'r>>,
171    new_fields_ids_map: &mut FieldsIdsMap,
172    available_docids: &mut AvailableIds,
173    bytes: &mut u64,
174    main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
175    method: IndexDocumentsMethod,
176    payload: &'pl [u8],
177) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
178    use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
179
180    let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
181
182    let mut previous_offset = 0;
183    let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
184    while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
185        *bytes = previous_offset as u64;
186
187        // Only guess the primary key if it is the first document
188        let retrieved_primary_key = if previous_offset == 0 {
189            let doc = RawMap::from_raw_value_and_hasher(doc, FxBuildHasher, indexer)
190                .map(Some)
191                .map_err(UserError::SerdeJson)?;
192
193            let result = retrieve_or_guess_primary_key(
194                rtxn,
195                index,
196                new_fields_ids_map,
197                primary_key_from_op,
198                doc,
199            );
200
201            let (pk, _has_been_changed) = match result {
202                Ok(Ok(pk)) => pk,
203                Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
204                Err(error) => return Err(error),
205            };
206
207            primary_key.get_or_insert(pk)
208        } else {
209            // primary key was retrieved in the first iteration or in a previous payload
210            primary_key.as_ref().unwrap()
211        };
212
213        let external_id =
214            retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?;
215
216        let external_id = external_id.to_de();
217        let current_offset = iter.byte_offset();
218        let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
219
220        match main_docids_version_offsets.get(external_id) {
221            None => {
222                match index.external_documents_ids().get(rtxn, external_id) {
223                    Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) {
224                        Entry::Occupied(mut entry) => match method {
225                            ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
226                            UpdateDocuments => entry.get_mut().push_update(document_offset),
227                        },
228                        Entry::Vacant(entry) => {
229                            match method {
230                                ReplaceDocuments => {
231                                    entry.insert(PayloadOperations::new_replacement(
232                                        docid,
233                                        false, // is new
234                                        document_offset,
235                                    ));
236                                }
237                                UpdateDocuments => {
238                                    entry.insert(PayloadOperations::new_update(
239                                        docid,
240                                        false, // is new
241                                        document_offset,
242                                    ));
243                                }
244                            }
245                        }
246                    },
247                    Ok(None) => match new_docids_version_offsets.entry(external_id) {
248                        Entry::Occupied(mut entry) => match method {
249                            ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
250                            UpdateDocuments => entry.get_mut().push_update(document_offset),
251                        },
252                        Entry::Vacant(entry) => {
253                            let docid = match available_docids.next() {
254                                Some(docid) => docid,
255                                None => return Err(UserError::DocumentLimitReached.into()),
256                            };
257
258                            match method {
259                                ReplaceDocuments => {
260                                    entry.insert(PayloadOperations::new_replacement(
261                                        docid,
262                                        true, // is new
263                                        document_offset,
264                                    ));
265                                }
266                                UpdateDocuments => {
267                                    entry.insert(PayloadOperations::new_update(
268                                        docid,
269                                        true, // is new
270                                        document_offset,
271                                    ));
272                                }
273                            }
274                        }
275                    },
276                    Err(e) => return Err(e.into()),
277                }
278            }
279            Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
280                Entry::Occupied(mut entry) => match method {
281                    ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
282                    UpdateDocuments => entry.get_mut().push_update(document_offset),
283                },
284                Entry::Vacant(entry) => match method {
285                    ReplaceDocuments => {
286                        entry.insert(PayloadOperations::new_replacement(
287                            payload_operations.docid,
288                            payload_operations.is_new,
289                            document_offset,
290                        ));
291                    }
292                    UpdateDocuments => {
293                        entry.insert(PayloadOperations::new_update(
294                            payload_operations.docid,
295                            payload_operations.is_new,
296                            document_offset,
297                        ));
298                    }
299                },
300            },
301        }
302
303        previous_offset = iter.byte_offset();
304    }
305
306    if payload.is_empty() {
307        let result = retrieve_or_guess_primary_key(
308            rtxn,
309            index,
310            new_fields_ids_map,
311            primary_key_from_op,
312            None,
313        );
314        match result {
315            Ok(Ok((pk, _))) => {
316                primary_key.get_or_insert(pk);
317            }
318            Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
319            Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
320            Err(error) => return Err(error),
321        };
322    }
323
324    Ok(new_docids_version_offsets)
325}
326
327fn extract_deletion_payload_changes<'s, 'pl: 's>(
328    index: &Index,
329    rtxn: &RoTxn,
330    available_docids: &mut AvailableIds,
331    main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
332    to_delete: &'pl [&'pl str],
333) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
334    let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
335
336    for external_id in to_delete {
337        match main_docids_version_offsets.get(external_id) {
338            None => {
339                match index.external_documents_ids().get(rtxn, external_id) {
340                    Ok(Some(docid)) => {
341                        match new_docids_version_offsets.entry(external_id) {
342                            Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
343                            Entry::Vacant(entry) => {
344                                entry.insert(PayloadOperations::new_deletion(
345                                    docid, false, // is new
346                                ));
347                            }
348                        }
349                    }
350                    Ok(None) => {
351                        let docid = match available_docids.next() {
352                            Some(docid) => docid,
353                            None => return Err(UserError::DocumentLimitReached.into()),
354                        };
355                        match new_docids_version_offsets.entry(external_id) {
356                            Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
357                            Entry::Vacant(entry) => {
358                                entry.insert(PayloadOperations::new_deletion(
359                                    docid, true, // is new
360                                ));
361                            }
362                        }
363                    }
364                    Err(e) => return Err(e.into()),
365                }
366            }
367            Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
368                Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
369                Entry::Vacant(entry) => {
370                    entry.insert(PayloadOperations::new_deletion(
371                        payload_operations.docid,
372                        payload_operations.is_new,
373                    ));
374                }
375            },
376        }
377    }
378
379    Ok(new_docids_version_offsets)
380}
381
382fn merge_version_offsets<'s, 'pl>(
383    main: &mut hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
384    new: hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
385) {
386    // We cannot swap like nothing because documents
387    // operations must be in the right order.
388    if main.is_empty() {
389        return *main = new;
390    }
391
392    for (key, new_payload) in new {
393        match main.entry(key) {
394            Entry::Occupied(mut entry) => entry.get_mut().append_operations(new_payload.operations),
395            Entry::Vacant(entry) => {
396                entry.insert(new_payload);
397            }
398        }
399    }
400}
401
402impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
403    type Item = (&'pl str, PayloadOperations<'pl>);
404
405    fn iter(
406        &self,
407        chunk_size: usize,
408    ) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
409        self.docids_version_offsets.par_chunks(chunk_size)
410    }
411
412    fn item_to_document_change<'doc, T: MostlySend + 'doc>(
413        &'doc self,
414        context: &'doc DocumentChangeContext<T>,
415        item: &'doc Self::Item,
416    ) -> Result<Option<DocumentChange<'doc>>>
417    where
418        'pl: 'doc,
419    {
420        let (external_doc, payload_operations) = item;
421        payload_operations.merge(external_doc, &context.doc_alloc)
422    }
423
424    fn len(&self) -> usize {
425        self.docids_version_offsets.len()
426    }
427}
428
429pub struct DocumentOperationChanges<'pl> {
430    docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
431}
432
/// A single payload recorded by a `DocumentOperation`.
pub enum Payload<'pl> {
    /// NDJSON bytes of documents to replace (`IndexDocumentsMethod::ReplaceDocuments`).
    Replace(&'pl [u8]),
    /// NDJSON bytes of documents to update (`IndexDocumentsMethod::UpdateDocuments`).
    Update(&'pl [u8]),
    /// External ids of the documents to delete.
    Deletion(&'pl [&'pl str]),
}

/// Statistics reported for each payload by `DocumentOperation::into_changes`.
pub struct PayloadStats {
    /// How far into the payload bytes the extraction went; always `0` for deletion payloads.
    pub bytes: u64,
    /// Number of distinct documents affected by the payload; `0` when it failed.
    pub document_count: u64,
    /// The user error that made this payload be dropped, if any.
    pub error: Option<UserError>,
}
444
445pub struct PayloadOperations<'pl> {
446    /// The internal document id of the document.
447    pub docid: DocumentId,
448    /// Wether this document is not in the current database (visible by the rtxn).
449    pub is_new: bool,
450    /// The operations to perform, in order, on this document.
451    pub operations: Vec<InnerDocOp<'pl>>,
452}
453
454impl<'pl> PayloadOperations<'pl> {
455    fn new_replacement(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
456        Self { docid, is_new, operations: vec![InnerDocOp::Replace(offset)] }
457    }
458
459    fn new_update(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
460        Self { docid, is_new, operations: vec![InnerDocOp::Update(offset)] }
461    }
462
463    fn new_deletion(docid: DocumentId, is_new: bool) -> Self {
464        Self { docid, is_new, operations: vec![InnerDocOp::Deletion] }
465    }
466}
467
468impl<'pl> PayloadOperations<'pl> {
469    fn push_replacement(&mut self, offset: DocumentOffset<'pl>) {
470        self.operations.clear();
471        self.operations.push(InnerDocOp::Replace(offset))
472    }
473
474    fn push_update(&mut self, offset: DocumentOffset<'pl>) {
475        self.operations.push(InnerDocOp::Update(offset))
476    }
477
478    fn push_deletion(&mut self) {
479        self.operations.clear();
480        self.operations.push(InnerDocOp::Deletion);
481    }
482
483    fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) {
484        debug_assert!(!operations.is_empty());
485        if matches!(operations.first(), Some(InnerDocOp::Deletion | InnerDocOp::Replace(_))) {
486            self.operations.clear();
487        }
488        self.operations.append(&mut operations);
489    }
490
491    /// Returns only the most recent version of a document based on the updates from the payloads.
492    ///
493    /// This function is only meant to be used when doing a replacement and not an update.
494    fn merge<'doc>(
495        &self,
496        external_doc: &'doc str,
497        doc_alloc: &'doc Bump,
498    ) -> Result<Option<DocumentChange<'doc>>>
499    where
500        'pl: 'doc,
501    {
502        match self.operations.last() {
503            Some(InnerDocOp::Replace(DocumentOffset { content })) => {
504                let document = serde_json::from_slice(content).unwrap();
505                let document =
506                    RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
507                        .map_err(UserError::SerdeJson)?;
508
509                if self.is_new {
510                    Ok(Some(DocumentChange::Insertion(Insertion::create(
511                        self.docid,
512                        external_doc,
513                        Versions::single(document),
514                    ))))
515                } else {
516                    Ok(Some(DocumentChange::Update(Update::create(
517                        self.docid,
518                        external_doc,
519                        Versions::single(document),
520                        true,
521                    ))))
522                }
523            }
524            Some(InnerDocOp::Update(_)) => {
525                // Search the first operation that is a tombstone which resets the document.
526                let last_tombstone = self
527                    .operations
528                    .iter()
529                    .rposition(|op| matches!(op, InnerDocOp::Deletion | InnerDocOp::Replace(_)));
530
531                // Track when we must ignore previous document versions from the rtxn.
532                let from_scratch = last_tombstone.is_some();
533
534                // We ignore deletion and keep the replacement to create the appropriate versions.
535                let operations = match last_tombstone {
536                    Some(i) => match self.operations[i] {
537                        InnerDocOp::Deletion => &self.operations[i + 1..],
538                        InnerDocOp::Replace(_) => &self.operations[i..],
539                        InnerDocOp::Update(_) => unreachable!("Found a non-tombstone operation"),
540                    },
541                    None => &self.operations[..],
542                };
543
544                // We collect the versions to generate the appropriate document.
545                let versions = operations.iter().map(|operation| {
546                    let DocumentOffset { content } = match operation {
547                        InnerDocOp::Replace(offset) | InnerDocOp::Update(offset) => offset,
548                        InnerDocOp::Deletion => unreachable!("Deletion in document operations"),
549                    };
550
551                    let document = serde_json::from_slice(content).unwrap();
552                    let document =
553                        RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
554                            .map_err(UserError::SerdeJson)?;
555
556                    Ok(document)
557                });
558
559                let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
560
561                if self.is_new {
562                    Ok(Some(DocumentChange::Insertion(Insertion::create(
563                        self.docid,
564                        external_doc,
565                        versions,
566                    ))))
567                } else {
568                    Ok(Some(DocumentChange::Update(Update::create(
569                        self.docid,
570                        external_doc,
571                        versions,
572                        from_scratch,
573                    ))))
574                }
575            }
576            Some(InnerDocOp::Deletion) => {
577                if self.is_new {
578                    Ok(None)
579                } else {
580                    let deletion = Deletion::create(self.docid, external_doc);
581                    Ok(Some(DocumentChange::Deletion(deletion)))
582                }
583            }
584            None => unreachable!("We must not have an empty set of operations on a document"),
585        }
586    }
587}
588
/// An operation applied to a document by one of the payloads of a batch.
#[derive(Clone)]
pub enum InnerDocOp<'pl> {
    /// Replaces the whole document by the given version.
    Replace(DocumentOffset<'pl>),
    /// Updates the document with the given (possibly partial) version.
    Update(DocumentOffset<'pl>),
    /// Deletes the document.
    Deletion,
}

/// The raw bytes of one document inside an addition payload.
///
/// Despite its name, this holds the document bytes themselves, not a numeric offset. They
/// are a sub-slice of the (mmapped) NDJSON payload the document was read from.
#[derive(Clone)]
pub struct DocumentOffset<'pl> {
    /// The bytes of a single JSON document, borrowed from its payload.
    pub content: &'pl [u8],
}
603
604/// Returns the first pointer of the first change in a document.
605///
606/// This is used to sort the documents in update file content order
607/// and read the update file in order to largely speed up the indexation.
608pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> {
609    docops.iter().find_map(|ido: &_| match ido {
610        InnerDocOp::Replace(replace) => Some(replace.content.as_ptr() as usize),
611        InnerDocOp::Update(update) => Some(update.content.as_ptr() as usize),
612        InnerDocOp::Deletion => None,
613    })
614}