1use std::collections::hash_map::RandomState;
2use std::collections::HashSet;
3use std::path::Path;
4use std::sync::{Arc, RwLock};
5
6use chrono::{DateTime, Datelike};
7use rand::RngCore;
8use summa_proto::proto;
9use tantivy::index::SegmentId;
10use tantivy::merge_policy::MergePolicy;
11use tantivy::query::Query;
12use tantivy::schema::document::ReferenceValueLeaf;
13use tantivy::schema::document::{CompactDocObjectIter, CompactDocValue, ReferenceValue};
14use tantivy::schema::{Field, FieldType, OwnedValue, Value};
15use tantivy::{Directory, Document, Index, IndexWriter, Opstamp, SegmentMeta, SingleSegmentIndexWriter, TantivyDocument, Term};
16use tracing::info;
17
18use super::SummaSegmentAttributes;
19use crate::configs::core::WriterThreads;
20use crate::errors::{SummaResult, ValidationError};
21use crate::Error;
22
23fn extract_flatten<'a, T: AsRef<str>>(v: CompactDocValue<'a>, parts: &[T], buffer: &mut Vec<OwnedValue>) {
24 let mut current = v;
25 for (i, part) in parts.iter().enumerate() {
26 match current.as_value() {
27 ReferenceValue::Object(m) => {
28 for (key, value) in m {
29 if key == part.as_ref() {
30 current = value;
31 break;
32 }
33 }
34 }
35 ReferenceValue::Array(a) => {
36 for child in a {
37 extract_flatten(child, &parts[i..], buffer)
38 }
39 }
40 _ => break,
41 }
42 }
43 if let ReferenceValue::Leaf(_) = current.as_value() {
44 buffer.push(OwnedValue::from(current))
45 }
46}
47fn extract_flatten_from_map<'a, T: AsRef<str>>(m: CompactDocObjectIter<'a>, parts: &[T], buffer: &mut Vec<OwnedValue>) {
48 for (key, value) in m {
49 if key == parts[0].as_ref() {
50 match value.as_value() {
51 ReferenceValue::Leaf(_) => {}
52 ReferenceValue::Array(a) => {
53 for child in a {
54 extract_flatten(child, &parts[1..], buffer)
55 }
56 }
57 ReferenceValue::Object(child) => extract_flatten_from_map(child, &parts[1..], buffer),
58 }
59 }
60 }
61}
62
63#[inline]
64fn generate_id() -> String {
65 let mut data = [0u8; 16];
66 rand::thread_rng().fill_bytes(&mut data);
67 base36::encode(&data)
68}
69
/// State for the same-thread writer backend (see `IndexWriterImpl::SameThread`).
pub struct SingleIndexWriter {
    /// The single-segment writer; behind a lock so documents can be added through
    /// a shared reference.
    pub index_writer: RwLock<SingleSegmentIndexWriter>,
    /// The owning index; kept so the writer can be recreated after finalization.
    pub index: Index,
    /// Heap budget used when (re)creating the writer.
    pub writer_heap_size_bytes: usize,
}
76
/// Writer backend: either a same-thread single-segment writer or tantivy's
/// multi-threaded `IndexWriter`.
pub enum IndexWriterImpl {
    /// Writes on the calling thread into a single segment. Deletes, merges and
    /// rollback are unimplemented for this variant.
    SameThread(SingleIndexWriter),
    /// Multi-threaded tantivy writer supporting deletes, merges and rollback.
    Threaded(IndexWriter),
}
82
83impl IndexWriterImpl {
84 pub fn new(index: &Index, writer_threads: WriterThreads, writer_heap_size_bytes: usize, merge_policy: Arc<dyn MergePolicy>) -> SummaResult<Self> {
85 Ok(match writer_threads {
86 WriterThreads::SameThread => IndexWriterImpl::SameThread(SingleIndexWriter {
87 index: index.clone(),
88 index_writer: RwLock::new(SingleSegmentIndexWriter::new(index.clone(), writer_heap_size_bytes)?),
89 writer_heap_size_bytes,
90 }),
91 WriterThreads::N(writer_threads) => {
92 let index_writer = index.writer_with_num_threads(writer_threads as usize, writer_heap_size_bytes)?;
93 index_writer.set_merge_policy(merge_policy);
94 IndexWriterImpl::Threaded(index_writer)
95 }
96 })
97 }
98
99 pub fn delete_by_query(&self, query: Box<dyn Query>) -> SummaResult<u64> {
100 match self {
101 IndexWriterImpl::SameThread(_) => unimplemented!(),
102 IndexWriterImpl::Threaded(writer) => Ok(writer.delete_query(query)?),
103 }
104 }
105
106 pub fn delete_by_term(&self, term: Term) -> u64 {
107 match self {
108 IndexWriterImpl::SameThread(_) => unimplemented!(),
109 IndexWriterImpl::Threaded(writer) => writer.delete_term(term),
110 }
111 }
112
113 pub fn add_document(&self, document: TantivyDocument) -> SummaResult<()> {
114 match self {
115 IndexWriterImpl::SameThread(writer) => {
116 writer.index_writer.write().expect("poisoned").add_document(document)?;
117 }
118 IndexWriterImpl::Threaded(writer) => {
119 writer.add_document(document)?;
120 }
121 };
122 Ok(())
123 }
124 pub fn index(&self) -> &Index {
125 match self {
126 IndexWriterImpl::SameThread(writer) => &writer.index,
127 IndexWriterImpl::Threaded(writer) => writer.index(),
128 }
129 }
130 pub fn merge_with_attributes(&self, segment_ids: &[SegmentId], segment_attributes: Option<serde_json::Value>) -> SummaResult<Option<SegmentMeta>> {
131 match self {
132 IndexWriterImpl::SameThread(_) => {
133 unimplemented!()
134 }
135 IndexWriterImpl::Threaded(writer) => {
136 let target_segment = writer.merge_with_attributes(segment_ids, segment_attributes).wait()?;
137 writer.garbage_collect_files().wait()?;
138 Ok(target_segment)
139 }
140 }
141 }
    /// Commits pending documents.
    ///
    /// Same-thread backend: the writer is finalized (writing its segment) and a
    /// fresh writer is swapped in; the returned opstamp is always 0 in that case.
    /// Threaded backend: a regular two-phase tantivy commit, returning its opstamp.
    pub fn commit(&mut self) -> SummaResult<Opstamp> {
        match self {
            IndexWriterImpl::SameThread(writer) => {
                let index = writer.index.clone();
                let writer_heap_size_bytes = writer.writer_heap_size_bytes;
                let writer = writer.index_writer.get_mut().expect("poisoned");
                // `finalize` consumes the writer, hence the take_mut swap.
                // NOTE(review): take_mut aborts the process if the closure panics.
                take_mut::take(writer, |writer| {
                    writer.finalize().expect("cannot finalize");
                    SingleSegmentIndexWriter::new(index.clone(), writer_heap_size_bytes).expect("cannot recreate writer")
                });
                Ok(0)
            }
            IndexWriterImpl::Threaded(writer) => {
                info!(action = "commit_files");
                let opstamp = writer.prepare_commit()?.commit()?;
                info!(action = "committed", opstamp = ?opstamp);
                Ok(opstamp)
            }
        }
    }
162 pub fn rollback(&mut self) -> SummaResult<()> {
163 match self {
164 IndexWriterImpl::SameThread(_) => unimplemented!(),
165 IndexWriterImpl::Threaded(writer) => {
166 info!(action = "rollback_files");
167 let opstamp = writer.rollback()?;
168 info!(action = "rollbacked", opstamp = ?opstamp);
169 Ok(())
170 }
171 }
172 }
173}
174
/// Owns the tantivy writer plus the index-level configuration needed for conflict
/// resolution, dynamic field population and writer recreation.
pub struct IndexWriterHolder {
    index_writer: IndexWriterImpl,
    // Re-applied whenever the threaded writer is recreated (`wait_merging_threads`).
    merge_policy: Arc<dyn MergePolicy>,
    // Fields whose values must be unique; used to delete older duplicates.
    unique_fields: Vec<Field>,
    // Kept so the writer can be recreated with the same thread configuration.
    writer_threads: WriterThreads,
    writer_heap_size_bytes: usize,
    // Optional field filled with a generated id when a document omits it.
    auto_id_field: Option<Field>,
    // `(extra, issued_at)` pair: the year of `issued_at` is appended to `extra`.
    extra_year_field: Option<(Field, Field)>,
    // `((source_field, path_inside_source), target_field)` copy rules applied to
    // every indexed document.
    mapped_fields: Vec<((Field, Vec<String>), Field)>,
}
186
187impl IndexWriterHolder {
188 pub(super) fn new(
194 index_writer: IndexWriterImpl,
195 merge_policy: Arc<dyn MergePolicy>,
196 unique_fields: Vec<Field>,
197 auto_id_field: Option<Field>,
198 mapped_fields: Vec<((Field, Vec<String>), Field)>,
199 writer_threads: WriterThreads,
200 writer_heap_size_bytes: usize,
201 ) -> SummaResult<IndexWriterHolder> {
202 let schema = index_writer.index().schema();
203 let extra_year_field = if let (Ok(extra_field), Ok(issued_at_field)) = (schema.get_field("extra"), schema.get_field("issued_at")) {
204 Some((extra_field, issued_at_field))
205 } else {
206 None
207 };
208 Ok(IndexWriterHolder {
209 index_writer,
210 merge_policy,
211 unique_fields,
212 auto_id_field,
213 writer_threads,
214 writer_heap_size_bytes,
215 extra_year_field,
216 mapped_fields,
217 })
218 }
219
220 pub fn create(
222 index: &Index,
223 writer_threads: WriterThreads,
224 writer_heap_size_bytes: usize,
225 merge_policy: Arc<dyn MergePolicy>,
226 ) -> SummaResult<IndexWriterHolder> {
227 let index_writer = IndexWriterImpl::new(index, writer_threads.clone(), writer_heap_size_bytes, merge_policy.clone())?;
228 let schema = index_writer.index().schema();
229 let metas = index.load_metas()?;
230 let mapped_fields = metas
231 .index_attributes()?
232 .map(|attributes: proto::IndexAttributes| {
233 attributes
234 .mapped_fields
235 .iter()
236 .map(|proto::MappedField { source_field, target_field }| {
237 Ok::<((Field, Vec<String>), Field), ValidationError>((
238 schema
239 .find_field(source_field)
240 .ok_or_else(|| ValidationError::MissingField(source_field.to_string()))
241 .map(|(field, full_path)| (field, full_path.split('.').map(|x| x.to_string()).collect()))?,
242 schema
243 .get_field(target_field)
244 .map_err(|_| ValidationError::MissingField(source_field.to_string()))?,
245 ))
246 })
247 .collect::<Result<Vec<(_, _)>, _>>()
248 })
249 .transpose()?
250 .unwrap_or_default();
251 let unique_fields = metas
252 .index_attributes()?
253 .map(|attributes: proto::IndexAttributes| {
254 attributes
255 .unique_fields
256 .iter()
257 .map(|unique_field| {
258 schema
259 .find_field(unique_field)
260 .ok_or_else(|| ValidationError::MissingField(unique_field.to_string()))
261 .map(|x| x.0)
262 })
263 .collect::<Result<Vec<_>, _>>()
264 })
265 .transpose()?
266 .unwrap_or_default();
267 let auto_id_field = metas
268 .index_attributes()?
269 .and_then(|attributes: proto::IndexAttributes| {
270 attributes.auto_id_field.map(|auto_id_field| {
271 schema
272 .get_field(&auto_id_field)
273 .or_else(|_| Err(ValidationError::MissingField(auto_id_field.to_string())))
274 })
275 })
276 .transpose()?;
277 IndexWriterHolder::new(
278 index_writer,
279 merge_policy,
280 unique_fields,
281 auto_id_field,
282 mapped_fields,
283 writer_threads,
284 writer_heap_size_bytes,
285 )
286 }
287
288 pub(super) fn resolve_conflicts(&self, document: &TantivyDocument, conflict_strategy: proto::ConflictStrategy) -> SummaResult<Option<u64>> {
290 if self.unique_fields.is_empty() || matches!(conflict_strategy, proto::ConflictStrategy::DoNothing) {
291 return Ok(None);
292 }
293
294 let unique_terms: Vec<Term> = self
295 .unique_fields
296 .iter()
297 .flat_map(|unique_field| {
298 document.get_all(*unique_field).map(|value| match value.as_value() {
299 ReferenceValue::Leaf(ReferenceValueLeaf::Str(s)) => Some(Ok(vec![Term::from_field_text(*unique_field, s)])),
300 ReferenceValue::Leaf(ReferenceValueLeaf::I64(i)) => Some(Ok(vec![Term::from_field_i64(*unique_field, i)])),
301 ReferenceValue::Leaf(ReferenceValueLeaf::U64(i)) => Some(Ok(vec![Term::from_field_u64(*unique_field, i)])),
302 ReferenceValue::Leaf(ReferenceValueLeaf::F64(i)) => Some(Ok(vec![Term::from_field_f64(*unique_field, i)])),
303 _ => {
304 let schema = self.index_writer.index().schema();
305 let field_type = schema.get_field_entry(*unique_field).field_type();
306 Some(Err(Error::Validation(Box::new(ValidationError::InvalidUniqueFieldType(field_type.clone())))))
307 }
308 })
309 })
310 .flatten()
311 .collect::<SummaResult<Vec<_>>>()?
312 .into_iter()
313 .flatten()
314 .collect();
315
316 if unique_terms.is_empty() {
317 Err(ValidationError::MissingUniqueField(format!(
318 "{:?}",
319 document.to_named_doc(&self.index_writer.index().schema()),
320 )))?
321 }
322
323 let mut last_opstamp = None;
324 for term in unique_terms {
325 last_opstamp = Some(self.delete_by_term(term))
326 }
327
328 Ok(last_opstamp)
329 }
330
    /// Deletes all documents matching `query`; delegates to the underlying writer.
    pub(super) fn delete_by_query(&self, query: Box<dyn Query>) -> SummaResult<u64> {
        self.index_writer.delete_by_query(query)
    }
335
    /// Deletes all documents containing `term`; delegates to the underlying writer
    /// and returns the resulting opstamp.
    pub(super) fn delete_by_term(&self, term: Term) -> u64 {
        self.index_writer.delete_by_term(term)
    }
340
    /// Returns the index managed by this holder.
    pub(super) fn index(&self) -> &Index {
        self.index_writer.index()
    }
345
346 #[inline]
347 fn process_dynamic_fields(&self, document: &mut TantivyDocument) -> SummaResult<()> {
348 if let Some((extra_field, issued_at_field)) = self.extra_year_field {
349 if let Some(issued_at_value) = document.get_first(issued_at_field) {
350 if let Some(issued_at_value) = issued_at_value.as_i64() {
351 if let Some(correct_timestamp) = DateTime::from_timestamp(issued_at_value, 0) {
352 document.add_text(extra_field, correct_timestamp.year().to_string())
353 }
354 }
355 }
356 }
357 let mut buffer = vec![];
358 for ((source_field, source_full_path), target_field) in &self.mapped_fields {
359 for value in document.get_all(*source_field) {
360 match value.as_value() {
361 ReferenceValue::Object(entries) => extract_flatten_from_map(entries, source_full_path, &mut buffer),
362 ReferenceValue::Leaf(leaf) => buffer.push(OwnedValue::from(value)),
363 _ => unimplemented!(),
364 }
365 }
366 for v in &buffer {
367 document.add_field_value(*target_field, v)
368 }
369 buffer.clear();
370 }
371 Ok(())
372 }
373 #[inline]
374 fn setup_id_field(&self, document: &mut TantivyDocument) -> SummaResult<()> {
375 if let Some(auto_id_field) = &self.auto_id_field {
376 let schema = self.index_writer.index().schema();
377 match schema.get_field_entry(*auto_id_field).field_type() {
378 FieldType::Str(_) => match document.get_first(*auto_id_field) {
379 Some(_) => {}
380 None => document.add_text(*auto_id_field, generate_id()),
381 },
382 _ => unreachable!(),
383 }
384 }
385 Ok(())
386 }
387
    /// Indexes `document`: fills dynamic and auto-id fields, deletes conflicting
    /// documents according to `conflict_strategy`, then adds the document.
    pub fn index_document(&self, mut document: TantivyDocument, conflict_strategy: proto::ConflictStrategy) -> SummaResult<()> {
        self.process_dynamic_fields(&mut document)?;
        self.setup_id_field(&mut document)?;
        self.resolve_conflicts(&document, conflict_strategy)?;
        self.index_writer.add_document(document)?;
        Ok(())
    }
396
397 pub fn merge(&self, segment_ids: &[SegmentId], segment_attributes: Option<SummaSegmentAttributes>) -> SummaResult<Option<SegmentMeta>> {
402 info!(action = "merge_segments", segment_ids = ?segment_ids);
403 let segment_meta = self.index_writer.merge_with_attributes(
404 segment_ids,
405 segment_attributes.map(|segment_attributes| serde_json::to_value(segment_attributes).expect("cannot serialize")),
406 )?;
407 info!(action = "merged_segments", segment_ids = ?segment_ids, merged_segment_meta = ?segment_meta);
408 Ok(segment_meta)
409 }
410
    /// Commits pending changes through the underlying writer and returns the
    /// resulting opstamp.
    pub fn commit(&mut self) -> SummaResult<Opstamp> {
        self.index_writer.commit()
    }
418
    /// Discards uncommitted changes through the underlying writer.
    pub fn rollback(&mut self) -> SummaResult<()> {
        self.index_writer.rollback()
    }
422
423 pub fn vacuum(&self, segment_attributes: Option<SummaSegmentAttributes>, excluded_segments: Vec<String>) -> SummaResult<()> {
424 let mut segments = self.index().searchable_segments()?;
425 segments.sort_by_key(|segment| segment.meta().num_deleted_docs());
426
427 let excluded_segments: HashSet<SegmentId, RandomState> = excluded_segments
428 .into_iter()
429 .map(|s| SegmentId::from_uuid_string(&s))
430 .collect::<Result<_, _>>()
431 .map_err(|e| Error::InvalidSegmentId(e.to_string()))?;
432
433 let segments = segments
434 .into_iter()
435 .filter(|segment| {
436 let is_frozen = segment
437 .meta()
438 .segment_attributes()
439 .as_ref()
440 .map(|segment_attributes| {
441 let parsed_attributes = serde_json::from_value::<SummaSegmentAttributes>(segment_attributes.clone());
442 parsed_attributes.map(|v| v.is_frozen).unwrap_or(false)
443 })
444 .unwrap_or(false);
445 let is_excluded = excluded_segments.contains(&segment.id());
446 !is_frozen && !is_excluded
447 })
448 .collect::<Vec<_>>();
449 if !segments.is_empty() {
450 self.merge(&segments.iter().map(|segment| segment.id()).collect::<Vec<_>>(), segment_attributes)?;
451 }
452 Ok(())
453 }
454
    /// Blocks until all background merging threads finish, then recreates the
    /// threaded writer with the stored thread count, heap budget and merge policy.
    /// No-op for the same-thread backend.
    pub fn wait_merging_threads(&mut self) {
        match &mut self.index_writer {
            IndexWriterImpl::SameThread(_) => (),
            // `wait_merging_threads` consumes the writer, hence the take_mut swap.
            // NOTE(review): take_mut aborts the process if the closure panics
            // (any of the `expect`s below failing).
            IndexWriterImpl::Threaded(index_writer) => take_mut::take(index_writer, |index_writer| {
                let index = index_writer.index().clone();
                info!(action = "wait_merging_threads", mode = "threaded");
                index_writer.wait_merging_threads().expect("cannot wait merging threads");
                info!(action = "merging_threads_finished", mode = "threaded");
                let index_writer = index
                    .writer_with_num_threads(self.writer_threads.threads() as usize, self.writer_heap_size_bytes)
                    .expect("cannot create index writer_holder");
                index_writer.set_merge_policy(self.merge_policy.clone());
                index_writer
            }),
        };
    }
471
472 pub fn commit_and_prepare(&mut self, with_hotcache: bool) -> SummaResult<Opstamp> {
474 let opstamp = self.commit()?;
475 self.wait_merging_threads();
476
477 if with_hotcache {
478 let directory = self.index().directory();
479 let hotcache_bytes = crate::directories::create_hotcache(
480 directory
481 .underlying_directory()
482 .expect("managed directory should contain nested directory")
483 .box_clone(),
484 )?;
485 directory.atomic_write(Path::new(&format!("hotcache.{}.bin", opstamp)), &hotcache_bytes)?;
486 }
487 Ok(opstamp)
488 }
489}