summa_core/components/
index_holder.rs

1use std::collections::HashSet;
2use std::fmt::Debug;
3use std::hash::{Hash, Hasher};
4use std::path::Path;
5use std::sync::Arc;
6
7use futures::future::{join_all, try_join_all};
8use serde::Deserialize;
9use summa_proto::proto;
10use summa_proto::proto::IndexAttributes;
11use tantivy::collector::{Collector, MultiCollector, MultiFruit};
12use tantivy::directory::OwnedBytes;
13use tantivy::query::{EnableScoring, Query};
14use tantivy::schema::{Field, Schema};
15use tantivy::space_usage::SearcherSpaceUsage;
16use tantivy::{Directory, Index, IndexBuilder, IndexReader, Opstamp, ReloadPolicy, Searcher};
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, instrument, trace, warn};
19
20use super::SummaSegmentAttributes;
21use super::{build_fruit_extractor, default_tokenizers, FruitExtractor, ProtoQueryParser};
22use crate::components::collector_cache::CollectorCache;
23use crate::components::fruit_extractors::IntermediateExtractionResult;
24use crate::components::segment_attributes::SegmentAttributesMergerImpl;
25use crate::components::{IndexWriterHolder, SummaDocument};
26use crate::configs::ConfigProxy;
27use crate::directories::{CachingDirectory, ExternalRequest, ExternalRequestGenerator, FileStats, HotDirectory, NetworkDirectory, StaticDirectoryCache};
28use crate::errors::{SummaResult, ValidationError};
29use crate::proto_traits::Wrapper;
30use crate::Error;
31
/// Holder that bundles a Tantivy [`Index`] with everything needed to parse
/// queries, search it and (optionally) write to it.
pub struct IndexHolder {
    // Live view over the persisted engine configuration of this index
    index_engine_config: Arc<dyn ConfigProxy<proto::IndexEngineConfig>>,
    // Unique name; also the basis for `Hash` and `PartialEq` of the holder
    index_name: String,
    index: Index,
    // Attributes read once from index metadata at creation time
    cached_index_attributes: Option<IndexAttributes>,
    // Schema snapshot taken at creation time to avoid repeated lookups
    cached_schema: Schema,
    // Fields declared as multi-valued in the index attributes
    cached_multi_fields: HashSet<Field>,
    index_reader: IndexReader,
    // `None` when the holder is read-only (no writer threads configured)
    index_writer_holder: Option<Arc<RwLock<IndexWriterHolder>>>,
    query_parser: ProtoQueryParser,
    // Cache of collector results, keyed by parsed query + collector
    collector_cache: parking_lot::Mutex<CollectorCache>,
}
45
46impl Hash for IndexHolder {
47    fn hash<H: Hasher>(&self, state: &mut H) {
48        self.index_name.hash(state)
49    }
50}
51
52impl PartialEq<Self> for IndexHolder {
53    fn eq(&self, other: &Self) -> bool {
54        self.index_name.eq(&other.index_name)
55    }
56}
57
/// Minimal projection of Tantivy's `meta.json` used to read just the
/// `opstamp` without deserializing the whole metadata document.
#[derive(Deserialize)]
struct LightMeta {
    pub opstamp: Opstamp,
}
62
impl Debug for IndexHolder {
    // Compact representation: only the name and index settings are printed
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("IndexHolder")
            .field("index_name", &self.index_name)
            .field("index_settings", &self.index.settings())
            .finish()
    }
}
71
72/// Sets up standard Summa tokenizers
73///
74/// The set of tokenizers includes standard Tantivy tokenizers as well as `Tokenizer` that supports CJK
75pub fn register_default_tokenizers(index: &Index) {
76    for (tokenizer_name, tokenizer) in &default_tokenizers() {
77        index.tokenizers().register(tokenizer_name, tokenizer.clone())
78    }
79}
80
81/// Cleanup after index deletion
82///
83/// Consumers are stopped, then `IndexConfig` is removed from `CoreConfig`
84/// and then directory with the index is deleted.
85pub async fn cleanup_index(index_engine_config: proto::IndexEngineConfig) -> SummaResult<()> {
86    match index_engine_config.config {
87        #[cfg(feature = "fs")]
88        Some(proto::index_engine_config::Config::File(ref config)) => {
89            info!(action = "delete_directory", directory = ?config.path);
90            tokio::fs::remove_dir_all(&config.path)
91                .await
92                .map_err(|e| Error::IO((e, Some(std::path::PathBuf::from(&config.path)))))?;
93        }
94        _ => (),
95    };
96    Ok(())
97}
98
99pub async fn read_opstamp<D: Directory>(directory: &D) -> SummaResult<Opstamp> {
100    let meta = directory.atomic_read_async(Path::new("meta.json")).await.map_err(|e| Error::Anyhow(e.into()))?;
101    let meta_string = String::from_utf8(meta).map_err(|e| Error::Anyhow(e.into()))?;
102    let meta_json: LightMeta = match serde_json::from_str(&meta_string) {
103        Ok(meta_json) => meta_json,
104        Err(e) => {
105            error!(action = "invalid_json", json = meta_string);
106            Err(e)?
107        }
108    };
109    Ok(meta_json.opstamp)
110}
111
112fn wrap_with_caches<D: Directory>(
113    directory: D,
114    hotcache_bytes: Option<OwnedBytes>,
115    cache_config: Option<proto::CacheConfig>,
116    opstamp: Opstamp,
117) -> SummaResult<Box<dyn Directory>> {
118    let static_cache = hotcache_bytes
119        .map(|hotcache_bytes| StaticDirectoryCache::open(hotcache_bytes, opstamp))
120        .transpose()?;
121    info!(action = "opened_static_cache", static_cache = ?static_cache);
122    let file_lengths = static_cache
123        .as_ref()
124        .map(|static_cache| static_cache.file_lengths().clone())
125        .unwrap_or_default();
126
127    info!(action = "read_file_lengths", file_lengths = ?file_lengths);
128    let file_stats = FileStats::from_file_lengths(file_lengths);
129    let next_directory = if let Some(cache_config) = cache_config {
130        Box::new(CachingDirectory::bounded(Arc::new(directory), cache_config.cache_size as usize, file_stats)) as Box<dyn Directory>
131    } else {
132        Box::new(directory) as Box<dyn Directory>
133    };
134    info!(action = "wrapped_with_cache");
135    Ok(match static_cache {
136        Some(static_cache) => {
137            let hot_directory = HotDirectory::open(next_directory, static_cache);
138            info!(action = "opened_hotcache", hot_directory = ?hot_directory);
139            Box::new(hot_directory?)
140        }
141        None => next_directory,
142    })
143}
144
145impl IndexHolder {
    /// Sets up `IndexHolder`
    ///
    /// Registers tokenizers, snapshots schema and attributes from the index
    /// metadata, builds the query parser and a reloading reader, and — when the
    /// core config enables writer threads — creates an `IndexWriterHolder`.
    pub fn create_holder(
        core_config: &crate::configs::core::Config,
        mut index: Index,
        index_name: &str,
        index_engine_config: Arc<dyn ConfigProxy<proto::IndexEngineConfig>>,
        merge_policy: Option<proto::MergePolicy>,
        query_parser_config: proto::QueryParserConfig,
    ) -> SummaResult<IndexHolder> {
        register_default_tokenizers(&index);

        index.settings_mut().docstore_compress_threads = core_config.doc_store_compress_threads;
        index.set_segment_attributes_merger(Arc::new(SegmentAttributesMergerImpl::<SummaSegmentAttributes>::new()));

        let metas = index.load_metas()?;
        let cached_schema = index.schema();
        let cached_index_attributes: Option<IndexAttributes> = metas.index_attributes()?;
        // Resolve declared multi-field names into `Field`s once; an unknown name fails creation
        let cached_multi_fields = cached_index_attributes
            .as_ref()
            .map(|index_attributes| {
                index_attributes
                    .multi_fields
                    .iter()
                    .map(|field_name| Ok::<_, Error>(cached_schema.get_field(field_name)?))
                    .collect()
            })
            .transpose()?
            .unwrap_or_default();

        let query_parser = ProtoQueryParser::for_index(&index, query_parser_config)?;
        let index_reader = index
            .reader_builder()
            .doc_store_cache_num_blocks(core_config.doc_store_cache_num_blocks)
            .reload_policy(ReloadPolicy::OnCommitWithDelay)
            .try_into()?;
        // Force an initial reload so the first searcher sees the current segments
        index_reader.reload()?;

        // No `writer_threads` in the config means a read-only holder
        let index_writer_holder = if let Some(writer_threads) = &core_config.writer_threads {
            let merge_policy = Wrapper::from(merge_policy).into();
            info!(action = "create_index_writer", merge_policy = ?merge_policy, writer_threads = ?writer_threads, writer_heap_size_bytes = core_config.writer_heap_size_bytes);
            Some(Arc::new(RwLock::new(IndexWriterHolder::create(
                &index,
                writer_threads.clone(),
                core_config.writer_heap_size_bytes as usize,
                merge_policy,
            )?)))
        } else {
            None
        };

        Ok(IndexHolder {
            index_engine_config,
            index_name: index_name.to_string(),
            index: index.clone(),
            query_parser,
            cached_schema,
            cached_index_attributes,
            cached_multi_fields,
            index_reader,
            index_writer_holder,
            collector_cache: parking_lot::Mutex::new(CollectorCache::new(&core_config.collector_cache)),
        })
    }
209
210    /// Creates index and sets it up via `setup`
211    #[instrument(skip_all)]
212    pub fn create_memory_index(index_builder: IndexBuilder) -> SummaResult<Index> {
213        let index = index_builder.create_in_ram()?;
214        info!(action = "created", index = ?index);
215        Ok(index)
216    }
217
218    /// Creates index and sets it up via `setup`
219    #[instrument(skip_all)]
220    #[cfg(feature = "fs")]
221    pub async fn create_file_index(index_path: &Path, index_builder: IndexBuilder) -> SummaResult<Index> {
222        if index_path.exists() {
223            return Err(ValidationError::ExistingPath(index_path.to_owned()).into());
224        }
225        tokio::fs::create_dir_all(index_path).await?;
226        let index = index_builder.create_in_dir(index_path)?;
227        info!(action = "created", index = ?index);
228        Ok(index)
229    }
230
231    /// Attaches index and sets it up via `setup`
232    #[instrument(skip_all)]
233    #[cfg(feature = "fs")]
234    pub async fn open_file_index(file_engine_config: &proto::FileEngineConfig) -> SummaResult<Index> {
235        let index = Index::open_in_dir(&file_engine_config.path)?;
236        info!(action = "opened", config = ?file_engine_config);
237        Ok(index)
238    }
239
    /// Opens an index served through a remote (network) engine.
    ///
    /// Reads the current opstamp from the remote `meta.json`, then tries to
    /// fetch the matching `hotcache.<opstamp>.bin`; a missing or unreadable
    /// hotcache is tolerated and only downgrades caching.
    pub async fn open_remote_index<
        TExternalRequest: ExternalRequest + 'static,
        TExternalRequestGenerator: ExternalRequestGenerator<TExternalRequest> + 'static,
    >(
        remote_engine_config: proto::RemoteEngineConfig,
        read_hotcache: bool,
    ) -> SummaResult<Index> {
        info!(action = "opening_network_directory", config = ?remote_engine_config, read_hotcache = read_hotcache);
        let network_directory = NetworkDirectory::open(Box::new(TExternalRequestGenerator::new(remote_engine_config.clone())));
        let opstamp = read_opstamp(&network_directory).await?;
        // The hotcache file name is versioned by opstamp, so a stale cache is never picked up
        let hotcache_bytes = match network_directory
            .get_network_file_handle(format!("hotcache.{}.bin", opstamp).as_ref())
            .do_read_bytes_async(None)
            .await
        {
            Ok(hotcache_bytes) => {
                if read_hotcache {
                    info!(action = "read_hotcache", len = hotcache_bytes.len());
                    Some(hotcache_bytes)
                } else {
                    warn!(action = "omit_hotcache");
                    None
                }
            }
            // Fetch failures are non-fatal: fall back to plain network reads
            Err(error) => {
                warn!(action = "error_hotcache", error = ?error);
                None
            }
        };
        let directory = wrap_with_caches(network_directory, hotcache_bytes, remote_engine_config.cache_config, opstamp)?;
        Ok(Index::open_async(directory).await?)
    }
272
273    /// Compression
274    pub fn compression(&self) -> proto::Compression {
275        Wrapper::from(self.index_reader().searcher().index().settings().docstore_compression).into_inner()
276    }
277
    /// Load index attributes from meta.json
    ///
    /// The attributes are read once at holder creation, so this accessor is cheap.
    pub fn index_attributes(&self) -> Option<&IndexAttributes> {
        self.cached_index_attributes.as_ref()
    }
282
283    /// How to resolve conflicts on inserting new documents
284    pub fn conflict_strategy(&self) -> proto::ConflictStrategy {
285        self.index_attributes().as_ref().map(|c| c.conflict_strategy()).unwrap_or_default()
286    }
287
    /// Index name
    pub fn index_name(&self) -> &str {
        &self.index_name
    }
292
    /// `IndexReader` singleton shared by all searches on this holder
    pub fn index_reader(&self) -> &IndexReader {
        &self.index_reader
    }
297
    /// Return internal Tantivy index
    pub fn index(&self) -> &Index {
        &self.index
    }
302
    /// Live engine configuration proxy for this index
    pub fn index_engine_config(&self) -> &Arc<dyn ConfigProxy<proto::IndexEngineConfig>> {
        &self.index_engine_config
    }
306
307    pub fn index_writer_holder(&self) -> SummaResult<&Arc<RwLock<IndexWriterHolder>>> {
308        self.index_writer_holder
309            .as_ref()
310            .ok_or_else(|| Error::ReadOnlyIndex(self.index_name.to_string()))
311    }
312
    /// Index schema (cached at holder creation)
    pub fn schema(&self) -> &Schema {
        &self.cached_schema
    }
317
    /// Multi fields declared in the index attributes (cached at holder creation)
    pub fn multi_fields(&self) -> &HashSet<Field> {
        &self.cached_multi_fields
    }
322
    /// Return the underlying real directory behind the index's managed directory
    /// (previous doc comment was a copy-paste of `index()`'s)
    pub fn real_directory(&self) -> &dyn Directory {
        self.index.directory().real_directory()
    }
327
    /// Load term dictionaries into memory
    ///
    /// Resolves `fields` against the schema, then — when `load_dictionaries`
    /// is set — warms up the term dictionary of every matching field in every
    /// segment concurrently.
    pub async fn partial_warmup<T: AsRef<str>>(&self, load_dictionaries: bool, fields: &[T]) -> SummaResult<()> {
        let searcher = self.index_reader().searcher();
        let mut warm_up_futures = Vec::new();
        // Resolve all field names first so an unknown field fails the whole call early
        let default_fields = fields
            .iter()
            .map(|field_name| {
                self.cached_schema
                    .find_field(field_name.as_ref())
                    .map(|x| x.0)
                    .ok_or_else(|| ValidationError::MissingField(field_name.as_ref().to_string()))
            })
            .collect::<Result<HashSet<_>, _>>()?;
        for field in default_fields {
            for segment_reader in searcher.segment_readers() {
                let inverted_index = segment_reader.inverted_index_async(field).await?.clone();
                if load_dictionaries {
                    warm_up_futures.push(async move {
                        let dict = inverted_index.terms();
                        info!(action = "warming_up_dictionary", index_name = ?self.index_name());
                        dict.warm_up_dictionary_async().await
                    });
                }
            }
        }
        info!(action = "warming_up");
        // All dictionaries are warmed concurrently; the first error aborts the wait
        try_join_all(warm_up_futures).await?;
        Ok(())
    }
357
    /// Load all index files into memory
    ///
    /// Reads every file managed by the index's directory concurrently and
    /// discards the bytes; only read errors are propagated.
    pub async fn full_warmup(&self) -> SummaResult<()> {
        let managed_directory = self.index.directory();
        info!(action = "warming_up");
        join_all(managed_directory.list_managed_files().into_iter().map(move |file| {
            let file_name = file.to_string_lossy().to_string();
            async move {
                info!(action = "start_reading_file", index_name = ?self.index_name(), file_name = ?file_name);
                managed_directory.atomic_read_async(&file).await.map_err(|e| Error::Tantivy(e.into()))?;
                info!(action = "finished_reading_file", index_name = ?self.index_name(), file_name = ?file_name);
                Ok(())
            }
        }))
        .await
        .into_iter()
        // `join_all` waits for every read; errors are surfaced only after all finish
        .collect::<SummaResult<Vec<_>>>()?;
        Ok(())
    }
376
    /// Runs a query on the segment readers wrapped by the searcher asynchronously.
    ///
    /// Each segment is collected concurrently; the per-segment fruits are then
    /// merged by the collector.
    pub async fn search_in_segments_async(
        &self,
        searcher: &Searcher,
        query: &dyn Query,
        collector: &MultiCollector<'_>,
        is_fieldnorms_scoring_enabled: Option<bool>,
    ) -> tantivy::Result<MultiFruit> {
        // Scoring follows the collector's requirements unless explicitly overridden;
        // `Some(false)` keeps scoring enabled but skips fieldnorms
        let enabled_scoring = match (is_fieldnorms_scoring_enabled, collector.requires_scoring()) {
            (Some(true), true) | (None, true) => EnableScoring::enabled_from_searcher(searcher),
            (Some(false), true) => EnableScoring::enabled_from_searcher_without_fieldnorms(searcher),
            (_, false) => EnableScoring::disabled_from_searcher(searcher),
        };
        let segment_readers = searcher.segment_readers();
        trace!(index_name = ?self.index_name, action = "weight");
        let weight = query.weight_async(enabled_scoring).await?;
        trace!(index_name = ?self.index_name, action = "collect_segment");
        let fruits = join_all(segment_readers.iter().enumerate().map(|(segment_ord, segment_reader)| {
            let weight_ref = weight.as_ref();
            collector.collect_segment_async(weight_ref, segment_ord as u32, segment_reader)
        }))
        .await
        .into_iter()
        .collect::<tantivy::Result<Vec<_>>>()?;
        collector.merge_fruits(fruits)
    }
403
    /// Runs a query on the segment readers wrapped by the searcher synchronously,
    /// delegating execution to Tantivy's search executor.
    pub fn search_in_segments(
        &self,
        searcher: &Searcher,
        query: &dyn Query,
        collector: &MultiCollector<'_>,
        is_fieldnorms_scoring_enabled: Option<bool>,
    ) -> tantivy::Result<MultiFruit> {
        // Same scoring selection as `search_in_segments_async`:
        // collector requirements rule unless explicitly overridden
        let enabled_scoring = match (is_fieldnorms_scoring_enabled, collector.requires_scoring()) {
            (Some(true), true) | (None, true) => EnableScoring::enabled_from_searcher(searcher),
            (Some(false), true) => EnableScoring::enabled_from_searcher_without_fieldnorms(searcher),
            (_, false) => EnableScoring::disabled_from_searcher(searcher),
        };
        searcher.search_with_executor(query, collector, searcher.index().search_executor(), enabled_scoring)
    }
418
    /// Shortcut for `custom_search` with default scoring and no cache involvement
    pub fn search(&self, index_alias: &str, query: proto::query::Query, collectors: Vec<proto::Collector>) -> SummaResult<Vec<IntermediateExtractionResult>> {
        self.custom_search(index_alias, query, collectors, None, None, None)
    }
422
    /// Shortcut for `custom_search_async` with default scoring and no cache involvement
    #[cfg(feature = "tokio-rt")]
    pub async fn search_async(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        self.custom_search_async(index_alias, query, collectors, None, None, None).await
    }
432
    /// Search `query` in the `IndexHolder` and collecting `Fruit` with a list of `collectors`
    ///
    /// Flow: parse the query, optionally serve cacheable collectors from the
    /// collector cache (`load_cache`), execute the remaining collectors against
    /// the current searcher, then optionally store cacheable results back
    /// (`store_cache`). Results are returned in the original collector order.
    pub async fn custom_search_async(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
        is_fieldnorms_scoring_enabled: Option<bool>,
        load_cache: Option<bool>,
        store_cache: Option<bool>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        let collectors_len = collectors.len();
        // Positions (in the original `collectors` order) not served from cache
        let mut missed_collector_indices = Vec::with_capacity(collectors_len);
        // Output slots in original order; `None` until filled below
        let mut collector_outputs = vec![None; collectors_len];
        let mut adjusted_collectors = Vec::with_capacity(collectors_len);
        let mut original_collectors = Vec::with_capacity(collectors_len);
        let load_cache = load_cache.unwrap_or(false);
        let store_cache = store_cache.unwrap_or(false);

        info!(action = "parse_query", index_name = ?self.index_name, query = ?query);
        // With a tokio runtime, parsing is pushed off the async executor
        #[cfg(feature = "tokio-rt")]
        let parsed_query = {
            let query_parser = self.query_parser.clone();
            tokio::task::spawn_blocking(move || query_parser.parse_query(query)).await??
        };
        #[cfg(not(feature = "tokio-rt"))]
        let parsed_query = self.query_parser.parse_query(query)?;

        // Cache entries are keyed by the parsed query and the scoring override
        let caching_key = format!("{:?}|{:?}", parsed_query, is_fieldnorms_scoring_enabled);

        if load_cache {
            let mut cache = self.collector_cache.lock();
            for (i, collector) in collectors.into_iter().enumerate() {
                let is_caching_enabled = CollectorCache::is_caching_enabled(&collector);
                if is_caching_enabled {
                    let adjusted_collector = CollectorCache::adjust_collector(&collector);
                    info!(action = "querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                    match cache.get(&caching_key, &adjusted_collector, &collector) {
                        Some(cached_value) => {
                            info!(action = "match_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            collector_outputs[i] = Some(cached_value)
                        }
                        None => {
                            info!(action = "mismatch_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            adjusted_collectors.push(adjusted_collector);
                            original_collectors.push(collector);
                            missed_collector_indices.push(i)
                        }
                    }
                } else {
                    // Non-cacheable collectors always go through the search phase
                    info!(action = "skip_querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?collector);
                    adjusted_collectors.push(collector.clone());
                    original_collectors.push(collector);
                    missed_collector_indices.push(i)
                }
            }
            drop(cache);
            info!(action = "queried_cache", index_name = ?self.index_name, collectors = collectors_len, cached_collectors = collectors_len - adjusted_collectors.len());
        } else {
            for collector in collectors.into_iter() {
                original_collectors.push(collector.clone());
                if store_cache && CollectorCache::is_caching_enabled(&collector) {
                    adjusted_collectors.push(CollectorCache::adjust_collector(&collector));
                } else {
                    adjusted_collectors.push(collector);
                }
            }
            missed_collector_indices = (0..collectors_len).collect();
        }

        // Everything was served from cache — no search necessary
        if adjusted_collectors.is_empty() {
            info!(action = "served_from_cache", index_name = ?self.index_name, query = ?parsed_query);
            return Ok(collector_outputs.into_iter().map(Option::unwrap).collect());
        }
        let searcher = self.index_reader().searcher();
        let mut multi_collector = MultiCollector::new();
        let extractors: Vec<Box<dyn FruitExtractor>> = adjusted_collectors
            .iter()
            .map(|collector_proto| {
                build_fruit_extractor(
                    self,
                    index_alias,
                    searcher.clone(),
                    collector_proto.clone(),
                    &parsed_query,
                    &mut multi_collector,
                )
            })
            .collect::<SummaResult<_>>()?;
        info!(
            target: "query",
            index_name = ?self.index_name,
            parsed_query = ?parsed_query,
            is_fieldnorms_scoring_enabled = is_fieldnorms_scoring_enabled,
        );
        let mut multi_fruit = self
            .search_in_segments_async(&searcher, &parsed_query, &multi_collector, is_fieldnorms_scoring_enabled)
            .await?;
        if load_cache || store_cache {
            let mut cache = self.collector_cache.lock();
            // The zipped vectors are aligned: their elements were pushed together above
            for (((extractor, i), original_collector), adjusted_collector) in extractors
                .into_iter()
                .zip(missed_collector_indices.into_iter())
                .zip(original_collectors.into_iter())
                .zip(adjusted_collectors.into_iter())
            {
                let extracted_result = extractor.extract(&mut multi_fruit)?;
                if CollectorCache::is_caching_enabled(&original_collector) {
                    let adjusted_extracted_result = CollectorCache::adjust_result(&extracted_result, &original_collector);
                    if store_cache {
                        info!(action = "storing_collector_to_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                        cache.put(&caching_key, &adjusted_collector, extracted_result);
                    };
                    collector_outputs[i] = Some(adjusted_extracted_result)
                } else {
                    collector_outputs[i] = Some(extracted_result)
                }
            }
            drop(cache);
        } else {
            for (i, extractor) in extractors.into_iter().enumerate() {
                collector_outputs[i] = Some(extractor.extract(&mut multi_fruit)?);
            }
        }
        // Every slot is filled by now (cache hits + extracted misses), so unwrap cannot panic
        Ok(collector_outputs.into_iter().map(Option::unwrap).collect())
    }
558
    /// Search `query` in the `IndexHolder` and collecting `Fruit` with a list of `collectors`
    ///
    /// Synchronous twin of `custom_search_async`: same cache-then-search-then-store
    /// flow, but query parsing and segment collection run on the calling thread.
    pub fn custom_search(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
        is_fieldnorms_scoring_enabled: Option<bool>,
        load_cache: Option<bool>,
        store_cache: Option<bool>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        let collectors_len = collectors.len();
        // Positions (in the original `collectors` order) not served from cache
        let mut missed_collector_indices = Vec::with_capacity(collectors_len);
        // Output slots in original order; `None` until filled below
        let mut collector_outputs = vec![None; collectors_len];
        let mut adjusted_collectors = Vec::with_capacity(collectors_len);
        let mut original_collectors = Vec::with_capacity(collectors_len);
        let load_cache = load_cache.unwrap_or(false);
        let store_cache = store_cache.unwrap_or(false);

        info!(action = "parse_query", index_name = ?self.index_name, query = ?query);
        let parsed_query = self.query_parser.parse_query(query)?;

        // Cache entries are keyed by the parsed query and the scoring override
        let caching_key = format!("{:?}|{:?}", parsed_query, is_fieldnorms_scoring_enabled);

        if load_cache {
            let mut cache = self.collector_cache.lock();
            for (i, collector) in collectors.into_iter().enumerate() {
                let is_caching_enabled = CollectorCache::is_caching_enabled(&collector);
                if is_caching_enabled {
                    let adjusted_collector = CollectorCache::adjust_collector(&collector);
                    info!(action = "querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                    match cache.get(&caching_key, &adjusted_collector, &collector) {
                        Some(cached_value) => {
                            info!(action = "match_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            collector_outputs[i] = Some(cached_value)
                        }
                        None => {
                            info!(action = "mismatch_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            adjusted_collectors.push(adjusted_collector);
                            original_collectors.push(collector);
                            missed_collector_indices.push(i)
                        }
                    }
                } else {
                    // Non-cacheable collectors always go through the search phase
                    info!(action = "skip_querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?collector);
                    adjusted_collectors.push(collector.clone());
                    original_collectors.push(collector);
                    missed_collector_indices.push(i)
                }
            }
            drop(cache);
            info!(action = "queried_cache", index_name = ?self.index_name, collectors = collectors_len, cached_collectors = collectors_len - adjusted_collectors.len());
        } else {
            for collector in collectors.into_iter() {
                original_collectors.push(collector.clone());
                if store_cache && CollectorCache::is_caching_enabled(&collector) {
                    adjusted_collectors.push(CollectorCache::adjust_collector(&collector));
                } else {
                    adjusted_collectors.push(collector);
                }
            }
            missed_collector_indices = (0..collectors_len).collect();
        }

        // Everything was served from cache — no search necessary
        if adjusted_collectors.is_empty() {
            info!(action = "served_from_cache", index_name = ?self.index_name, query = ?parsed_query);
            return Ok(collector_outputs.into_iter().map(Option::unwrap).collect());
        }
        let searcher = self.index_reader().searcher();
        let mut multi_collector = MultiCollector::new();
        let extractors: Vec<Box<dyn FruitExtractor>> = adjusted_collectors
            .iter()
            .map(|collector_proto| {
                build_fruit_extractor(
                    self,
                    index_alias,
                    searcher.clone(),
                    collector_proto.clone(),
                    &parsed_query,
                    &mut multi_collector,
                )
            })
            .collect::<SummaResult<_>>()?;
        info!(
            target: "query",
            index_name = ?self.index_name,
            parsed_query = ?parsed_query,
            is_fieldnorms_scoring_enabled = is_fieldnorms_scoring_enabled,
        );
        let mut multi_fruit = self.search_in_segments(&searcher, &parsed_query, &multi_collector, is_fieldnorms_scoring_enabled)?;
        if load_cache || store_cache {
            let mut cache = self.collector_cache.lock();
            // The zipped vectors are aligned: their elements were pushed together above
            for (((extractor, i), original_collector), adjusted_collector) in extractors
                .into_iter()
                .zip(missed_collector_indices.into_iter())
                .zip(original_collectors.into_iter())
                .zip(adjusted_collectors.into_iter())
            {
                let extracted_result = extractor.extract(&mut multi_fruit)?;
                if CollectorCache::is_caching_enabled(&original_collector) {
                    let adjusted_extracted_result = CollectorCache::adjust_result(&extracted_result, &original_collector);
                    if store_cache {
                        info!(action = "storing_collector_to_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                        cache.put(&caching_key, &adjusted_collector, extracted_result);
                    };
                    collector_outputs[i] = Some(adjusted_extracted_result)
                } else {
                    collector_outputs[i] = Some(extracted_result)
                }
            }
            drop(cache);
        } else {
            for (i, extractor) in extractors.into_iter().enumerate() {
                collector_outputs[i] = Some(extractor.extract(&mut multi_fruit)?);
            }
        }
        // Every slot is filled by now (cache hits + extracted misses), so unwrap cannot panic
        Ok(collector_outputs.into_iter().map(Option::unwrap).collect())
    }
676
    /// Delete documents matching `query`
    /// (original doc said "by `unq`", which appears to be a typo — presumably "by unique query")
    ///
    /// Requires a writable index; fails with `ReadOnlyIndex` otherwise.
    pub async fn delete_documents(&self, query: proto::query::Query) -> SummaResult<u64> {
        // With a tokio runtime, parsing is pushed off the async executor
        #[cfg(feature = "tokio-rt")]
        let parsed_query = {
            let query_parser = self.query_parser.clone();
            tokio::task::spawn_blocking(move || query_parser.parse_query(query)).await??
        };
        #[cfg(not(feature = "tokio-rt"))]
        let parsed_query = self.query_parser.parse_query(query)?;
        debug!(action = "acquiring_index_writer_for_read");
        self.index_writer_holder()?.read().await.delete_by_query(parsed_query)
    }
689
690    /// Index generic `SummaDocument`
691    ///
692    /// `IndexUpdater` bounds unbounded `SummaDocument` inside
693    pub async fn index_document(&self, document_bytes: &[u8], skip_updated_at_modification: bool) -> SummaResult<()> {
694        let document = SummaDocument::parse_json_bytes(&self.index.schema(), document_bytes, skip_updated_at_modification)?;
695        debug!(action = "acquiring_index_writer_for_read");
696        self.index_writer_holder()?.read().await.index_document(document, self.conflict_strategy())
697    }
698
699    /// Index multiple documents at a time
700    pub async fn index_bulk(&self, documents: &Vec<Vec<u8>>, conflict_strategy: Option<proto::ConflictStrategy>) -> SummaResult<(u64, u64)> {
701        let (mut success_docs, mut failed_docs) = (0u64, 0u64);
702        debug!(action = "acquiring_index_writer_for_read");
703        let index_writer_holder = self.index_writer_holder()?.read().await;
704        let conflict_strategy = conflict_strategy.unwrap_or_else(|| self.conflict_strategy());
705        for document in documents {
706            match SummaDocument::UnboundJsonBytes(document)
707                .bound_with(&self.index.schema())
708                .try_into()
709                .and_then(|document| index_writer_holder.index_document(document, conflict_strategy))
710            {
711                Ok(_) => success_docs += 1,
712                Err(error) => {
713                    warn!(action = "error", error = ?error);
714                    failed_docs += 1
715                }
716            }
717        }
718        Ok((success_docs, failed_docs))
719    }
720
    /// Streams stored documents through `documents_modifier` into an mpsc channel.
    ///
    /// When `query_filter` is absent (or carries no inner query), every segment is
    /// scanned concurrently on the blocking thread pool; otherwise the work is
    /// delegated to `filtered_documents`. Items for which `documents_modifier`
    /// returns `None` are skipped.
    #[cfg(feature = "tokio-rt")]
    pub async fn documents<O: Send + 'static>(
        &self,
        searcher: &Searcher,
        query_filter: &Option<proto::Query>,
        documents_modifier: impl Fn(tantivy::TantivyDocument) -> Option<O> + Clone + Send + Sync + 'static,
    ) -> SummaResult<tokio::sync::mpsc::Receiver<O>> {
        match query_filter {
            None | Some(proto::Query { query: None }) => {
                let segment_readers = searcher.segment_readers();
                // Channel capacity scales with segment count so producers rarely block
                let (tx, rx) = tokio::sync::mpsc::channel(segment_readers.len() * 2 + 1);
                for segment_reader in segment_readers {
                    let documents_modifier = documents_modifier.clone();
                    let tx = tx.clone();
                    let segment_reader = segment_reader.clone();
                    // Propagate the current tracing span into the blocking task
                    let span = tracing::Span::current();
                    tokio::task::spawn_blocking(move || {
                        span.in_scope(|| {
                            let store_reader = segment_reader.get_store_reader(1)?;
                            for document in store_reader.iter(segment_reader.alive_bitset()) {
                                let Ok(document) = document else {
                                    // NOTE(review): one broken document aborts this whole segment's
                                    // scan (best-effort semantics) — confirm this is intended
                                    info!(action = "broken_document", document = ?document);
                                    return Ok::<_, Error>(());
                                };
                                if let Some(document) = documents_modifier(document) {
                                    if tx.blocking_send(document).is_err() {
                                        // Receiver dropped: stop producing for this segment
                                        info!(action = "documents_client_dropped");
                                        return Ok::<_, Error>(());
                                    }
                                }
                            }
                            Ok(())
                        })
                    });
                }
                // The local `tx` is dropped here, so `rx` closes once all segment tasks finish
                Ok(rx)
            }
            Some(proto::Query { query: Some(query_filter) }) => self.filtered_documents(searcher, query_filter, documents_modifier).await,
        }
    }
761
    /// Streams documents matching `query` through `documents_modifier` into an mpsc channel.
    ///
    /// First collects all matching doc addresses (scoring disabled, segments
    /// collected concurrently), then fetches and forwards the stored documents
    /// from a single blocking task.
    #[cfg(feature = "tokio-rt")]
    async fn filtered_documents<O: Send + 'static>(
        &self,
        searcher: &Searcher,
        query: &proto::query::Query,
        documents_modifier: impl Fn(tantivy::TantivyDocument) -> Option<O> + Clone + Send + Sync + 'static,
    ) -> SummaResult<tokio::sync::mpsc::Receiver<O>> {
        let parsed_query = self.query_parser.parse_query(query.clone())?;
        let collector = tantivy::collector::DocSetCollector;
        let segment_readers = searcher.segment_readers();
        // Scoring is unnecessary for plain document retrieval
        let weight = parsed_query.weight_async(EnableScoring::disabled_from_searcher(searcher)).await?;

        // Collect matching doc addresses from every segment concurrently
        let fruits = join_all(segment_readers.iter().enumerate().map(|(segment_ord, segment_reader)| {
            let weight_ref = weight.as_ref();
            collector.collect_segment_async(weight_ref, segment_ord as u32, segment_reader)
        }))
        .await
        .into_iter()
        .collect::<tantivy::Result<Vec<_>>>()?;
        let docs = collector.merge_fruits(fruits)?;

        let (tx, rx) = tokio::sync::mpsc::channel(segment_readers.len() * 2 + 1);
        let span = tracing::Span::current();
        let searcher = searcher.clone();
        tokio::task::spawn_blocking(move || {
            span.in_scope(|| {
                for doc_address in docs {
                    let document = searcher.doc(doc_address);
                    let Ok(document) = document else {
                        // NOTE(review): one broken document stops the whole stream — confirm intended
                        info!(action = "broken_document", document = ?document);
                        return Ok::<_, Error>(());
                    };
                    if let Some(document) = documents_modifier(document) {
                        if tx.blocking_send(document).is_err() {
                            // Receiver dropped: stop producing
                            info!(action = "documents_client_dropped");
                            return Ok::<_, Error>(());
                        }
                    }
                }
                Ok(())
            })
        });
        Ok(rx)
    }
806
807    pub fn space_usage(&self) -> SummaResult<SearcherSpaceUsage> {
808        let index_reader = self.index_reader();
809        index_reader.reload()?;
810        Ok(index_reader.searcher().space_usage()?)
811    }
812
813    pub fn clear_collector_cache(&self) {
814        self.collector_cache.lock().remove_expired()
815    }
816}
817
818#[cfg(test)]
819pub mod tests {
820    use std::error::Error;
821    use std::sync::Arc;
822
823    use serde_json::json;
824    use summa_proto::proto;
825    use summa_proto::proto::ConflictStrategy;
826    use tantivy::collector::{Count, TopDocs};
827    use tantivy::query::{AllQuery, TermQuery};
828    use tantivy::schema::{IndexRecordOption, Value};
829    use tantivy::{doc, IndexBuilder, TantivyDocument, Term};
830
831    use crate::components::index_holder::register_default_tokenizers;
832    use crate::components::test_utils::{create_test_schema, generate_documents};
833    use crate::components::{IndexWriterHolder, SummaDocument};
834    use crate::configs::core::WriterThreads;
835
836    #[test]
837    fn test_deletes() -> Result<(), Box<dyn Error>> {
838        let schema = create_test_schema();
839        let index = IndexBuilder::new()
840            .schema(schema.clone())
841            .index_attributes(proto::IndexAttributes {
842                unique_fields: vec!["id".to_string()],
843                ..Default::default()
844            })
845            .create_in_ram()?;
846        register_default_tokenizers(&index);
847        let mut index_writer_holder = IndexWriterHolder::create(
848            &index,
849            WriterThreads::N(12),
850            1024 * 1024 * 1024,
851            Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
852        )?;
853        let mut last_document = None;
854        for document in generate_documents(&schema, 10000) {
855            let document: TantivyDocument = SummaDocument::parse_json_bytes(&schema, document.as_bytes(), false)?;
856            last_document = Some(document.clone());
857            index_writer_holder.index_document(document, ConflictStrategy::Merge)?;
858        }
859        let last_document = last_document.clone().unwrap();
860        let id = last_document.get_first(schema.get_field("id").unwrap()).unwrap().as_i64().unwrap();
861        let modified_last_document = doc!(
862            schema.get_field("id").unwrap() => id,
863            schema.get_field("issued_at").unwrap() => 100i64
864        );
865        index_writer_holder.commit()?;
866        for document in generate_documents(&schema, 1000) {
867            let document = SummaDocument::parse_json_bytes(&schema, document.as_bytes(), false)?;
868            index_writer_holder.index_document(modified_last_document.clone(), ConflictStrategy::Merge)?;
869            index_writer_holder.index_document(document, ConflictStrategy::Merge)?;
870        }
871        index_writer_holder.commit()?;
872        let reader = index.reader()?;
873        reader.reload()?;
874        let searcher = reader.searcher();
875        let counter = searcher.search(
876            &TermQuery::new(Term::from_field_i64(schema.get_field("id").unwrap(), id), IndexRecordOption::WithFreqs),
877            &Count,
878        )?;
879        assert_eq!(counter, 1);
880        Ok(())
881    }
882
883    #[test]
884    fn test_mapped_fields() -> Result<(), Box<dyn Error>> {
885        let schema = create_test_schema();
886        let metadata_field = schema.get_field("metadata").expect("no field");
887        let tags_field = schema.get_field("tags").expect("no field");
888        let title_field = schema.get_field("title").expect("no field");
889        let extra_field = schema.get_field("extra").expect("no field");
890        let index = IndexBuilder::new()
891            .schema(schema.clone())
892            .index_attributes(proto::IndexAttributes {
893                mapped_fields: vec![
894                    proto::MappedField {
895                        source_field: "metadata.isbn".to_string(),
896                        target_field: "extra".to_string(),
897                    },
898                    proto::MappedField {
899                        source_field: "tags".to_string(),
900                        target_field: "extra".to_string(),
901                    },
902                    proto::MappedField {
903                        source_field: "title".to_string(),
904                        target_field: "extra".to_string(),
905                    },
906                ],
907                ..Default::default()
908            })
909            .create_in_ram()?;
910        register_default_tokenizers(&index);
911        let mut index_writer_holder = IndexWriterHolder::create(
912            &index,
913            WriterThreads::N(12),
914            1024 * 1024 * 1024,
915            Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
916        )?;
917        index_writer_holder.index_document(
918            doc!(
919                title_field => "Hitchhiker's guide",
920                metadata_field => json!({"isbn": ["100", "200", "300", "500", "600"], "another_title": "Super Title"}),
921                tags_field => "reality",
922            ),
923            ConflictStrategy::Merge,
924        )?;
925        index_writer_holder.index_document(
926            doc!(
927                title_field => "Envy of the Stars",
928                metadata_field => json!({"isbn": "500"}),
929                tags_field => "scifi",
930                tags_field => "novel",
931            ),
932            ConflictStrategy::Merge,
933        )?;
934        index_writer_holder.commit()?;
935        let reader = index.reader()?;
936        reader.reload()?;
937        let searcher = reader.searcher();
938        let docs = searcher
939            .search(
940                &TermQuery::new(Term::from_field_text(extra_field, "500"), IndexRecordOption::Basic),
941                &TopDocs::with_limit(10),
942            )?
943            .into_iter()
944            .map(|x| {
945                searcher
946                    .doc::<TantivyDocument>(x.1)
947                    .unwrap()
948                    .get_first(title_field)
949                    .unwrap()
950                    .as_str()
951                    .map(|x| x.to_string())
952                    .unwrap()
953            })
954            .collect::<Vec<_>>();
955        assert_eq!(format!("{:?}", docs), "[\"Envy of the Stars\", \"Hitchhiker's guide\"]");
956        let docs = searcher
957            .search(
958                &TermQuery::new(Term::from_field_text(extra_field, "scifi"), IndexRecordOption::Basic),
959                &TopDocs::with_limit(10),
960            )?
961            .into_iter()
962            .map(|x| {
963                searcher
964                    .doc::<TantivyDocument>(x.1)
965                    .unwrap()
966                    .get_first(title_field)
967                    .unwrap()
968                    .as_str()
969                    .map(|x| x.to_string())
970                    .unwrap()
971            })
972            .collect::<Vec<_>>();
973        assert_eq!(format!("{:?}", docs), "[\"Envy of the Stars\"]");
974        Ok(())
975    }
976
977    #[test]
978    fn test_dict_tokenizer() -> Result<(), Box<dyn Error>> {
979        let schema = create_test_schema();
980        let title_field = schema.get_field("title").expect("no field");
981        let concepts_field = schema.get_field("concepts").expect("no field");
982        let index = IndexBuilder::new()
983            .schema(schema.clone())
984            .index_attributes(proto::IndexAttributes {
985                mapped_fields: vec![proto::MappedField {
986                    source_field: "title".to_string(),
987                    target_field: "concepts".to_string(),
988                }],
989                ..Default::default()
990            })
991            .create_in_ram()?;
992        register_default_tokenizers(&index);
993        let mut index_writer_holder = IndexWriterHolder::create(
994            &index,
995            WriterThreads::N(12),
996            1024 * 1024 * 1024,
997            Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
998        )?;
999        index_writer_holder.index_document(
1000            doc!(
1001                title_field => "CAGH44 gene (not CAGH45) can be correlated with autism disorder. Do not try to treat it with aspirin",
1002            ),
1003            ConflictStrategy::Merge,
1004        )?;
1005        index_writer_holder.commit()?;
1006        let reader = index.reader()?;
1007        reader.reload()?;
1008        let searcher = reader.searcher();
1009        let docs = searcher
1010            .search(
1011                &TermQuery::new(Term::from_field_text(concepts_field, "acetylsalicylic acid"), IndexRecordOption::Basic),
1012                &TopDocs::with_limit(10),
1013            )?
1014            .into_iter()
1015            .map(|x| {
1016                searcher
1017                    .doc::<TantivyDocument>(x.1)
1018                    .unwrap()
1019                    .get_first(title_field)
1020                    .unwrap()
1021                    .as_str()
1022                    .map(|x| x.to_string())
1023                    .unwrap()
1024            })
1025            .collect::<Vec<_>>();
1026        assert_eq!(
1027            format!("{:?}", docs),
1028            "[\"CAGH44 gene (not CAGH45) can be correlated with autism disorder. Do not try to treat it with aspirin\"]"
1029        );
1030        Ok(())
1031    }
1032
1033    #[test]
1034    fn test_unique_json_fields() -> Result<(), Box<dyn Error>> {
1035        let schema = create_test_schema();
1036        let index = IndexBuilder::new()
1037            .schema(schema.clone())
1038            .index_attributes(proto::IndexAttributes {
1039                unique_fields: vec!["metadata.id".to_string()],
1040                ..Default::default()
1041            })
1042            .create_in_ram()?;
1043        register_default_tokenizers(&index);
1044        let mut index_writer_holder = IndexWriterHolder::create(
1045            &index,
1046            WriterThreads::N(12),
1047            1024 * 1024 * 1024,
1048            Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
1049        )?;
1050
1051        index_writer_holder.index_document(
1052            doc!(schema.get_field("metadata").expect("no field") => json!({"id": 1})),
1053            ConflictStrategy::Merge,
1054        )?;
1055        index_writer_holder.index_document(
1056            doc!(schema.get_field("metadata").expect("no field") => json!({"id": 2})),
1057            ConflictStrategy::Merge,
1058        )?;
1059        index_writer_holder.index_document(
1060            doc!(schema.get_field("metadata").expect("no field") => json!({"id": 3})),
1061            ConflictStrategy::Merge,
1062        )?;
1063        index_writer_holder.commit()?;
1064        let reader = index.reader()?;
1065        reader.reload()?;
1066        let searcher = reader.searcher();
1067        let counter = searcher.search(&AllQuery, &Count)?;
1068        assert_eq!(counter, 3);
1069        index_writer_holder.index_document(
1070            doc!(schema.get_field("metadata").expect("no field") => json!({"id": "g"})),
1071            ConflictStrategy::Merge,
1072        )?;
1073        index_writer_holder.commit()?;
1074        let reader = index.reader()?;
1075        reader.reload()?;
1076        let searcher = reader.searcher();
1077        let counter = searcher.search(&AllQuery, &Count)?;
1078        assert_eq!(counter, 4);
1079        index_writer_holder.index_document(
1080            doc!(schema.get_field("metadata").expect("no field") => json!({"id": "g"})),
1081            ConflictStrategy::Merge,
1082        )?;
1083        index_writer_holder.commit()?;
1084        let reader = index.reader()?;
1085        reader.reload()?;
1086        let searcher = reader.searcher();
1087        let counter = searcher.search(&AllQuery, &Count)?;
1088        assert_eq!(counter, 4);
1089        index_writer_holder.index_document(
1090            doc!(schema.get_field("metadata").expect("no field") => json!({"id": 2})),
1091            ConflictStrategy::Merge,
1092        )?;
1093        index_writer_holder.index_document(
1094            doc!(schema.get_field("metadata").expect("no field") => json!({"id": 4})),
1095            ConflictStrategy::Merge,
1096        )?;
1097        index_writer_holder.commit()?;
1098        let reader = index.reader()?;
1099        reader.reload()?;
1100        let searcher = reader.searcher();
1101        let counter = searcher.search(&AllQuery, &Count)?;
1102        assert_eq!(counter, 5);
1103        Ok(())
1104    }
1105}