summa_core/components/
index_writer_holder.rs

1use std::collections::hash_map::RandomState;
2use std::collections::HashSet;
3use std::path::Path;
4use std::sync::{Arc, RwLock};
5
6use chrono::{DateTime, Datelike};
7use rand::RngCore;
8use summa_proto::proto;
9use tantivy::index::SegmentId;
10use tantivy::merge_policy::MergePolicy;
11use tantivy::query::Query;
12use tantivy::schema::document::ReferenceValueLeaf;
13use tantivy::schema::document::{CompactDocObjectIter, CompactDocValue, ReferenceValue};
14use tantivy::schema::{Field, FieldType, OwnedValue, Value};
15use tantivy::{Directory, Document, Index, IndexWriter, Opstamp, SegmentMeta, SingleSegmentIndexWriter, TantivyDocument, Term};
16use tracing::info;
17
18use super::SummaSegmentAttributes;
19use crate::configs::core::WriterThreads;
20use crate::errors::{SummaResult, ValidationError};
21use crate::Error;
22
23fn extract_flatten<'a, T: AsRef<str>>(v: CompactDocValue<'a>, parts: &[T], buffer: &mut Vec<OwnedValue>) {
24    let mut current = v;
25    for (i, part) in parts.iter().enumerate() {
26        match current.as_value() {
27            ReferenceValue::Object(m) => {
28                for (key, value) in m {
29                    if key == part.as_ref() {
30                        current = value;
31                        break;
32                    }
33                }
34            }
35            ReferenceValue::Array(a) => {
36                for child in a {
37                    extract_flatten(child, &parts[i..], buffer)
38                }
39            }
40            _ => break,
41        }
42    }
43    if let ReferenceValue::Leaf(_) = current.as_value() {
44        buffer.push(OwnedValue::from(current))
45    }
46}
47fn extract_flatten_from_map<'a, T: AsRef<str>>(m: CompactDocObjectIter<'a>, parts: &[T], buffer: &mut Vec<OwnedValue>) {
48    for (key, value) in m {
49        if key == parts[0].as_ref() {
50            match value.as_value() {
51                ReferenceValue::Leaf(_) => {}
52                ReferenceValue::Array(a) => {
53                    for child in a {
54                        extract_flatten(child, &parts[1..], buffer)
55                    }
56                }
57                ReferenceValue::Object(child) => extract_flatten_from_map(child, &parts[1..], buffer),
58            }
59        }
60    }
61}
62
63#[inline]
64fn generate_id() -> String {
65    let mut data = [0u8; 16];
66    rand::thread_rng().fill_bytes(&mut data);
67    base36::encode(&data)
68}
69
70/// Wrap `tantivy::SingleSegmentIndexWriter` and allows to recreate it
71pub struct SingleIndexWriter {
72    pub index_writer: RwLock<SingleSegmentIndexWriter>,
73    pub index: Index,
74    pub writer_heap_size_bytes: usize,
75}
76
77/// Hold same-thread or pooled implementation of `IndexWriter`
78pub enum IndexWriterImpl {
79    SameThread(SingleIndexWriter),
80    Threaded(IndexWriter),
81}
82
83impl IndexWriterImpl {
84    pub fn new(index: &Index, writer_threads: WriterThreads, writer_heap_size_bytes: usize, merge_policy: Arc<dyn MergePolicy>) -> SummaResult<Self> {
85        Ok(match writer_threads {
86            WriterThreads::SameThread => IndexWriterImpl::SameThread(SingleIndexWriter {
87                index: index.clone(),
88                index_writer: RwLock::new(SingleSegmentIndexWriter::new(index.clone(), writer_heap_size_bytes)?),
89                writer_heap_size_bytes,
90            }),
91            WriterThreads::N(writer_threads) => {
92                let index_writer = index.writer_with_num_threads(writer_threads as usize, writer_heap_size_bytes)?;
93                index_writer.set_merge_policy(merge_policy);
94                IndexWriterImpl::Threaded(index_writer)
95            }
96        })
97    }
98
99    pub fn delete_by_query(&self, query: Box<dyn Query>) -> SummaResult<u64> {
100        match self {
101            IndexWriterImpl::SameThread(_) => unimplemented!(),
102            IndexWriterImpl::Threaded(writer) => Ok(writer.delete_query(query)?),
103        }
104    }
105
106    pub fn delete_by_term(&self, term: Term) -> u64 {
107        match self {
108            IndexWriterImpl::SameThread(_) => unimplemented!(),
109            IndexWriterImpl::Threaded(writer) => writer.delete_term(term),
110        }
111    }
112
113    pub fn add_document(&self, document: TantivyDocument) -> SummaResult<()> {
114        match self {
115            IndexWriterImpl::SameThread(writer) => {
116                writer.index_writer.write().expect("poisoned").add_document(document)?;
117            }
118            IndexWriterImpl::Threaded(writer) => {
119                writer.add_document(document)?;
120            }
121        };
122        Ok(())
123    }
124    pub fn index(&self) -> &Index {
125        match self {
126            IndexWriterImpl::SameThread(writer) => &writer.index,
127            IndexWriterImpl::Threaded(writer) => writer.index(),
128        }
129    }
130    pub fn merge_with_attributes(&self, segment_ids: &[SegmentId], segment_attributes: Option<serde_json::Value>) -> SummaResult<Option<SegmentMeta>> {
131        match self {
132            IndexWriterImpl::SameThread(_) => {
133                unimplemented!()
134            }
135            IndexWriterImpl::Threaded(writer) => {
136                let target_segment = writer.merge_with_attributes(segment_ids, segment_attributes).wait()?;
137                writer.garbage_collect_files().wait()?;
138                Ok(target_segment)
139            }
140        }
141    }
142    pub fn commit(&mut self) -> SummaResult<Opstamp> {
143        match self {
144            IndexWriterImpl::SameThread(writer) => {
145                let index = writer.index.clone();
146                let writer_heap_size_bytes = writer.writer_heap_size_bytes;
147                let writer = writer.index_writer.get_mut().expect("poisoned");
148                take_mut::take(writer, |writer| {
149                    writer.finalize().expect("cannot finalize");
150                    SingleSegmentIndexWriter::new(index.clone(), writer_heap_size_bytes).expect("cannot recreate writer")
151                });
152                Ok(0)
153            }
154            IndexWriterImpl::Threaded(writer) => {
155                info!(action = "commit_files");
156                let opstamp = writer.prepare_commit()?.commit()?;
157                info!(action = "committed", opstamp = ?opstamp);
158                Ok(opstamp)
159            }
160        }
161    }
162    pub fn rollback(&mut self) -> SummaResult<()> {
163        match self {
164            IndexWriterImpl::SameThread(_) => unimplemented!(),
165            IndexWriterImpl::Threaded(writer) => {
166                info!(action = "rollback_files");
167                let opstamp = writer.rollback()?;
168                info!(action = "rollbacked", opstamp = ?opstamp);
169                Ok(())
170            }
171        }
172    }
173}
174
175/// Managing write operations to index
176pub struct IndexWriterHolder {
177    index_writer: IndexWriterImpl,
178    merge_policy: Arc<dyn MergePolicy>,
179    unique_fields: Vec<Field>,
180    writer_threads: WriterThreads,
181    writer_heap_size_bytes: usize,
182    auto_id_field: Option<Field>,
183    extra_year_field: Option<(Field, Field)>,
184    mapped_fields: Vec<((Field, Vec<String>), Field)>,
185}
186
187impl IndexWriterHolder {
188    /// Creates new `IndexWriterHolder` containing `tantivy::IndexWriter` and primary key
189    ///
190    /// `IndexWriterHolder` maintains invariant that the only document with the particular primary key exists in the index.
191    /// It is reached by deletion of every document with the same primary key as indexing one.
192    /// The type of primary key is restricted to I64 but it is subjected to be changed in the future.
193    pub(super) fn new(
194        index_writer: IndexWriterImpl,
195        merge_policy: Arc<dyn MergePolicy>,
196        unique_fields: Vec<Field>,
197        auto_id_field: Option<Field>,
198        mapped_fields: Vec<((Field, Vec<String>), Field)>,
199        writer_threads: WriterThreads,
200        writer_heap_size_bytes: usize,
201    ) -> SummaResult<IndexWriterHolder> {
202        let schema = index_writer.index().schema();
203        let extra_year_field = if let (Ok(extra_field), Ok(issued_at_field)) = (schema.get_field("extra"), schema.get_field("issued_at")) {
204            Some((extra_field, issued_at_field))
205        } else {
206            None
207        };
208        Ok(IndexWriterHolder {
209            index_writer,
210            merge_policy,
211            unique_fields,
212            auto_id_field,
213            writer_threads,
214            writer_heap_size_bytes,
215            extra_year_field,
216            mapped_fields,
217        })
218    }
219
220    /// Creates new `IndexWriterHolder` from `Index` and `core::Config`
221    pub fn create(
222        index: &Index,
223        writer_threads: WriterThreads,
224        writer_heap_size_bytes: usize,
225        merge_policy: Arc<dyn MergePolicy>,
226    ) -> SummaResult<IndexWriterHolder> {
227        let index_writer = IndexWriterImpl::new(index, writer_threads.clone(), writer_heap_size_bytes, merge_policy.clone())?;
228        let schema = index_writer.index().schema();
229        let metas = index.load_metas()?;
230        let mapped_fields = metas
231            .index_attributes()?
232            .map(|attributes: proto::IndexAttributes| {
233                attributes
234                    .mapped_fields
235                    .iter()
236                    .map(|proto::MappedField { source_field, target_field }| {
237                        Ok::<((Field, Vec<String>), Field), ValidationError>((
238                            schema
239                                .find_field(source_field)
240                                .ok_or_else(|| ValidationError::MissingField(source_field.to_string()))
241                                .map(|(field, full_path)| (field, full_path.split('.').map(|x| x.to_string()).collect()))?,
242                            schema
243                                .get_field(target_field)
244                                .map_err(|_| ValidationError::MissingField(source_field.to_string()))?,
245                        ))
246                    })
247                    .collect::<Result<Vec<(_, _)>, _>>()
248            })
249            .transpose()?
250            .unwrap_or_default();
251        let unique_fields = metas
252            .index_attributes()?
253            .map(|attributes: proto::IndexAttributes| {
254                attributes
255                    .unique_fields
256                    .iter()
257                    .map(|unique_field| {
258                        schema
259                            .find_field(unique_field)
260                            .ok_or_else(|| ValidationError::MissingField(unique_field.to_string()))
261                            .map(|x| x.0)
262                    })
263                    .collect::<Result<Vec<_>, _>>()
264            })
265            .transpose()?
266            .unwrap_or_default();
267        let auto_id_field = metas
268            .index_attributes()?
269            .and_then(|attributes: proto::IndexAttributes| {
270                attributes.auto_id_field.map(|auto_id_field| {
271                    schema
272                        .get_field(&auto_id_field)
273                        .or_else(|_| Err(ValidationError::MissingField(auto_id_field.to_string())))
274                })
275            })
276            .transpose()?;
277        IndexWriterHolder::new(
278            index_writer,
279            merge_policy,
280            unique_fields,
281            auto_id_field,
282            mapped_fields,
283            writer_threads,
284            writer_heap_size_bytes,
285        )
286    }
287
288    /// Delete index by its unique fields
289    pub(super) fn resolve_conflicts(&self, document: &TantivyDocument, conflict_strategy: proto::ConflictStrategy) -> SummaResult<Option<u64>> {
290        if self.unique_fields.is_empty() || matches!(conflict_strategy, proto::ConflictStrategy::DoNothing) {
291            return Ok(None);
292        }
293
294        let unique_terms: Vec<Term> = self
295            .unique_fields
296            .iter()
297            .flat_map(|unique_field| {
298                document.get_all(*unique_field).map(|value| match value.as_value() {
299                    ReferenceValue::Leaf(ReferenceValueLeaf::Str(s)) => Some(Ok(vec![Term::from_field_text(*unique_field, s)])),
300                    ReferenceValue::Leaf(ReferenceValueLeaf::I64(i)) => Some(Ok(vec![Term::from_field_i64(*unique_field, i)])),
301                    ReferenceValue::Leaf(ReferenceValueLeaf::U64(i)) => Some(Ok(vec![Term::from_field_u64(*unique_field, i)])),
302                    ReferenceValue::Leaf(ReferenceValueLeaf::F64(i)) => Some(Ok(vec![Term::from_field_f64(*unique_field, i)])),
303                    _ => {
304                        let schema = self.index_writer.index().schema();
305                        let field_type = schema.get_field_entry(*unique_field).field_type();
306                        Some(Err(Error::Validation(Box::new(ValidationError::InvalidUniqueFieldType(field_type.clone())))))
307                    }
308                })
309            })
310            .flatten()
311            .collect::<SummaResult<Vec<_>>>()?
312            .into_iter()
313            .flatten()
314            .collect();
315
316        if unique_terms.is_empty() {
317            Err(ValidationError::MissingUniqueField(format!(
318                "{:?}",
319                document.to_named_doc(&self.index_writer.index().schema()),
320            )))?
321        }
322
323        let mut last_opstamp = None;
324        for term in unique_terms {
325            last_opstamp = Some(self.delete_by_term(term))
326        }
327
328        Ok(last_opstamp)
329    }
330
331    /// Delete documents by query
332    pub(super) fn delete_by_query(&self, query: Box<dyn Query>) -> SummaResult<u64> {
333        self.index_writer.delete_by_query(query)
334    }
335
336    /// Delete documents by `Term`
337    pub(super) fn delete_by_term(&self, term: Term) -> u64 {
338        self.index_writer.delete_by_term(term)
339    }
340
341    /// Tantivy `Index`
342    pub(super) fn index(&self) -> &Index {
343        self.index_writer.index()
344    }
345
346    #[inline]
347    fn process_dynamic_fields(&self, document: &mut TantivyDocument) -> SummaResult<()> {
348        if let Some((extra_field, issued_at_field)) = self.extra_year_field {
349            if let Some(issued_at_value) = document.get_first(issued_at_field) {
350                if let Some(issued_at_value) = issued_at_value.as_i64() {
351                    if let Some(correct_timestamp) = DateTime::from_timestamp(issued_at_value, 0) {
352                        document.add_text(extra_field, correct_timestamp.year().to_string())
353                    }
354                }
355            }
356        }
357        let mut buffer = vec![];
358        for ((source_field, source_full_path), target_field) in &self.mapped_fields {
359            for value in document.get_all(*source_field) {
360                match value.as_value() {
361                    ReferenceValue::Object(entries) => extract_flatten_from_map(entries, source_full_path, &mut buffer),
362                    ReferenceValue::Leaf(leaf) => buffer.push(OwnedValue::from(value)),
363                    _ => unimplemented!(),
364                }
365            }
366            for v in &buffer {
367                document.add_field_value(*target_field, v)
368            }
369            buffer.clear();
370        }
371        Ok(())
372    }
373    #[inline]
374    fn setup_id_field(&self, document: &mut TantivyDocument) -> SummaResult<()> {
375        if let Some(auto_id_field) = &self.auto_id_field {
376            let schema = self.index_writer.index().schema();
377            match schema.get_field_entry(*auto_id_field).field_type() {
378                FieldType::Str(_) => match document.get_first(*auto_id_field) {
379                    Some(_) => {}
380                    None => document.add_text(*auto_id_field, generate_id()),
381                },
382                _ => unreachable!(),
383            }
384        }
385        Ok(())
386    }
387
388    /// Put document to the index. Before comes searchable it must be committed
389    pub fn index_document(&self, mut document: TantivyDocument, conflict_strategy: proto::ConflictStrategy) -> SummaResult<()> {
390        self.process_dynamic_fields(&mut document)?;
391        self.setup_id_field(&mut document)?;
392        self.resolve_conflicts(&document, conflict_strategy)?;
393        self.index_writer.add_document(document)?;
394        Ok(())
395    }
396
397    /// Merge segments into one.
398    ///
399    /// Also cleans deleted documents and do recompression. Possible to pass the only segment in `segment_ids` to do recompression or clean up.
400    /// It is heavy operation that also blocks on `.await` so should be spawned if non-blocking behaviour is required
401    pub fn merge(&self, segment_ids: &[SegmentId], segment_attributes: Option<SummaSegmentAttributes>) -> SummaResult<Option<SegmentMeta>> {
402        info!(action = "merge_segments", segment_ids = ?segment_ids);
403        let segment_meta = self.index_writer.merge_with_attributes(
404            segment_ids,
405            segment_attributes.map(|segment_attributes| serde_json::to_value(segment_attributes).expect("cannot serialize")),
406        )?;
407        info!(action = "merged_segments", segment_ids = ?segment_ids, merged_segment_meta = ?segment_meta);
408        Ok(segment_meta)
409    }
410
411    /// Commits already indexed documents
412    ///
413    /// Committing makes indexed documents visible
414    /// It is heavy operation that also blocks on `.await` so should be spawned if non-blocking behaviour is required
415    pub fn commit(&mut self) -> SummaResult<Opstamp> {
416        self.index_writer.commit()
417    }
418
419    pub fn rollback(&mut self) -> SummaResult<()> {
420        self.index_writer.rollback()
421    }
422
423    pub fn vacuum(&self, segment_attributes: Option<SummaSegmentAttributes>, excluded_segments: Vec<String>) -> SummaResult<()> {
424        let mut segments = self.index().searchable_segments()?;
425        segments.sort_by_key(|segment| segment.meta().num_deleted_docs());
426
427        let excluded_segments: HashSet<SegmentId, RandomState> = excluded_segments
428            .into_iter()
429            .map(|s| SegmentId::from_uuid_string(&s))
430            .collect::<Result<_, _>>()
431            .map_err(|e| Error::InvalidSegmentId(e.to_string()))?;
432
433        let segments = segments
434            .into_iter()
435            .filter(|segment| {
436                let is_frozen = segment
437                    .meta()
438                    .segment_attributes()
439                    .as_ref()
440                    .map(|segment_attributes| {
441                        let parsed_attributes = serde_json::from_value::<SummaSegmentAttributes>(segment_attributes.clone());
442                        parsed_attributes.map(|v| v.is_frozen).unwrap_or(false)
443                    })
444                    .unwrap_or(false);
445                let is_excluded = excluded_segments.contains(&segment.id());
446                !is_frozen && !is_excluded
447            })
448            .collect::<Vec<_>>();
449        if !segments.is_empty() {
450            self.merge(&segments.iter().map(|segment| segment.id()).collect::<Vec<_>>(), segment_attributes)?;
451        }
452        Ok(())
453    }
454
455    pub fn wait_merging_threads(&mut self) {
456        match &mut self.index_writer {
457            IndexWriterImpl::SameThread(_) => (),
458            IndexWriterImpl::Threaded(index_writer) => take_mut::take(index_writer, |index_writer| {
459                let index = index_writer.index().clone();
460                info!(action = "wait_merging_threads", mode = "threaded");
461                index_writer.wait_merging_threads().expect("cannot wait merging threads");
462                info!(action = "merging_threads_finished", mode = "threaded");
463                let index_writer = index
464                    .writer_with_num_threads(self.writer_threads.threads() as usize, self.writer_heap_size_bytes)
465                    .expect("cannot create index writer_holder");
466                index_writer.set_merge_policy(self.merge_policy.clone());
467                index_writer
468            }),
469        };
470    }
471
472    /// Locking index files for executing operation on them
473    pub fn commit_and_prepare(&mut self, with_hotcache: bool) -> SummaResult<Opstamp> {
474        let opstamp = self.commit()?;
475        self.wait_merging_threads();
476
477        if with_hotcache {
478            let directory = self.index().directory();
479            let hotcache_bytes = crate::directories::create_hotcache(
480                directory
481                    .underlying_directory()
482                    .expect("managed directory should contain nested directory")
483                    .box_clone(),
484            )?;
485            directory.atomic_write(Path::new(&format!("hotcache.{}.bin", opstamp)), &hotcache_bytes)?;
486        }
487        Ok(opstamp)
488    }
489}