1use std::sync::atomic::Ordering;
2
3use bumpalo::collections::CollectIn;
4use bumpalo::Bump;
5use bumparaw_collections::RawMap;
6use hashbrown::hash_map::Entry;
7use heed::RoTxn;
8use memmap2::Mmap;
9use rayon::slice::ParallelSlice;
10use rustc_hash::FxBuildHasher;
11use serde_json::value::RawValue;
12use serde_json::Deserializer;
13
14use super::super::document_change::DocumentChange;
15use super::document_changes::{DocumentChangeContext, DocumentChanges};
16use super::guess_primary_key::retrieve_or_guess_primary_key;
17use crate::documents::PrimaryKey;
18use crate::progress::{AtomicPayloadStep, Progress};
19use crate::update::new::document::Versions;
20use crate::update::new::steps::IndexingStep;
21use crate::update::new::thread_local::MostlySend;
22use crate::update::new::{Deletion, Insertion, Update};
23use crate::update::{AvailableIds, IndexDocumentsMethod};
24use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
25
/// Ordered collection of document payloads (replacements, updates and
/// deletions) waiting to be converted into document changes by
/// [`DocumentOperation::into_changes`].
#[derive(Default)]
pub struct DocumentOperation<'pl> {
    /// Registered payloads, kept in the order they were pushed.
    operations: Vec<Payload<'pl>>,
}
30
31impl<'pl> DocumentOperation<'pl> {
32 pub fn new() -> Self {
33 Self { operations: Default::default() }
34 }
35
36 pub fn replace_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
40 #[cfg(unix)]
41 payload.advise(memmap2::Advice::Sequential)?;
42 self.operations.push(Payload::Replace(&payload[..]));
43 Ok(())
44 }
45
46 pub fn update_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
50 #[cfg(unix)]
51 payload.advise(memmap2::Advice::Sequential)?;
52 self.operations.push(Payload::Update(&payload[..]));
53 Ok(())
54 }
55
56 pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
60 self.operations.push(Payload::Deletion(to_delete))
61 }
62
63 #[allow(clippy::too_many_arguments)]
64 #[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")]
65 pub fn into_changes<MSP>(
66 self,
67 indexer: &'pl Bump,
68 index: &Index,
69 rtxn: &'pl RoTxn<'pl>,
70 primary_key_from_op: Option<&'pl str>,
71 new_fields_ids_map: &mut FieldsIdsMap,
72 must_stop_processing: &MSP,
73 progress: Progress,
74 ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
75 where
76 MSP: Fn() -> bool,
77 {
78 progress.update_progress(IndexingStep::PreparingPayloads);
79 let Self { operations } = self;
80
81 let documents_ids = index.documents_ids(rtxn)?;
82 let mut operations_stats = Vec::new();
83 let mut available_docids = AvailableIds::new(&documents_ids);
84 let mut docids_version_offsets = hashbrown::HashMap::new();
85 let mut primary_key = None;
86
87 let payload_count = operations.len();
88 let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
89 progress.update_progress(progress_step);
90
91 for (payload_index, operation) in operations.into_iter().enumerate() {
92 if must_stop_processing() {
93 return Err(InternalError::AbortedIndexation.into());
94 }
95 step.store(payload_index as u32, Ordering::Relaxed);
96
97 let mut bytes = 0;
98 let result = match operation {
99 Payload::Replace(payload) => extract_addition_payload_changes(
100 indexer,
101 index,
102 rtxn,
103 primary_key_from_op,
104 &mut primary_key,
105 new_fields_ids_map,
106 &mut available_docids,
107 &mut bytes,
108 &docids_version_offsets,
109 IndexDocumentsMethod::ReplaceDocuments,
110 payload,
111 ),
112 Payload::Update(payload) => extract_addition_payload_changes(
113 indexer,
114 index,
115 rtxn,
116 primary_key_from_op,
117 &mut primary_key,
118 new_fields_ids_map,
119 &mut available_docids,
120 &mut bytes,
121 &docids_version_offsets,
122 IndexDocumentsMethod::UpdateDocuments,
123 payload,
124 ),
125 Payload::Deletion(to_delete) => extract_deletion_payload_changes(
126 index,
127 rtxn,
128 &mut available_docids,
129 &docids_version_offsets,
130 to_delete,
131 ),
132 };
133
134 let mut document_count = 0;
135 let error = match result {
136 Ok(new_docids_version_offsets) => {
137 document_count = new_docids_version_offsets.len() as u64;
138 merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
141 None
142 }
143 Err(Error::UserError(user_error)) => Some(user_error),
144 Err(e) => return Err(e),
145 };
146 operations_stats.push(PayloadStats { document_count, bytes, error });
147 }
148 step.store(payload_count as u32, Ordering::Relaxed);
149
150 let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
152 docids_version_offsets.drain().collect_in(indexer);
153
154 docids_version_offsets
157 .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
158
159 let docids_version_offsets = docids_version_offsets.into_bump_slice();
160 Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
161 }
162}
163
164#[allow(clippy::too_many_arguments)]
165fn extract_addition_payload_changes<'r, 'pl: 'r>(
166 indexer: &'pl Bump,
167 index: &Index,
168 rtxn: &'r RoTxn<'r>,
169 primary_key_from_op: Option<&'r str>,
170 primary_key: &mut Option<PrimaryKey<'r>>,
171 new_fields_ids_map: &mut FieldsIdsMap,
172 available_docids: &mut AvailableIds,
173 bytes: &mut u64,
174 main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
175 method: IndexDocumentsMethod,
176 payload: &'pl [u8],
177) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
178 use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
179
180 let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
181
182 let mut previous_offset = 0;
183 let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
184 while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
185 *bytes = previous_offset as u64;
186
187 let retrieved_primary_key = if previous_offset == 0 {
189 let doc = RawMap::from_raw_value_and_hasher(doc, FxBuildHasher, indexer)
190 .map(Some)
191 .map_err(UserError::SerdeJson)?;
192
193 let result = retrieve_or_guess_primary_key(
194 rtxn,
195 index,
196 new_fields_ids_map,
197 primary_key_from_op,
198 doc,
199 );
200
201 let (pk, _has_been_changed) = match result {
202 Ok(Ok(pk)) => pk,
203 Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
204 Err(error) => return Err(error),
205 };
206
207 primary_key.get_or_insert(pk)
208 } else {
209 primary_key.as_ref().unwrap()
211 };
212
213 let external_id =
214 retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?;
215
216 let external_id = external_id.to_de();
217 let current_offset = iter.byte_offset();
218 let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
219
220 match main_docids_version_offsets.get(external_id) {
221 None => {
222 match index.external_documents_ids().get(rtxn, external_id) {
223 Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) {
224 Entry::Occupied(mut entry) => match method {
225 ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
226 UpdateDocuments => entry.get_mut().push_update(document_offset),
227 },
228 Entry::Vacant(entry) => {
229 match method {
230 ReplaceDocuments => {
231 entry.insert(PayloadOperations::new_replacement(
232 docid,
233 false, document_offset,
235 ));
236 }
237 UpdateDocuments => {
238 entry.insert(PayloadOperations::new_update(
239 docid,
240 false, document_offset,
242 ));
243 }
244 }
245 }
246 },
247 Ok(None) => match new_docids_version_offsets.entry(external_id) {
248 Entry::Occupied(mut entry) => match method {
249 ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
250 UpdateDocuments => entry.get_mut().push_update(document_offset),
251 },
252 Entry::Vacant(entry) => {
253 let docid = match available_docids.next() {
254 Some(docid) => docid,
255 None => return Err(UserError::DocumentLimitReached.into()),
256 };
257
258 match method {
259 ReplaceDocuments => {
260 entry.insert(PayloadOperations::new_replacement(
261 docid,
262 true, document_offset,
264 ));
265 }
266 UpdateDocuments => {
267 entry.insert(PayloadOperations::new_update(
268 docid,
269 true, document_offset,
271 ));
272 }
273 }
274 }
275 },
276 Err(e) => return Err(e.into()),
277 }
278 }
279 Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
280 Entry::Occupied(mut entry) => match method {
281 ReplaceDocuments => entry.get_mut().push_replacement(document_offset),
282 UpdateDocuments => entry.get_mut().push_update(document_offset),
283 },
284 Entry::Vacant(entry) => match method {
285 ReplaceDocuments => {
286 entry.insert(PayloadOperations::new_replacement(
287 payload_operations.docid,
288 payload_operations.is_new,
289 document_offset,
290 ));
291 }
292 UpdateDocuments => {
293 entry.insert(PayloadOperations::new_update(
294 payload_operations.docid,
295 payload_operations.is_new,
296 document_offset,
297 ));
298 }
299 },
300 },
301 }
302
303 previous_offset = iter.byte_offset();
304 }
305
306 if payload.is_empty() {
307 let result = retrieve_or_guess_primary_key(
308 rtxn,
309 index,
310 new_fields_ids_map,
311 primary_key_from_op,
312 None,
313 );
314 match result {
315 Ok(Ok((pk, _))) => {
316 primary_key.get_or_insert(pk);
317 }
318 Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
319 Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
320 Err(error) => return Err(error),
321 };
322 }
323
324 Ok(new_docids_version_offsets)
325}
326
327fn extract_deletion_payload_changes<'s, 'pl: 's>(
328 index: &Index,
329 rtxn: &RoTxn,
330 available_docids: &mut AvailableIds,
331 main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
332 to_delete: &'pl [&'pl str],
333) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
334 let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
335
336 for external_id in to_delete {
337 match main_docids_version_offsets.get(external_id) {
338 None => {
339 match index.external_documents_ids().get(rtxn, external_id) {
340 Ok(Some(docid)) => {
341 match new_docids_version_offsets.entry(external_id) {
342 Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
343 Entry::Vacant(entry) => {
344 entry.insert(PayloadOperations::new_deletion(
345 docid, false, ));
347 }
348 }
349 }
350 Ok(None) => {
351 let docid = match available_docids.next() {
352 Some(docid) => docid,
353 None => return Err(UserError::DocumentLimitReached.into()),
354 };
355 match new_docids_version_offsets.entry(external_id) {
356 Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
357 Entry::Vacant(entry) => {
358 entry.insert(PayloadOperations::new_deletion(
359 docid, true, ));
361 }
362 }
363 }
364 Err(e) => return Err(e.into()),
365 }
366 }
367 Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
368 Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
369 Entry::Vacant(entry) => {
370 entry.insert(PayloadOperations::new_deletion(
371 payload_operations.docid,
372 payload_operations.is_new,
373 ));
374 }
375 },
376 }
377 }
378
379 Ok(new_docids_version_offsets)
380}
381
382fn merge_version_offsets<'s, 'pl>(
383 main: &mut hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
384 new: hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
385) {
386 if main.is_empty() {
389 return *main = new;
390 }
391
392 for (key, new_payload) in new {
393 match main.entry(key) {
394 Entry::Occupied(mut entry) => entry.get_mut().append_operations(new_payload.operations),
395 Entry::Vacant(entry) => {
396 entry.insert(new_payload);
397 }
398 }
399 }
400}
401
402impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
403 type Item = (&'pl str, PayloadOperations<'pl>);
404
405 fn iter(
406 &self,
407 chunk_size: usize,
408 ) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
409 self.docids_version_offsets.par_chunks(chunk_size)
410 }
411
412 fn item_to_document_change<'doc, T: MostlySend + 'doc>(
413 &'doc self,
414 context: &'doc DocumentChangeContext<T>,
415 item: &'doc Self::Item,
416 ) -> Result<Option<DocumentChange<'doc>>>
417 where
418 'pl: 'doc,
419 {
420 let (external_doc, payload_operations) = item;
421 payload_operations.merge(external_doc, &context.doc_alloc)
422 }
423
424 fn len(&self) -> usize {
425 self.docids_version_offsets.len()
426 }
427}
428
/// Merged result of a [`DocumentOperation`]: one entry per external document
/// id, as produced by `DocumentOperation::into_changes`.
pub struct DocumentOperationChanges<'pl> {
    /// `(external id, operations)` pairs stored in a bump-allocated slice,
    /// sorted by the address of the first document version of each entry.
    docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
}
432
/// One payload registered on a [`DocumentOperation`].
pub enum Payload<'pl> {
    /// Raw bytes of documents registered through `replace_documents`.
    Replace(&'pl [u8]),
    /// Raw bytes of documents registered through `update_documents`.
    Update(&'pl [u8]),
    /// External ids of the documents registered through `delete_documents`.
    Deletion(&'pl [&'pl str]),
}
438
/// Outcome of the processing of a single payload.
pub struct PayloadStats {
    /// Byte offset reported by the payload extraction: for addition payloads,
    /// the offset at which the last visited document starts; left at 0 for
    /// deletion payloads.
    pub bytes: u64,
    /// Number of distinct external document ids touched by the payload, or 0
    /// when the payload failed.
    pub document_count: u64,
    /// User error that made the payload be dropped, if any.
    pub error: Option<UserError>,
}
444
/// Operations to apply to a single document, in the order they were received.
pub struct PayloadOperations<'pl> {
    /// Internal id of the document.
    pub docid: DocumentId,
    /// `true` when `docid` was freshly allocated because the external id was
    /// not found in the index.
    pub is_new: bool,
    /// Operations on the document; never expected to be empty (see `merge`).
    pub operations: Vec<InnerDocOp<'pl>>,
}
453
454impl<'pl> PayloadOperations<'pl> {
455 fn new_replacement(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
456 Self { docid, is_new, operations: vec![InnerDocOp::Replace(offset)] }
457 }
458
459 fn new_update(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self {
460 Self { docid, is_new, operations: vec![InnerDocOp::Update(offset)] }
461 }
462
463 fn new_deletion(docid: DocumentId, is_new: bool) -> Self {
464 Self { docid, is_new, operations: vec![InnerDocOp::Deletion] }
465 }
466}
467
468impl<'pl> PayloadOperations<'pl> {
469 fn push_replacement(&mut self, offset: DocumentOffset<'pl>) {
470 self.operations.clear();
471 self.operations.push(InnerDocOp::Replace(offset))
472 }
473
474 fn push_update(&mut self, offset: DocumentOffset<'pl>) {
475 self.operations.push(InnerDocOp::Update(offset))
476 }
477
478 fn push_deletion(&mut self) {
479 self.operations.clear();
480 self.operations.push(InnerDocOp::Deletion);
481 }
482
483 fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) {
484 debug_assert!(!operations.is_empty());
485 if matches!(operations.first(), Some(InnerDocOp::Deletion | InnerDocOp::Replace(_))) {
486 self.operations.clear();
487 }
488 self.operations.append(&mut operations);
489 }
490
491 fn merge<'doc>(
495 &self,
496 external_doc: &'doc str,
497 doc_alloc: &'doc Bump,
498 ) -> Result<Option<DocumentChange<'doc>>>
499 where
500 'pl: 'doc,
501 {
502 match self.operations.last() {
503 Some(InnerDocOp::Replace(DocumentOffset { content })) => {
504 let document = serde_json::from_slice(content).unwrap();
505 let document =
506 RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
507 .map_err(UserError::SerdeJson)?;
508
509 if self.is_new {
510 Ok(Some(DocumentChange::Insertion(Insertion::create(
511 self.docid,
512 external_doc,
513 Versions::single(document),
514 ))))
515 } else {
516 Ok(Some(DocumentChange::Update(Update::create(
517 self.docid,
518 external_doc,
519 Versions::single(document),
520 true,
521 ))))
522 }
523 }
524 Some(InnerDocOp::Update(_)) => {
525 let last_tombstone = self
527 .operations
528 .iter()
529 .rposition(|op| matches!(op, InnerDocOp::Deletion | InnerDocOp::Replace(_)));
530
531 let from_scratch = last_tombstone.is_some();
533
534 let operations = match last_tombstone {
536 Some(i) => match self.operations[i] {
537 InnerDocOp::Deletion => &self.operations[i + 1..],
538 InnerDocOp::Replace(_) => &self.operations[i..],
539 InnerDocOp::Update(_) => unreachable!("Found a non-tombstone operation"),
540 },
541 None => &self.operations[..],
542 };
543
544 let versions = operations.iter().map(|operation| {
546 let DocumentOffset { content } = match operation {
547 InnerDocOp::Replace(offset) | InnerDocOp::Update(offset) => offset,
548 InnerDocOp::Deletion => unreachable!("Deletion in document operations"),
549 };
550
551 let document = serde_json::from_slice(content).unwrap();
552 let document =
553 RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
554 .map_err(UserError::SerdeJson)?;
555
556 Ok(document)
557 });
558
559 let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
560
561 if self.is_new {
562 Ok(Some(DocumentChange::Insertion(Insertion::create(
563 self.docid,
564 external_doc,
565 versions,
566 ))))
567 } else {
568 Ok(Some(DocumentChange::Update(Update::create(
569 self.docid,
570 external_doc,
571 versions,
572 from_scratch,
573 ))))
574 }
575 }
576 Some(InnerDocOp::Deletion) => {
577 if self.is_new {
578 Ok(None)
579 } else {
580 let deletion = Deletion::create(self.docid, external_doc);
581 Ok(Some(DocumentChange::Deletion(deletion)))
582 }
583 }
584 None => unreachable!("We must not have an empty set of operations on a document"),
585 }
586 }
587}
588
/// A single operation recorded for one document.
#[derive(Clone)]
pub enum InnerDocOp<'pl> {
    /// The document is replaced by the version stored at this offset.
    Replace(DocumentOffset<'pl>),
    /// The version stored at this offset is applied on top of earlier ones.
    Update(DocumentOffset<'pl>),
    /// The document is deleted.
    Deletion,
}
595
/// Location of one document version inside a payload.
#[derive(Clone)]
pub struct DocumentOffset<'pl> {
    /// Slice of the payload running from the end of the previous document to
    /// the end of this one.
    pub content: &'pl [u8],
}
603
604pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> {
609 docops.iter().find_map(|ido: &_| match ido {
610 InnerDocOp::Replace(replace) => Some(replace.content.as_ptr() as usize),
611 InnerDocOp::Update(update) => Some(update.content.as_ptr() as usize),
612 InnerDocOp::Deletion => None,
613 })
614}