1use std::collections::HashSet;
2use std::fmt::Debug;
3use std::hash::{Hash, Hasher};
4use std::path::Path;
5use std::sync::Arc;
6
7use futures::future::{join_all, try_join_all};
8use serde::Deserialize;
9use summa_proto::proto;
10use summa_proto::proto::IndexAttributes;
11use tantivy::collector::{Collector, MultiCollector, MultiFruit};
12use tantivy::directory::OwnedBytes;
13use tantivy::query::{EnableScoring, Query};
14use tantivy::schema::{Field, Schema};
15use tantivy::space_usage::SearcherSpaceUsage;
16use tantivy::{Directory, Index, IndexBuilder, IndexReader, Opstamp, ReloadPolicy, Searcher};
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, instrument, trace, warn};
19
20use super::SummaSegmentAttributes;
21use super::{build_fruit_extractor, default_tokenizers, FruitExtractor, ProtoQueryParser};
22use crate::components::collector_cache::CollectorCache;
23use crate::components::fruit_extractors::IntermediateExtractionResult;
24use crate::components::segment_attributes::SegmentAttributesMergerImpl;
25use crate::components::{IndexWriterHolder, SummaDocument};
26use crate::configs::ConfigProxy;
27use crate::directories::{CachingDirectory, ExternalRequest, ExternalRequestGenerator, FileStats, HotDirectory, NetworkDirectory, StaticDirectoryCache};
28use crate::errors::{SummaResult, ValidationError};
29use crate::proto_traits::Wrapper;
30use crate::Error;
31
/// Read/search handle over a single tantivy index plus Summa-specific state.
///
/// Holds the live `Index`, a reloadable `IndexReader`, an optional writer
/// (absent for read-only deployments) and caches of schema-derived data that
/// would be expensive to recompute on every request.
pub struct IndexHolder {
    /// Dynamic configuration of the engine backing this index (file/remote/...).
    index_engine_config: Arc<dyn ConfigProxy<proto::IndexEngineConfig>>,
    /// Name the index is registered under; also the identity for `Hash`/`PartialEq`.
    index_name: String,
    index: Index,
    /// Attributes parsed once from the index metas; `None` when absent.
    cached_index_attributes: Option<IndexAttributes>,
    /// Schema snapshot taken at open time.
    cached_schema: Schema,
    /// Fields declared multi-valued in the index attributes, resolved to `Field`s.
    cached_multi_fields: HashSet<Field>,
    index_reader: IndexReader,
    /// `None` makes this holder read-only; see `index_writer_holder()`.
    index_writer_holder: Option<Arc<RwLock<IndexWriterHolder>>>,
    query_parser: ProtoQueryParser,
    /// Cache of collector outputs keyed by parsed query + scoring mode.
    /// Guarded by a synchronous mutex — never held across `.await`.
    collector_cache: parking_lot::Mutex<CollectorCache>,
}
45
46impl Hash for IndexHolder {
47 fn hash<H: Hasher>(&self, state: &mut H) {
48 self.index_name.hash(state)
49 }
50}
51
52impl PartialEq<Self> for IndexHolder {
53 fn eq(&self, other: &Self) -> bool {
54 self.index_name.eq(&other.index_name)
55 }
56}
57
/// Minimal projection of tantivy's `meta.json`: deserializes only the
/// `opstamp` so `read_opstamp` avoids parsing the full metadata document.
#[derive(Deserialize)]
struct LightMeta {
    pub opstamp: Opstamp,
}
62
63impl Debug for IndexHolder {
64 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65 f.debug_struct("IndexHolder")
66 .field("index_name", &self.index_name)
67 .field("index_settings", &self.index.settings())
68 .finish()
69 }
70}
71
72pub fn register_default_tokenizers(index: &Index) {
76 for (tokenizer_name, tokenizer) in &default_tokenizers() {
77 index.tokenizers().register(tokenizer_name, tokenizer.clone())
78 }
79}
80
/// Removes on-disk artefacts belonging to an index, if its engine owns any.
///
/// Only the `File` engine (behind the `fs` feature) owns a local directory;
/// every other engine kind is a no-op here. A `match` is used instead of
/// `if let` because the arm itself is `cfg`-gated.
pub async fn cleanup_index(index_engine_config: proto::IndexEngineConfig) -> SummaResult<()> {
    match index_engine_config.config {
        #[cfg(feature = "fs")]
        Some(proto::index_engine_config::Config::File(ref config)) => {
            info!(action = "delete_directory", directory = ?config.path);
            tokio::fs::remove_dir_all(&config.path)
                .await
                .map_err(|e| Error::IO((e, Some(std::path::PathBuf::from(&config.path)))))?;
        }
        _ => (),
    };
    Ok(())
}
98
/// Reads the current `opstamp` from an index directory by fetching its
/// `meta.json` and parsing only the `opstamp` field (see `LightMeta`).
///
/// On parse failure the raw JSON is logged before the error propagates, to
/// ease debugging of corrupted or truncated metadata.
pub async fn read_opstamp<D: Directory>(directory: &D) -> SummaResult<Opstamp> {
    let meta = directory.atomic_read_async(Path::new("meta.json")).await.map_err(|e| Error::Anyhow(e.into()))?;
    let meta_string = String::from_utf8(meta).map_err(|e| Error::Anyhow(e.into()))?;
    let meta_json: LightMeta = match serde_json::from_str(&meta_string) {
        Ok(meta_json) => meta_json,
        Err(e) => {
            error!(action = "invalid_json", json = meta_string);
            // `Err(e)?` converts the serde error through `From` into our error type.
            Err(e)?
        }
    };
    Ok(meta_json.opstamp)
}
111
/// Wraps a raw directory with Summa's caching layers.
///
/// Layering, inner to outer: `directory` → optional bounded `CachingDirectory`
/// (when `cache_config` is given) → optional `HotDirectory` backed by the
/// static hotcache (when `hotcache_bytes` is given). Either layer may be
/// skipped independently.
fn wrap_with_caches<D: Directory>(
    directory: D,
    hotcache_bytes: Option<OwnedBytes>,
    cache_config: Option<proto::CacheConfig>,
    opstamp: Opstamp,
) -> SummaResult<Box<dyn Directory>> {
    let static_cache = hotcache_bytes
        .map(|hotcache_bytes| StaticDirectoryCache::open(hotcache_bytes, opstamp))
        .transpose()?;
    info!(action = "opened_static_cache", static_cache = ?static_cache);
    // File lengths recorded in the hotcache feed the byte cache's statistics;
    // empty when no hotcache is available.
    let file_lengths = static_cache
        .as_ref()
        .map(|static_cache| static_cache.file_lengths().clone())
        .unwrap_or_default();

    info!(action = "read_file_lengths", file_lengths = ?file_lengths);
    let file_stats = FileStats::from_file_lengths(file_lengths);
    let next_directory = if let Some(cache_config) = cache_config {
        Box::new(CachingDirectory::bounded(Arc::new(directory), cache_config.cache_size as usize, file_stats)) as Box<dyn Directory>
    } else {
        Box::new(directory) as Box<dyn Directory>
    };
    info!(action = "wrapped_with_cache");
    Ok(match static_cache {
        Some(static_cache) => {
            let hot_directory = HotDirectory::open(next_directory, static_cache);
            info!(action = "opened_hotcache", hot_directory = ?hot_directory);
            Box::new(hot_directory?)
        }
        None => next_directory,
    })
}
144
145impl IndexHolder {
    /// Assembles an `IndexHolder` around an already-opened tantivy `Index`.
    ///
    /// Registers the default tokenizers and the segment-attributes merger,
    /// caches schema data parsed from the index metas, builds the query parser
    /// and a reloadable reader, and — only when `core_config.writer_threads`
    /// is set — creates a writer. Without writer threads the holder is
    /// read-only and `index_writer_holder()` returns an error.
    pub fn create_holder(
        core_config: &crate::configs::core::Config,
        mut index: Index,
        index_name: &str,
        index_engine_config: Arc<dyn ConfigProxy<proto::IndexEngineConfig>>,
        merge_policy: Option<proto::MergePolicy>,
        query_parser_config: proto::QueryParserConfig,
    ) -> SummaResult<IndexHolder> {
        register_default_tokenizers(&index);

        index.settings_mut().docstore_compress_threads = core_config.doc_store_compress_threads;
        index.set_segment_attributes_merger(Arc::new(SegmentAttributesMergerImpl::<SummaSegmentAttributes>::new()));

        let metas = index.load_metas()?;
        let cached_schema = index.schema();
        let cached_index_attributes: Option<IndexAttributes> = metas.index_attributes()?;
        // Resolve the attribute-declared multi-valued field names into schema
        // `Field`s up front, so an unknown name fails here rather than at query time.
        let cached_multi_fields = cached_index_attributes
            .as_ref()
            .map(|index_attributes| {
                index_attributes
                    .multi_fields
                    .iter()
                    .map(|field_name| Ok::<_, Error>(cached_schema.get_field(field_name)?))
                    .collect()
            })
            .transpose()?
            .unwrap_or_default();

        let query_parser = ProtoQueryParser::for_index(&index, query_parser_config)?;
        let index_reader = index
            .reader_builder()
            .doc_store_cache_num_blocks(core_config.doc_store_cache_num_blocks)
            .reload_policy(ReloadPolicy::OnCommitWithDelay)
            .try_into()?;
        // Eagerly load the first searcher generation.
        index_reader.reload()?;

        let index_writer_holder = if let Some(writer_threads) = &core_config.writer_threads {
            let merge_policy = Wrapper::from(merge_policy).into();
            info!(action = "create_index_writer", merge_policy = ?merge_policy, writer_threads = ?writer_threads, writer_heap_size_bytes = core_config.writer_heap_size_bytes);
            Some(Arc::new(RwLock::new(IndexWriterHolder::create(
                &index,
                writer_threads.clone(),
                core_config.writer_heap_size_bytes as usize,
                merge_policy,
            )?)))
        } else {
            None
        };

        Ok(IndexHolder {
            index_engine_config,
            index_name: index_name.to_string(),
            index: index.clone(),
            query_parser,
            cached_schema,
            cached_index_attributes,
            cached_multi_fields,
            index_reader,
            index_writer_holder,
            collector_cache: parking_lot::Mutex::new(CollectorCache::new(&core_config.collector_cache)),
        })
    }
209
210 #[instrument(skip_all)]
212 pub fn create_memory_index(index_builder: IndexBuilder) -> SummaResult<Index> {
213 let index = index_builder.create_in_ram()?;
214 info!(action = "created", index = ?index);
215 Ok(index)
216 }
217
218 #[instrument(skip_all)]
220 #[cfg(feature = "fs")]
221 pub async fn create_file_index(index_path: &Path, index_builder: IndexBuilder) -> SummaResult<Index> {
222 if index_path.exists() {
223 return Err(ValidationError::ExistingPath(index_path.to_owned()).into());
224 }
225 tokio::fs::create_dir_all(index_path).await?;
226 let index = index_builder.create_in_dir(index_path)?;
227 info!(action = "created", index = ?index);
228 Ok(index)
229 }
230
231 #[instrument(skip_all)]
233 #[cfg(feature = "fs")]
234 pub async fn open_file_index(file_engine_config: &proto::FileEngineConfig) -> SummaResult<Index> {
235 let index = Index::open_in_dir(&file_engine_config.path)?;
236 info!(action = "opened", config = ?file_engine_config);
237 Ok(index)
238 }
239
    /// Opens an index over a network directory driven by the given request
    /// generator type.
    ///
    /// Reads the current opstamp, then tries to fetch `hotcache.<opstamp>.bin`.
    /// The hotcache is only used when `read_hotcache` is true; any fetch error
    /// merely downgrades to running without it. Finally the directory is
    /// wrapped with the configured caches and the index opened asynchronously.
    pub async fn open_remote_index<
        TExternalRequest: ExternalRequest + 'static,
        TExternalRequestGenerator: ExternalRequestGenerator<TExternalRequest> + 'static,
    >(
        remote_engine_config: proto::RemoteEngineConfig,
        read_hotcache: bool,
    ) -> SummaResult<Index> {
        info!(action = "opening_network_directory", config = ?remote_engine_config, read_hotcache = read_hotcache);
        let network_directory = NetworkDirectory::open(Box::new(TExternalRequestGenerator::new(remote_engine_config.clone())));
        let opstamp = read_opstamp(&network_directory).await?;
        // Hotcache file is versioned by opstamp so a stale cache is never applied.
        let hotcache_bytes = match network_directory
            .get_network_file_handle(format!("hotcache.{}.bin", opstamp).as_ref())
            .do_read_bytes_async(None)
            .await
        {
            Ok(hotcache_bytes) => {
                if read_hotcache {
                    info!(action = "read_hotcache", len = hotcache_bytes.len());
                    Some(hotcache_bytes)
                } else {
                    warn!(action = "omit_hotcache");
                    None
                }
            }
            Err(error) => {
                // Best-effort: a missing/unreachable hotcache is not fatal.
                warn!(action = "error_hotcache", error = ?error);
                None
            }
        };
        let directory = wrap_with_caches(network_directory, hotcache_bytes, remote_engine_config.cache_config, opstamp)?;
        Ok(Index::open_async(directory).await?)
    }
272
273 pub fn compression(&self) -> proto::Compression {
275 Wrapper::from(self.index_reader().searcher().index().settings().docstore_compression).into_inner()
276 }
277
278 pub fn index_attributes(&self) -> Option<&IndexAttributes> {
280 self.cached_index_attributes.as_ref()
281 }
282
283 pub fn conflict_strategy(&self) -> proto::ConflictStrategy {
285 self.index_attributes().as_ref().map(|c| c.conflict_strategy()).unwrap_or_default()
286 }
287
288 pub fn index_name(&self) -> &str {
290 &self.index_name
291 }
292
293 pub fn index_reader(&self) -> &IndexReader {
295 &self.index_reader
296 }
297
298 pub fn index(&self) -> &Index {
300 &self.index
301 }
302
303 pub fn index_engine_config(&self) -> &Arc<dyn ConfigProxy<proto::IndexEngineConfig>> {
304 &self.index_engine_config
305 }
306
307 pub fn index_writer_holder(&self) -> SummaResult<&Arc<RwLock<IndexWriterHolder>>> {
308 self.index_writer_holder
309 .as_ref()
310 .ok_or_else(|| Error::ReadOnlyIndex(self.index_name.to_string()))
311 }
312
313 pub fn schema(&self) -> &Schema {
315 &self.cached_schema
316 }
317
318 pub fn multi_fields(&self) -> &HashSet<Field> {
320 &self.cached_multi_fields
321 }
322
323 pub fn real_directory(&self) -> &dyn Directory {
325 self.index.directory().real_directory()
326 }
327
    /// Warms up the inverted indexes (and, when `load_dictionaries` is set,
    /// the term dictionaries) of the given fields across all segments of the
    /// current searcher.
    ///
    /// Unknown field names fail fast with `ValidationError::MissingField`.
    pub async fn partial_warmup<T: AsRef<str>>(&self, load_dictionaries: bool, fields: &[T]) -> SummaResult<()> {
        let searcher = self.index_reader().searcher();
        let mut warm_up_futures = Vec::new();
        // Deduplicate fields by resolving names into a set of schema `Field`s.
        let default_fields = fields
            .iter()
            .map(|field_name| {
                self.cached_schema
                    .find_field(field_name.as_ref())
                    .map(|x| x.0)
                    .ok_or_else(|| ValidationError::MissingField(field_name.as_ref().to_string()))
            })
            .collect::<Result<HashSet<_>, _>>()?;
        for field in default_fields {
            for segment_reader in searcher.segment_readers() {
                // Loading the inverted index itself is part of the warm-up.
                let inverted_index = segment_reader.inverted_index_async(field).await?.clone();
                if load_dictionaries {
                    warm_up_futures.push(async move {
                        let dict = inverted_index.terms();
                        info!(action = "warming_up_dictionary", index_name = ?self.index_name());
                        dict.warm_up_dictionary_async().await
                    });
                }
            }
        }
        info!(action = "warming_up");
        // Dictionary warm-ups run concurrently; the first error aborts the rest.
        try_join_all(warm_up_futures).await?;
        Ok(())
    }
357
    /// Reads every managed file of the index in full, concurrently, pulling
    /// all index data into whatever caches back the directory.
    pub async fn full_warmup(&self) -> SummaResult<()> {
        let managed_directory = self.index.directory();
        info!(action = "warming_up");
        join_all(managed_directory.list_managed_files().into_iter().map(move |file| {
            let file_name = file.to_string_lossy().to_string();
            async move {
                info!(action = "start_reading_file", index_name = ?self.index_name(), file_name = ?file_name);
                managed_directory.atomic_read_async(&file).await.map_err(|e| Error::Tantivy(e.into()))?;
                info!(action = "finished_reading_file", index_name = ?self.index_name(), file_name = ?file_name);
                Ok(())
            }
        }))
        // `join_all` (not `try_join_all`): all reads run to completion, then the
        // first error (if any) is surfaced by the collect below.
        .await
        .into_iter()
        .collect::<SummaResult<Vec<_>>>()?;
        Ok(())
    }
376
    /// Executes `query` against all segments of `searcher` using async weight
    /// construction and per-segment collection, then merges the fruits.
    ///
    /// Scoring mode: disabled entirely when no collector requires scoring;
    /// otherwise `Some(false)` drops fieldnorms while `Some(true)`/`None`
    /// keep full scoring.
    pub async fn search_in_segments_async(
        &self,
        searcher: &Searcher,
        query: &dyn Query,
        collector: &MultiCollector<'_>,
        is_fieldnorms_scoring_enabled: Option<bool>,
    ) -> tantivy::Result<MultiFruit> {
        let enabled_scoring = match (is_fieldnorms_scoring_enabled, collector.requires_scoring()) {
            (Some(true), true) | (None, true) => EnableScoring::enabled_from_searcher(searcher),
            (Some(false), true) => EnableScoring::enabled_from_searcher_without_fieldnorms(searcher),
            (_, false) => EnableScoring::disabled_from_searcher(searcher),
        };
        let segment_readers = searcher.segment_readers();
        trace!(index_name = ?self.index_name, action = "weight");
        let weight = query.weight_async(enabled_scoring).await?;
        trace!(index_name = ?self.index_name, action = "collect_segment");
        // Collect all segments concurrently; segment ordinals keep fruit order stable.
        let fruits = join_all(segment_readers.iter().enumerate().map(|(segment_ord, segment_reader)| {
            let weight_ref = weight.as_ref();
            collector.collect_segment_async(weight_ref, segment_ord as u32, segment_reader)
        }))
        .await
        .into_iter()
        .collect::<tantivy::Result<Vec<_>>>()?;
        collector.merge_fruits(fruits)
    }
403
    /// Synchronous twin of `search_in_segments_async`: same scoring-mode
    /// resolution, but execution is delegated to tantivy's search executor.
    pub fn search_in_segments(
        &self,
        searcher: &Searcher,
        query: &dyn Query,
        collector: &MultiCollector<'_>,
        is_fieldnorms_scoring_enabled: Option<bool>,
    ) -> tantivy::Result<MultiFruit> {
        let enabled_scoring = match (is_fieldnorms_scoring_enabled, collector.requires_scoring()) {
            (Some(true), true) | (None, true) => EnableScoring::enabled_from_searcher(searcher),
            (Some(false), true) => EnableScoring::enabled_from_searcher_without_fieldnorms(searcher),
            (_, false) => EnableScoring::disabled_from_searcher(searcher),
        };
        searcher.search_with_executor(query, collector, searcher.index().search_executor(), enabled_scoring)
    }
418
    /// Searches with default options: no fieldnorm override, no collector cache.
    pub fn search(&self, index_alias: &str, query: proto::query::Query, collectors: Vec<proto::Collector>) -> SummaResult<Vec<IntermediateExtractionResult>> {
        self.custom_search(index_alias, query, collectors, None, None, None)
    }

    /// Async counterpart of `search`; delegates to `custom_search_async` with defaults.
    #[cfg(feature = "tokio-rt")]
    pub async fn search_async(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        self.custom_search_async(index_alias, query, collectors, None, None, None).await
    }
432
    /// Full-featured async search: parses the query, optionally serves
    /// collectors from the collector cache, runs the remaining ones against
    /// the current searcher and optionally stores fresh results back.
    ///
    /// Results are returned in the same order as the incoming `collectors`.
    /// NOTE(review): this is the async twin of `custom_search` — the two
    /// bodies are near-duplicates and must be kept in sync.
    pub async fn custom_search_async(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
        is_fieldnorms_scoring_enabled: Option<bool>,
        load_cache: Option<bool>,
        store_cache: Option<bool>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        let collectors_len = collectors.len();
        // Positions (in original `collectors` order) that were not served from cache.
        let mut missed_collector_indices = Vec::with_capacity(collectors_len);
        // One slot per incoming collector; filled from cache and/or extraction.
        let mut collector_outputs = vec![None; collectors_len];
        let mut adjusted_collectors = Vec::with_capacity(collectors_len);
        let mut original_collectors = Vec::with_capacity(collectors_len);
        let load_cache = load_cache.unwrap_or(false);
        let store_cache = store_cache.unwrap_or(false);

        info!(action = "parse_query", index_name = ?self.index_name, query = ?query);
        // Query parsing can be CPU-heavy; offload it when a tokio runtime is available.
        #[cfg(feature = "tokio-rt")]
        let parsed_query = {
            let query_parser = self.query_parser.clone();
            tokio::task::spawn_blocking(move || query_parser.parse_query(query)).await??
        };
        #[cfg(not(feature = "tokio-rt"))]
        let parsed_query = self.query_parser.parse_query(query)?;

        // The cache key covers both the parsed query and the scoring mode.
        let caching_key = format!("{:?}|{:?}", parsed_query, is_fieldnorms_scoring_enabled);

        if load_cache {
            let mut cache = self.collector_cache.lock();
            for (i, collector) in collectors.into_iter().enumerate() {
                let is_caching_enabled = CollectorCache::is_caching_enabled(&collector);
                if is_caching_enabled {
                    let adjusted_collector = CollectorCache::adjust_collector(&collector);
                    info!(action = "querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                    match cache.get(&caching_key, &adjusted_collector, &collector) {
                        Some(cached_value) => {
                            info!(action = "match_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            collector_outputs[i] = Some(cached_value)
                        }
                        None => {
                            info!(action = "mismatch_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            adjusted_collectors.push(adjusted_collector);
                            original_collectors.push(collector);
                            missed_collector_indices.push(i)
                        }
                    }
                } else {
                    info!(action = "skip_querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?collector);
                    adjusted_collectors.push(collector.clone());
                    original_collectors.push(collector);
                    missed_collector_indices.push(i)
                }
            }
            // Release the sync mutex before any `.await` below.
            drop(cache);
            info!(action = "queried_cache", index_name = ?self.index_name, collectors = collectors_len, cached_collectors = collectors_len - adjusted_collectors.len());
        } else {
            for collector in collectors.into_iter() {
                original_collectors.push(collector.clone());
                if store_cache && CollectorCache::is_caching_enabled(&collector) {
                    adjusted_collectors.push(CollectorCache::adjust_collector(&collector));
                } else {
                    adjusted_collectors.push(collector);
                }
            }
            missed_collector_indices = (0..collectors_len).collect();
        }

        // Fast path: every collector was served from the cache.
        if adjusted_collectors.is_empty() {
            info!(action = "served_from_cache", index_name = ?self.index_name, query = ?parsed_query);
            return Ok(collector_outputs.into_iter().map(Option::unwrap).collect());
        }
        let searcher = self.index_reader().searcher();
        let mut multi_collector = MultiCollector::new();
        let extractors: Vec<Box<dyn FruitExtractor>> = adjusted_collectors
            .iter()
            .map(|collector_proto| {
                build_fruit_extractor(
                    self,
                    index_alias,
                    searcher.clone(),
                    collector_proto.clone(),
                    &parsed_query,
                    &mut multi_collector,
                )
            })
            .collect::<SummaResult<_>>()?;
        info!(
            target: "query",
            index_name = ?self.index_name,
            parsed_query = ?parsed_query,
            is_fieldnorms_scoring_enabled = is_fieldnorms_scoring_enabled,
        );
        let mut multi_fruit = self
            .search_in_segments_async(&searcher, &parsed_query, &multi_collector, is_fieldnorms_scoring_enabled)
            .await?;
        if load_cache || store_cache {
            let mut cache = self.collector_cache.lock();
            // `extractors`, `missed_collector_indices`, `original_collectors` and
            // `adjusted_collectors` are aligned position-by-position; zip them
            // back together to fill the remaining output slots.
            for (((extractor, i), original_collector), adjusted_collector) in extractors
                .into_iter()
                .zip(missed_collector_indices.into_iter())
                .zip(original_collectors.into_iter())
                .zip(adjusted_collectors.into_iter())
            {
                let extracted_result = extractor.extract(&mut multi_fruit)?;
                if CollectorCache::is_caching_enabled(&original_collector) {
                    let adjusted_extracted_result = CollectorCache::adjust_result(&extracted_result, &original_collector);
                    if store_cache {
                        info!(action = "storing_collector_to_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                        cache.put(&caching_key, &adjusted_collector, extracted_result);
                    };
                    collector_outputs[i] = Some(adjusted_extracted_result)
                } else {
                    collector_outputs[i] = Some(extracted_result)
                }
            }
            drop(cache);
        } else {
            for (i, extractor) in extractors.into_iter().enumerate() {
                collector_outputs[i] = Some(extractor.extract(&mut multi_fruit)?);
            }
        }
        Ok(collector_outputs.into_iter().map(Option::unwrap).collect())
    }
558
    /// Synchronous full-featured search; same cache/collector pipeline as
    /// `custom_search_async` but query parsing runs inline and segment search
    /// goes through tantivy's executor (`search_in_segments`).
    ///
    /// NOTE(review): near-duplicate of `custom_search_async` — keep in sync.
    pub fn custom_search(
        &self,
        index_alias: &str,
        query: proto::query::Query,
        collectors: Vec<proto::Collector>,
        is_fieldnorms_scoring_enabled: Option<bool>,
        load_cache: Option<bool>,
        store_cache: Option<bool>,
    ) -> SummaResult<Vec<IntermediateExtractionResult>> {
        let collectors_len = collectors.len();
        // Positions (in original `collectors` order) that were not served from cache.
        let mut missed_collector_indices = Vec::with_capacity(collectors_len);
        let mut collector_outputs = vec![None; collectors_len];
        let mut adjusted_collectors = Vec::with_capacity(collectors_len);
        let mut original_collectors = Vec::with_capacity(collectors_len);
        let load_cache = load_cache.unwrap_or(false);
        let store_cache = store_cache.unwrap_or(false);

        info!(action = "parse_query", index_name = ?self.index_name, query = ?query);
        let parsed_query = self.query_parser.parse_query(query)?;

        // The cache key covers both the parsed query and the scoring mode.
        let caching_key = format!("{:?}|{:?}", parsed_query, is_fieldnorms_scoring_enabled);

        if load_cache {
            let mut cache = self.collector_cache.lock();
            for (i, collector) in collectors.into_iter().enumerate() {
                let is_caching_enabled = CollectorCache::is_caching_enabled(&collector);
                if is_caching_enabled {
                    let adjusted_collector = CollectorCache::adjust_collector(&collector);
                    info!(action = "querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                    match cache.get(&caching_key, &adjusted_collector, &collector) {
                        Some(cached_value) => {
                            info!(action = "match_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            collector_outputs[i] = Some(cached_value)
                        }
                        None => {
                            info!(action = "mismatch_collector_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                            adjusted_collectors.push(adjusted_collector);
                            original_collectors.push(collector);
                            missed_collector_indices.push(i)
                        }
                    }
                } else {
                    info!(action = "skip_querying_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?collector);
                    adjusted_collectors.push(collector.clone());
                    original_collectors.push(collector);
                    missed_collector_indices.push(i)
                }
            }
            drop(cache);
            info!(action = "queried_cache", index_name = ?self.index_name, collectors = collectors_len, cached_collectors = collectors_len - adjusted_collectors.len());
        } else {
            for collector in collectors.into_iter() {
                original_collectors.push(collector.clone());
                if store_cache && CollectorCache::is_caching_enabled(&collector) {
                    adjusted_collectors.push(CollectorCache::adjust_collector(&collector));
                } else {
                    adjusted_collectors.push(collector);
                }
            }
            missed_collector_indices = (0..collectors_len).collect();
        }

        // Fast path: every collector was served from the cache.
        if adjusted_collectors.is_empty() {
            info!(action = "served_from_cache", index_name = ?self.index_name, query = ?parsed_query);
            return Ok(collector_outputs.into_iter().map(Option::unwrap).collect());
        }
        let searcher = self.index_reader().searcher();
        let mut multi_collector = MultiCollector::new();
        let extractors: Vec<Box<dyn FruitExtractor>> = adjusted_collectors
            .iter()
            .map(|collector_proto| {
                build_fruit_extractor(
                    self,
                    index_alias,
                    searcher.clone(),
                    collector_proto.clone(),
                    &parsed_query,
                    &mut multi_collector,
                )
            })
            .collect::<SummaResult<_>>()?;
        info!(
            target: "query",
            index_name = ?self.index_name,
            parsed_query = ?parsed_query,
            is_fieldnorms_scoring_enabled = is_fieldnorms_scoring_enabled,
        );
        let mut multi_fruit = self.search_in_segments(&searcher, &parsed_query, &multi_collector, is_fieldnorms_scoring_enabled)?;
        if load_cache || store_cache {
            let mut cache = self.collector_cache.lock();
            // The four vectors below are aligned position-by-position.
            for (((extractor, i), original_collector), adjusted_collector) in extractors
                .into_iter()
                .zip(missed_collector_indices.into_iter())
                .zip(original_collectors.into_iter())
                .zip(adjusted_collectors.into_iter())
            {
                let extracted_result = extractor.extract(&mut multi_fruit)?;
                if CollectorCache::is_caching_enabled(&original_collector) {
                    let adjusted_extracted_result = CollectorCache::adjust_result(&extracted_result, &original_collector);
                    if store_cache {
                        info!(action = "storing_collector_to_cache", index_name = ?self.index_name, caching_key = caching_key, collector = ?adjusted_collector);
                        cache.put(&caching_key, &adjusted_collector, extracted_result);
                    };
                    collector_outputs[i] = Some(adjusted_extracted_result)
                } else {
                    collector_outputs[i] = Some(extracted_result)
                }
            }
            drop(cache);
        } else {
            for (i, extractor) in extractors.into_iter().enumerate() {
                collector_outputs[i] = Some(extractor.extract(&mut multi_fruit)?);
            }
        }
        Ok(collector_outputs.into_iter().map(Option::unwrap).collect())
    }
676
    /// Deletes all documents matching `query`; returns the writer's result
    /// (presumably the number of affected documents — confirm in
    /// `IndexWriterHolder::delete_by_query`).
    ///
    /// Fails with `ReadOnlyIndex` when the holder has no writer. Parsing is
    /// offloaded to a blocking thread under the `tokio-rt` feature.
    pub async fn delete_documents(&self, query: proto::query::Query) -> SummaResult<u64> {
        #[cfg(feature = "tokio-rt")]
        let parsed_query = {
            let query_parser = self.query_parser.clone();
            tokio::task::spawn_blocking(move || query_parser.parse_query(query)).await??
        };
        #[cfg(not(feature = "tokio-rt"))]
        let parsed_query = self.query_parser.parse_query(query)?;
        debug!(action = "acquiring_index_writer_for_read");
        self.index_writer_holder()?.read().await.delete_by_query(parsed_query)
    }
689
    /// Parses a single JSON document against the index schema and indexes it
    /// using the index-level conflict strategy.
    ///
    /// `skip_updated_at_modification` presumably suppresses automatic
    /// `updated_at` rewriting — confirm in `SummaDocument::parse_json_bytes`.
    pub async fn index_document(&self, document_bytes: &[u8], skip_updated_at_modification: bool) -> SummaResult<()> {
        let document = SummaDocument::parse_json_bytes(&self.index.schema(), document_bytes, skip_updated_at_modification)?;
        debug!(action = "acquiring_index_writer_for_read");
        self.index_writer_holder()?.read().await.index_document(document, self.conflict_strategy())
    }
698
699 pub async fn index_bulk(&self, documents: &Vec<Vec<u8>>, conflict_strategy: Option<proto::ConflictStrategy>) -> SummaResult<(u64, u64)> {
701 let (mut success_docs, mut failed_docs) = (0u64, 0u64);
702 debug!(action = "acquiring_index_writer_for_read");
703 let index_writer_holder = self.index_writer_holder()?.read().await;
704 let conflict_strategy = conflict_strategy.unwrap_or_else(|| self.conflict_strategy());
705 for document in documents {
706 match SummaDocument::UnboundJsonBytes(document)
707 .bound_with(&self.index.schema())
708 .try_into()
709 .and_then(|document| index_writer_holder.index_document(document, conflict_strategy))
710 {
711 Ok(_) => success_docs += 1,
712 Err(error) => {
713 warn!(action = "error", error = ?error);
714 failed_docs += 1
715 }
716 }
717 }
718 Ok((success_docs, failed_docs))
719 }
720
    /// Streams documents of `searcher` through `documents_modifier` into an
    /// mpsc channel, optionally restricted by `query_filter`.
    ///
    /// Without a filter every segment's doc store is iterated on its own
    /// blocking task; with a filter the work is delegated to
    /// `filtered_documents`. A broken stored document or a dropped receiver
    /// ends the producing task early (both are logged, not errors).
    #[cfg(feature = "tokio-rt")]
    pub async fn documents<O: Send + 'static>(
        &self,
        searcher: &Searcher,
        query_filter: &Option<proto::Query>,
        documents_modifier: impl Fn(tantivy::TantivyDocument) -> Option<O> + Clone + Send + Sync + 'static,
    ) -> SummaResult<tokio::sync::mpsc::Receiver<O>> {
        match query_filter {
            None | Some(proto::Query { query: None }) => {
                let segment_readers = searcher.segment_readers();
                // Channel sized to give every segment task some slack.
                let (tx, rx) = tokio::sync::mpsc::channel(segment_readers.len() * 2 + 1);
                for segment_reader in segment_readers {
                    let documents_modifier = documents_modifier.clone();
                    let tx = tx.clone();
                    let segment_reader = segment_reader.clone();
                    // Carry the current tracing span into the blocking task.
                    let span = tracing::Span::current();
                    tokio::task::spawn_blocking(move || {
                        span.in_scope(|| {
                            let store_reader = segment_reader.get_store_reader(1)?;
                            // Iterate only alive (non-deleted) documents.
                            for document in store_reader.iter(segment_reader.alive_bitset()) {
                                let Ok(document) = document else {
                                    info!(action = "broken_document", document = ?document);
                                    return Ok::<_, Error>(());
                                };
                                if let Some(document) = documents_modifier(document) {
                                    // Receiver gone: the client stopped consuming.
                                    if tx.blocking_send(document).is_err() {
                                        info!(action = "documents_client_dropped");
                                        return Ok::<_, Error>(());
                                    }
                                }
                            }
                            Ok(())
                        })
                    });
                }
                Ok(rx)
            }
            Some(proto::Query { query: Some(query_filter) }) => self.filtered_documents(searcher, query_filter, documents_modifier).await,
        }
    }
761
    /// Filtered variant of `documents`: first resolves the matching
    /// `DocAddress` set with a scoring-disabled `DocSetCollector`, then fetches
    /// and streams the stored documents from a single blocking task.
    #[cfg(feature = "tokio-rt")]
    async fn filtered_documents<O: Send + 'static>(
        &self,
        searcher: &Searcher,
        query: &proto::query::Query,
        documents_modifier: impl Fn(tantivy::TantivyDocument) -> Option<O> + Clone + Send + Sync + 'static,
    ) -> SummaResult<tokio::sync::mpsc::Receiver<O>> {
        let parsed_query = self.query_parser.parse_query(query.clone())?;
        let collector = tantivy::collector::DocSetCollector;
        let segment_readers = searcher.segment_readers();
        // Scoring is irrelevant for a pure doc-set collection.
        let weight = parsed_query.weight_async(EnableScoring::disabled_from_searcher(searcher)).await?;

        // Collect matching doc addresses from all segments concurrently.
        let fruits = join_all(segment_readers.iter().enumerate().map(|(segment_ord, segment_reader)| {
            let weight_ref = weight.as_ref();
            collector.collect_segment_async(weight_ref, segment_ord as u32, segment_reader)
        }))
        .await
        .into_iter()
        .collect::<tantivy::Result<Vec<_>>>()?;
        let docs = collector.merge_fruits(fruits)?;

        let (tx, rx) = tokio::sync::mpsc::channel(segment_readers.len() * 2 + 1);
        let span = tracing::Span::current();
        let searcher = searcher.clone();
        tokio::task::spawn_blocking(move || {
            span.in_scope(|| {
                for doc_address in docs {
                    let document = searcher.doc(doc_address);
                    let Ok(document) = document else {
                        info!(action = "broken_document", document = ?document);
                        return Ok::<_, Error>(());
                    };
                    if let Some(document) = documents_modifier(document) {
                        // Receiver gone: the client stopped consuming.
                        if tx.blocking_send(document).is_err() {
                            info!(action = "documents_client_dropped");
                            return Ok::<_, Error>(());
                        }
                    }
                }
                Ok(())
            })
        });
        Ok(rx)
    }
806
807 pub fn space_usage(&self) -> SummaResult<SearcherSpaceUsage> {
808 let index_reader = self.index_reader();
809 index_reader.reload()?;
810 Ok(index_reader.searcher().space_usage()?)
811 }
812
    /// Evicts expired entries from the collector cache.
    ///
    /// NOTE(review): despite the name, this does not empty the cache — it only
    /// drops entries the cache itself deems expired (`remove_expired`).
    pub fn clear_collector_cache(&self) {
        self.collector_cache.lock().remove_expired()
    }
816}
817
818#[cfg(test)]
819pub mod tests {
820 use std::error::Error;
821 use std::sync::Arc;
822
823 use serde_json::json;
824 use summa_proto::proto;
825 use summa_proto::proto::ConflictStrategy;
826 use tantivy::collector::{Count, TopDocs};
827 use tantivy::query::{AllQuery, TermQuery};
828 use tantivy::schema::{IndexRecordOption, Value};
829 use tantivy::{doc, IndexBuilder, TantivyDocument, Term};
830
831 use crate::components::index_holder::register_default_tokenizers;
832 use crate::components::test_utils::{create_test_schema, generate_documents};
833 use crate::components::{IndexWriterHolder, SummaDocument};
834 use crate::configs::core::WriterThreads;
835
    /// End-to-end check of `ConflictStrategy::Merge` on the unique `id` field:
    /// after interleaving many re-indexes of a modified copy of one document
    /// with fresh documents, exactly one live document with that id remains.
    #[test]
    fn test_deletes() -> Result<(), Box<dyn Error>> {
        let schema = create_test_schema();
        let index = IndexBuilder::new()
            .schema(schema.clone())
            .index_attributes(proto::IndexAttributes {
                unique_fields: vec!["id".to_string()],
                ..Default::default()
            })
            .create_in_ram()?;
        register_default_tokenizers(&index);
        let mut index_writer_holder = IndexWriterHolder::create(
            &index,
            WriterThreads::N(12),
            1024 * 1024 * 1024,
            Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
        )?;
        // First batch: remember the last document so it can be duplicated later.
        let mut last_document = None;
        for document in generate_documents(&schema, 10000) {
            let document: TantivyDocument = SummaDocument::parse_json_bytes(&schema, document.as_bytes(), false)?;
            last_document = Some(document.clone());
            index_writer_holder.index_document(document, ConflictStrategy::Merge)?;
        }
        let last_document = last_document.clone().unwrap();
        let id = last_document.get_first(schema.get_field("id").unwrap()).unwrap().as_i64().unwrap();
        // Same id, different payload — must merge with (replace) the original.
        let modified_last_document = doc!(
            schema.get_field("id").unwrap() => id,
            schema.get_field("issued_at").unwrap() => 100i64
        );
        index_writer_holder.commit()?;
        // Second batch: interleave the duplicate with fresh documents.
        for document in generate_documents(&schema, 1000) {
            let document = SummaDocument::parse_json_bytes(&schema, document.as_bytes(), false)?;
            index_writer_holder.index_document(modified_last_document.clone(), ConflictStrategy::Merge)?;
            index_writer_holder.index_document(document, ConflictStrategy::Merge)?;
        }
        index_writer_holder.commit()?;
        let reader = index.reader()?;
        reader.reload()?;
        let searcher = reader.searcher();
        let counter = searcher.search(
            &TermQuery::new(Term::from_field_i64(schema.get_field("id").unwrap(), id), IndexRecordOption::WithFreqs),
            &Count,
        )?;
        // The unique-field constraint must leave exactly one document for `id`.
        assert_eq!(counter, 1);
        Ok(())
    }
882
883 #[test]
884 fn test_mapped_fields() -> Result<(), Box<dyn Error>> {
885 let schema = create_test_schema();
886 let metadata_field = schema.get_field("metadata").expect("no field");
887 let tags_field = schema.get_field("tags").expect("no field");
888 let title_field = schema.get_field("title").expect("no field");
889 let extra_field = schema.get_field("extra").expect("no field");
890 let index = IndexBuilder::new()
891 .schema(schema.clone())
892 .index_attributes(proto::IndexAttributes {
893 mapped_fields: vec![
894 proto::MappedField {
895 source_field: "metadata.isbn".to_string(),
896 target_field: "extra".to_string(),
897 },
898 proto::MappedField {
899 source_field: "tags".to_string(),
900 target_field: "extra".to_string(),
901 },
902 proto::MappedField {
903 source_field: "title".to_string(),
904 target_field: "extra".to_string(),
905 },
906 ],
907 ..Default::default()
908 })
909 .create_in_ram()?;
910 register_default_tokenizers(&index);
911 let mut index_writer_holder = IndexWriterHolder::create(
912 &index,
913 WriterThreads::N(12),
914 1024 * 1024 * 1024,
915 Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
916 )?;
917 index_writer_holder.index_document(
918 doc!(
919 title_field => "Hitchhiker's guide",
920 metadata_field => json!({"isbn": ["100", "200", "300", "500", "600"], "another_title": "Super Title"}),
921 tags_field => "reality",
922 ),
923 ConflictStrategy::Merge,
924 )?;
925 index_writer_holder.index_document(
926 doc!(
927 title_field => "Envy of the Stars",
928 metadata_field => json!({"isbn": "500"}),
929 tags_field => "scifi",
930 tags_field => "novel",
931 ),
932 ConflictStrategy::Merge,
933 )?;
934 index_writer_holder.commit()?;
935 let reader = index.reader()?;
936 reader.reload()?;
937 let searcher = reader.searcher();
938 let docs = searcher
939 .search(
940 &TermQuery::new(Term::from_field_text(extra_field, "500"), IndexRecordOption::Basic),
941 &TopDocs::with_limit(10),
942 )?
943 .into_iter()
944 .map(|x| {
945 searcher
946 .doc::<TantivyDocument>(x.1)
947 .unwrap()
948 .get_first(title_field)
949 .unwrap()
950 .as_str()
951 .map(|x| x.to_string())
952 .unwrap()
953 })
954 .collect::<Vec<_>>();
955 assert_eq!(format!("{:?}", docs), "[\"Envy of the Stars\", \"Hitchhiker's guide\"]");
956 let docs = searcher
957 .search(
958 &TermQuery::new(Term::from_field_text(extra_field, "scifi"), IndexRecordOption::Basic),
959 &TopDocs::with_limit(10),
960 )?
961 .into_iter()
962 .map(|x| {
963 searcher
964 .doc::<TantivyDocument>(x.1)
965 .unwrap()
966 .get_first(title_field)
967 .unwrap()
968 .as_str()
969 .map(|x| x.to_string())
970 .unwrap()
971 })
972 .collect::<Vec<_>>();
973 assert_eq!(format!("{:?}", docs), "[\"Envy of the Stars\"]");
974 Ok(())
975 }
976
977 #[test]
978 fn test_dict_tokenizer() -> Result<(), Box<dyn Error>> {
979 let schema = create_test_schema();
980 let title_field = schema.get_field("title").expect("no field");
981 let concepts_field = schema.get_field("concepts").expect("no field");
982 let index = IndexBuilder::new()
983 .schema(schema.clone())
984 .index_attributes(proto::IndexAttributes {
985 mapped_fields: vec![proto::MappedField {
986 source_field: "title".to_string(),
987 target_field: "concepts".to_string(),
988 }],
989 ..Default::default()
990 })
991 .create_in_ram()?;
992 register_default_tokenizers(&index);
993 let mut index_writer_holder = IndexWriterHolder::create(
994 &index,
995 WriterThreads::N(12),
996 1024 * 1024 * 1024,
997 Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
998 )?;
999 index_writer_holder.index_document(
1000 doc!(
1001 title_field => "CAGH44 gene (not CAGH45) can be correlated with autism disorder. Do not try to treat it with aspirin",
1002 ),
1003 ConflictStrategy::Merge,
1004 )?;
1005 index_writer_holder.commit()?;
1006 let reader = index.reader()?;
1007 reader.reload()?;
1008 let searcher = reader.searcher();
1009 let docs = searcher
1010 .search(
1011 &TermQuery::new(Term::from_field_text(concepts_field, "acetylsalicylic acid"), IndexRecordOption::Basic),
1012 &TopDocs::with_limit(10),
1013 )?
1014 .into_iter()
1015 .map(|x| {
1016 searcher
1017 .doc::<TantivyDocument>(x.1)
1018 .unwrap()
1019 .get_first(title_field)
1020 .unwrap()
1021 .as_str()
1022 .map(|x| x.to_string())
1023 .unwrap()
1024 })
1025 .collect::<Vec<_>>();
1026 assert_eq!(
1027 format!("{:?}", docs),
1028 "[\"CAGH44 gene (not CAGH45) can be correlated with autism disorder. Do not try to treat it with aspirin\"]"
1029 );
1030 Ok(())
1031 }
1032
1033 #[test]
1034 fn test_unique_json_fields() -> Result<(), Box<dyn Error>> {
1035 let schema = create_test_schema();
1036 let index = IndexBuilder::new()
1037 .schema(schema.clone())
1038 .index_attributes(proto::IndexAttributes {
1039 unique_fields: vec!["metadata.id".to_string()],
1040 ..Default::default()
1041 })
1042 .create_in_ram()?;
1043 register_default_tokenizers(&index);
1044 let mut index_writer_holder = IndexWriterHolder::create(
1045 &index,
1046 WriterThreads::N(12),
1047 1024 * 1024 * 1024,
1048 Arc::new(tantivy::merge_policy::LogMergePolicy::default()),
1049 )?;
1050
1051 index_writer_holder.index_document(
1052 doc!(schema.get_field("metadata").expect("no field") => json!({"id": 1})),
1053 ConflictStrategy::Merge,
1054 )?;
1055 index_writer_holder.index_document(
1056 doc!(schema.get_field("metadata").expect("no field") => json!({"id": 2})),
1057 ConflictStrategy::Merge,
1058 )?;
1059 index_writer_holder.index_document(
1060 doc!(schema.get_field("metadata").expect("no field") => json!({"id": 3})),
1061 ConflictStrategy::Merge,
1062 )?;
1063 index_writer_holder.commit()?;
1064 let reader = index.reader()?;
1065 reader.reload()?;
1066 let searcher = reader.searcher();
1067 let counter = searcher.search(&AllQuery, &Count)?;
1068 assert_eq!(counter, 3);
1069 index_writer_holder.index_document(
1070 doc!(schema.get_field("metadata").expect("no field") => json!({"id": "g"})),
1071 ConflictStrategy::Merge,
1072 )?;
1073 index_writer_holder.commit()?;
1074 let reader = index.reader()?;
1075 reader.reload()?;
1076 let searcher = reader.searcher();
1077 let counter = searcher.search(&AllQuery, &Count)?;
1078 assert_eq!(counter, 4);
1079 index_writer_holder.index_document(
1080 doc!(schema.get_field("metadata").expect("no field") => json!({"id": "g"})),
1081 ConflictStrategy::Merge,
1082 )?;
1083 index_writer_holder.commit()?;
1084 let reader = index.reader()?;
1085 reader.reload()?;
1086 let searcher = reader.searcher();
1087 let counter = searcher.search(&AllQuery, &Count)?;
1088 assert_eq!(counter, 4);
1089 index_writer_holder.index_document(
1090 doc!(schema.get_field("metadata").expect("no field") => json!({"id": 2})),
1091 ConflictStrategy::Merge,
1092 )?;
1093 index_writer_holder.index_document(
1094 doc!(schema.get_field("metadata").expect("no field") => json!({"id": 4})),
1095 ConflictStrategy::Merge,
1096 )?;
1097 index_writer_holder.commit()?;
1098 let reader = index.reader()?;
1099 reader.reload()?;
1100 let searcher = reader.searcher();
1101 let counter = searcher.search(&AllQuery, &Count)?;
1102 assert_eq!(counter, 5);
1103 Ok(())
1104 }
1105}