// milli_core/update/new/indexer/document_changes.rs

1use std::cell::{Cell, RefCell};
2use std::sync::atomic::Ordering;
3use std::sync::{Arc, RwLock};
4
5use bumpalo::Bump;
6use heed::{RoTxn, WithoutTls};
7use rayon::iter::IndexedParallelIterator;
8
9use super::super::document_change::DocumentChange;
10use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
11use crate::progress::{AtomicDocumentStep, Progress};
12use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
13use crate::update::new::steps::IndexingStep;
14use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
15use crate::update::GrenadParameters;
16use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
17
18pub struct DocumentChangeContext<
19    'doc,             // covariant lifetime of a single `process` call
20    'extractor: 'doc, // invariant lifetime of the extractor_allocs
21    'fid: 'doc,       // invariant lifetime of the new_fields_ids_map
22    'indexer: 'doc,   // covariant lifetime of objects that outlive a single `process` call
23    T: MostlySend,
24> {
25    /// The index we're indexing in
26    pub index: &'indexer Index,
27    /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
28    /// inside of the DB.
29    pub db_fields_ids_map: &'indexer FieldsIdsMap,
30    /// A transaction providing data from the DB before all indexing operations
31    pub rtxn: RoTxn<'indexer, WithoutTls>,
32
33    /// Global field id map that is up to date with the current state of the indexing process.
34    ///
35    /// - Inserting a field will take a lock
36    /// - Retrieving a field may take a lock as well
37    pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
38
39    /// Data allocated in this allocator is cleared between each call to `process`.
40    pub doc_alloc: Bump,
41
42    /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
43    pub extractor_alloc: &'extractor Bump,
44
45    /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
46    doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
47
48    /// Extractor-specific data
49    pub data: &'doc T,
50}
51
52impl<
53        'doc,             // covariant lifetime of a single `process` call
54        'data: 'doc,      // invariant on T lifetime of the datastore
55        'extractor: 'doc, // invariant lifetime of extractor_allocs
56        'fid: 'doc,       // invariant lifetime of fields ids map
57        'indexer: 'doc,   // covariant lifetime of objects that survive a `process` call
58        T: MostlySend,
59    > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T>
60{
61    #[allow(clippy::too_many_arguments)]
62    pub fn new<F>(
63        index: &'indexer Index,
64        db_fields_ids_map: &'indexer FieldsIdsMap,
65        new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
66        extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
67        doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
68        datastore: &'data ThreadLocal<T>,
69        fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
70        init_data: F,
71    ) -> Result<Self>
72    where
73        F: FnOnce(&'extractor Bump) -> Result<T>,
74    {
75        let doc_alloc =
76            doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
77        let doc_alloc = doc_alloc.0.take();
78        let fields_ids_map = fields_ids_map_store
79            .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
80
81        let fields_ids_map = &fields_ids_map.0;
82        let extractor_alloc = extractor_allocs.get_or_default();
83
84        let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
85
86        let txn = index.read_txn()?;
87        Ok(DocumentChangeContext {
88            index,
89            rtxn: txn,
90            db_fields_ids_map,
91            new_fields_ids_map: fields_ids_map,
92            doc_alloc,
93            extractor_alloc: &extractor_alloc.0,
94            data,
95            doc_allocs,
96        })
97    }
98}
99
100/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
101pub trait Extractor<'extractor>: Sync {
102    type Data: MostlySend;
103
104    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
105
106    fn process<'doc>(
107        &'doc self,
108        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
109        context: &'doc DocumentChangeContext<Self::Data>,
110    ) -> Result<()>;
111}
112
113pub trait DocumentChanges<'pl // lifetime of the underlying payload
114>: Sync {
115    type Item: Send;
116
117    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
118
119    fn len(&self) -> usize;
120
121    fn is_empty(&self) -> bool {
122        self.len() == 0
123    }
124
125    fn item_to_document_change<'doc, // lifetime of a single `process` call
126     T: MostlySend>(
127        &'doc self,
128        context: &'doc DocumentChangeContext<T>,
129        item: &'doc Self::Item,
130    ) -> Result<Option<DocumentChange<'doc>>> where 'pl: 'doc // the payload must survive the process calls
131    ;
132}
133
134pub struct IndexingContext<
135    'fid,     // invariant lifetime of fields ids map
136    'indexer, // covariant lifetime of objects that are borrowed  during the entire indexing operation
137    'index,   // covariant lifetime of the index
138    MSP,
139> where
140    MSP: Fn() -> bool + Sync,
141{
142    pub index: &'index Index,
143    pub db_fields_ids_map: &'indexer FieldsIdsMap,
144    pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
145    pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
146    pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
147    pub must_stop_processing: &'indexer MSP,
148    pub progress: &'indexer Progress,
149    pub grenad_parameters: &'indexer GrenadParameters,
150}
151
152impl<MSP> Copy
153    for IndexingContext<
154        '_, // invariant lifetime of fields ids map
155        '_, // covariant lifetime of objects that are borrowed  during the entire indexing operation
156        '_, // covariant lifetime of the index
157        MSP,
158    >
159where
160    MSP: Fn() -> bool + Sync,
161{
162}
163
164impl<MSP> Clone
165    for IndexingContext<
166        '_, // invariant lifetime of fields ids map
167        '_, // covariant lifetime of objects that are borrowed  during the entire indexing operation
168        '_, // covariant lifetime of the index
169        MSP,
170    >
171where
172    MSP: Fn() -> bool + Sync,
173{
174    fn clone(&self) -> Self {
175        *self
176    }
177}
178
/// Number of items [`extract`] asks [`DocumentChanges::iter`] to put in each chunk.
const CHUNK_SIZE: usize = 100;
180
181pub fn extract<
182    'pl,        // covariant lifetime of the underlying payload
183    'extractor, // invariant lifetime of extractor_alloc
184    'fid,       // invariant lifetime of fields ids map
185    'indexer,   // covariant lifetime of objects that are borrowed during the entire indexing
186    'data,      // invariant on EX::Data lifetime of datastore
187    'index,     // covariant lifetime of the index
188    EX,
189    DC: DocumentChanges<'pl>,
190    MSP,
191>(
192    document_changes: &DC,
193    extractor: &EX,
194    IndexingContext {
195        index,
196        db_fields_ids_map,
197        new_fields_ids_map,
198        doc_allocs,
199        fields_ids_map_store,
200        must_stop_processing,
201        progress,
202        grenad_parameters: _,
203    }: IndexingContext<'fid, 'indexer, 'index, MSP>,
204    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
205    datastore: &'data ThreadLocal<EX::Data>,
206    step: IndexingStep,
207) -> Result<()>
208where
209    EX: Extractor<'extractor>,
210    MSP: Fn() -> bool + Sync,
211{
212    tracing::trace!("We are resetting the extractor allocators");
213    progress.update_progress(step);
214    // Clean up and reuse the extractor allocs
215    for extractor_alloc in extractor_allocs.iter_mut() {
216        tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
217        extractor_alloc.0.reset();
218    }
219
220    let total_documents = document_changes.len() as u32;
221    let (step, progress_step) = AtomicDocumentStep::new(total_documents);
222    progress.update_progress(progress_step);
223
224    let pi = document_changes.iter(CHUNK_SIZE);
225    pi.try_arc_for_each_try_init(
226        || {
227            DocumentChangeContext::new(
228                index,
229                db_fields_ids_map,
230                new_fields_ids_map,
231                extractor_allocs,
232                doc_allocs,
233                datastore,
234                fields_ids_map_store,
235                move |index_alloc| extractor.init_data(index_alloc),
236            )
237        },
238        |context, items| {
239            if (must_stop_processing)() {
240                return Err(Arc::new(InternalError::AbortedIndexation.into()));
241            }
242
243            // Clean up and reuse the document-specific allocator
244            context.doc_alloc.reset();
245
246            let items = items.as_ref();
247            let changes = items.iter().filter_map(|item| {
248                document_changes.item_to_document_change(context, item).transpose()
249            });
250
251            let res = extractor.process(changes, context).map_err(Arc::new);
252            step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
253
254            // send back the doc_alloc in the pool
255            context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
256
257            res
258        },
259    )?;
260    step.store(total_documents, Ordering::Relaxed);
261
262    Ok(())
263}