milli_core/update/new/indexer/
document_changes.rs1use std::cell::{Cell, RefCell};
2use std::sync::atomic::Ordering;
3use std::sync::{Arc, RwLock};
4
5use bumpalo::Bump;
6use heed::{RoTxn, WithoutTls};
7use rayon::iter::IndexedParallelIterator;
8
9use super::super::document_change::DocumentChange;
10use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
11use crate::progress::{AtomicDocumentStep, Progress};
12use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
13use crate::update::new::steps::IndexingStep;
14use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
15use crate::update::GrenadParameters;
16use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
17
18pub struct DocumentChangeContext<
19 'doc, 'extractor: 'doc, 'fid: 'doc, 'indexer: 'doc, T: MostlySend,
24> {
25 pub index: &'indexer Index,
27 pub db_fields_ids_map: &'indexer FieldsIdsMap,
30 pub rtxn: RoTxn<'indexer, WithoutTls>,
32
33 pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
38
39 pub doc_alloc: Bump,
41
42 pub extractor_alloc: &'extractor Bump,
44
45 doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
47
48 pub data: &'doc T,
50}
51
52impl<
53 'doc, 'data: 'doc, 'extractor: 'doc, 'fid: 'doc, 'indexer: 'doc, T: MostlySend,
59 > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T>
60{
61 #[allow(clippy::too_many_arguments)]
62 pub fn new<F>(
63 index: &'indexer Index,
64 db_fields_ids_map: &'indexer FieldsIdsMap,
65 new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
66 extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
67 doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
68 datastore: &'data ThreadLocal<T>,
69 fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
70 init_data: F,
71 ) -> Result<Self>
72 where
73 F: FnOnce(&'extractor Bump) -> Result<T>,
74 {
75 let doc_alloc =
76 doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
77 let doc_alloc = doc_alloc.0.take();
78 let fields_ids_map = fields_ids_map_store
79 .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
80
81 let fields_ids_map = &fields_ids_map.0;
82 let extractor_alloc = extractor_allocs.get_or_default();
83
84 let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
85
86 let txn = index.read_txn()?;
87 Ok(DocumentChangeContext {
88 index,
89 rtxn: txn,
90 db_fields_ids_map,
91 new_fields_ids_map: fields_ids_map,
92 doc_alloc,
93 extractor_alloc: &extractor_alloc.0,
94 data,
95 doc_allocs,
96 })
97 }
98}
99
100pub trait Extractor<'extractor>: Sync {
102 type Data: MostlySend;
103
104 fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
105
106 fn process<'doc>(
107 &'doc self,
108 changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
109 context: &'doc DocumentChangeContext<Self::Data>,
110 ) -> Result<()>;
111}
112
113pub trait DocumentChanges<'pl >: Sync {
115 type Item: Send;
116
117 fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
118
119 fn len(&self) -> usize;
120
121 fn is_empty(&self) -> bool {
122 self.len() == 0
123 }
124
125 fn item_to_document_change<'doc, T: MostlySend>(
127 &'doc self,
128 context: &'doc DocumentChangeContext<T>,
129 item: &'doc Self::Item,
130 ) -> Result<Option<DocumentChange<'doc>>> where 'pl: 'doc ;
132}
133
134pub struct IndexingContext<
135 'fid, 'indexer, 'index, MSP,
139> where
140 MSP: Fn() -> bool + Sync,
141{
142 pub index: &'index Index,
143 pub db_fields_ids_map: &'indexer FieldsIdsMap,
144 pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
145 pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
146 pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
147 pub must_stop_processing: &'indexer MSP,
148 pub progress: &'indexer Progress,
149 pub grenad_parameters: &'indexer GrenadParameters,
150}
151
152impl<MSP> Copy
153 for IndexingContext<
154 '_, '_, '_, MSP,
158 >
159where
160 MSP: Fn() -> bool + Sync,
161{
162}
163
164impl<MSP> Clone
165 for IndexingContext<
166 '_, '_, '_, MSP,
170 >
171where
172 MSP: Fn() -> bool + Sync,
173{
174 fn clone(&self) -> Self {
175 *self
176 }
177}
178
/// Chunk size requested from `DocumentChanges::iter` by `extract`.
const CHUNK_SIZE: usize = 100;
180
181pub fn extract<
182 'pl, 'extractor, 'fid, 'indexer, 'data, 'index, EX,
189 DC: DocumentChanges<'pl>,
190 MSP,
191>(
192 document_changes: &DC,
193 extractor: &EX,
194 IndexingContext {
195 index,
196 db_fields_ids_map,
197 new_fields_ids_map,
198 doc_allocs,
199 fields_ids_map_store,
200 must_stop_processing,
201 progress,
202 grenad_parameters: _,
203 }: IndexingContext<'fid, 'indexer, 'index, MSP>,
204 extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
205 datastore: &'data ThreadLocal<EX::Data>,
206 step: IndexingStep,
207) -> Result<()>
208where
209 EX: Extractor<'extractor>,
210 MSP: Fn() -> bool + Sync,
211{
212 tracing::trace!("We are resetting the extractor allocators");
213 progress.update_progress(step);
214 for extractor_alloc in extractor_allocs.iter_mut() {
216 tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
217 extractor_alloc.0.reset();
218 }
219
220 let total_documents = document_changes.len() as u32;
221 let (step, progress_step) = AtomicDocumentStep::new(total_documents);
222 progress.update_progress(progress_step);
223
224 let pi = document_changes.iter(CHUNK_SIZE);
225 pi.try_arc_for_each_try_init(
226 || {
227 DocumentChangeContext::new(
228 index,
229 db_fields_ids_map,
230 new_fields_ids_map,
231 extractor_allocs,
232 doc_allocs,
233 datastore,
234 fields_ids_map_store,
235 move |index_alloc| extractor.init_data(index_alloc),
236 )
237 },
238 |context, items| {
239 if (must_stop_processing)() {
240 return Err(Arc::new(InternalError::AbortedIndexation.into()));
241 }
242
243 context.doc_alloc.reset();
245
246 let items = items.as_ref();
247 let changes = items.iter().filter_map(|item| {
248 document_changes.item_to_document_change(context, item).transpose()
249 });
250
251 let res = extractor.process(changes, context).map_err(Arc::new);
252 step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
253
254 context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
256
257 res
258 },
259 )?;
260 step.store(total_documents, Ordering::Relaxed);
261
262 Ok(())
263}