// summa_core/components/index_registry.rs

1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::sync::Arc;
4
5use futures::future::join_all;
6use summa_proto::proto;
7use summa_proto::proto::collector_output::CollectorOutput;
8use summa_proto::proto::Score;
9use tantivy::DocAddress;
10use tokio::sync::RwLock;
11use tracing::{debug, info, trace};
12
13use super::IndexHolder;
14use crate::components::custom_serializer::NamedFieldDocument;
15use crate::components::fruit_extractors::{IntermediateExtractionResult, ReadyCollectorOutput, ScoredDocAddress};
16use crate::configs::{ConfigProxy, DirectProxy};
17use crate::errors::{SummaResult, ValidationError};
18use crate::proto_traits::Wrapper;
19use crate::utils::sync::{Handler, OwningHandler};
20use crate::Error;
21
22/// The main struct responsible for combining different indices and managing their lifetime.
23#[derive(Clone)]
24pub struct IndexRegistry {
25    core_config_holder: Arc<dyn ConfigProxy<crate::configs::core::Config>>,
26    index_holders: Arc<RwLock<HashMap<String, OwningHandler<IndexHolder>>>>,
27}
28
29impl Default for IndexRegistry {
30    fn default() -> Self {
31        IndexRegistry {
32            core_config_holder: Arc::new(DirectProxy::default()),
33            index_holders: Arc::default(),
34        }
35    }
36}
37
38impl Debug for IndexRegistry {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("IndexRegistry").field("index_holders", &self.index_holders).finish()
41    }
42}
43
44struct ScoredDocAddressRefWithAlias<'a> {
45    index_alias: &'a str,
46    scored_doc_address: &'a ScoredDocAddress,
47}
48
49impl ScoredDocAddressRefWithAlias<'_> {
50    pub fn doc_address(&self) -> DocAddress {
51        self.scored_doc_address.doc_address
52    }
53
54    pub fn score(&self) -> &Option<Score> {
55        &self.scored_doc_address.score
56    }
57}
58
59impl IndexRegistry {
60    pub fn new(core_config: &Arc<dyn ConfigProxy<crate::configs::core::Config>>) -> IndexRegistry {
61        IndexRegistry {
62            core_config_holder: core_config.clone(),
63            index_holders: Arc::default(),
64        }
65    }
66
67    /// Read-locked `HashMap` of all indices
68    pub fn index_holders(&self) -> &Arc<RwLock<HashMap<String, OwningHandler<IndexHolder>>>> {
69        &self.index_holders
70    }
71
72    pub async fn index_holders_cloned(&self) -> HashMap<String, Handler<IndexHolder>> {
73        self.index_holders()
74            .read()
75            .await
76            .iter()
77            .map(|(index_name, handler)| (index_name.to_string(), handler.handler()))
78            .collect()
79    }
80
81    pub async fn clear(self) {
82        debug!(action = "acquiring_index_holders_for_write");
83        self.index_holders().write().await.clear();
84    }
85
86    /// Returns `Handler` to `IndexHolder`.
87    ///
88    /// It is safe to keep `Handler<IndexHolder>` cause `Index` won't be deleted until `Handler` is alive.
89    /// Though, `IndexHolder` can be removed from the registry of `IndexHolder`s to prevent new queries
90    pub async fn get_index_holder(&self, index_alias: &str) -> SummaResult<Handler<IndexHolder>> {
91        self.get_index_holder_by_name(
92            self.core_config_holder
93                .read()
94                .await
95                .get()
96                .resolve_index_alias(index_alias)
97                .as_deref()
98                .unwrap_or(index_alias),
99        )
100        .await
101    }
102
103    /// Retrieve `IndexHolder` by its name
104    pub async fn get_index_holder_by_name(&self, index_name: &str) -> SummaResult<Handler<IndexHolder>> {
105        Ok(self
106            .index_holders()
107            .read()
108            .await
109            .get(index_name)
110            .ok_or_else(|| Error::Validation(Box::new(ValidationError::MissingIndex(index_name.to_owned()))))?
111            .handler())
112    }
113
114    /// Add new index to `IndexRegistry`
115    pub async fn add(&self, index_holder: IndexHolder) -> SummaResult<Handler<IndexHolder>> {
116        let index_holder = OwningHandler::new(index_holder);
117        let index_holder_handler = index_holder.handler();
118        debug!(action = "acquiring_index_holders_for_write");
119        self.index_holders().write().await.insert(index_holder.index_name().to_string(), index_holder);
120        info!(action = "added");
121        Ok(index_holder_handler)
122    }
123
124    /// Deletes index from `IndexRegistry`
125    pub async fn delete(&self, index_name: &str) {
126        debug!(action = "acquiring_index_holders_for_write");
127        self.index_holders().write().await.remove(index_name);
128    }
129
130    /// Merges several `IntermediateExtractionResult`
131    pub async fn finalize_extraction(&self, ie_results: Vec<IntermediateExtractionResult>) -> SummaResult<Vec<proto::CollectorOutput>> {
132        if ie_results.is_empty() {
133            return Ok(vec![]);
134        }
135        let mut collector_outputs = vec![];
136
137        for ie_result in ie_results.into_iter() {
138            collector_outputs.push(proto::CollectorOutput {
139                collector_output: Some(match ie_result {
140                    IntermediateExtractionResult::Ready(ReadyCollectorOutput::Aggregation(aggregation_collector_output)) => {
141                        CollectorOutput::Aggregation(aggregation_collector_output)
142                    }
143                    IntermediateExtractionResult::Ready(ReadyCollectorOutput::Count(count_collector_output)) => CollectorOutput::Count(count_collector_output),
144                    IntermediateExtractionResult::Ready(ReadyCollectorOutput::Facet(facet_collector_output)) => CollectorOutput::Facet(facet_collector_output),
145                    IntermediateExtractionResult::PreparedDocumentReferences(prepared_document_references) => {
146                        trace!(action = "prepared_documents_finalization");
147                        let extraction_tooling = &prepared_document_references.extraction_tooling;
148                        let snippet_generator = if let Some(snippet_generator_config) = prepared_document_references.snippet_generator_config {
149                            Some(snippet_generator_config.as_tantivy_async().await)
150                        } else {
151                            None
152                        };
153
154                        let scored_doc_address_refs =
155                            prepared_document_references
156                                .scored_doc_addresses
157                                .iter()
158                                .map(|scored_doc_address| ScoredDocAddressRefWithAlias {
159                                    index_alias: prepared_document_references.index_alias.as_str(),
160                                    scored_doc_address,
161                                });
162                        let extraction_tooling_ref = &extraction_tooling;
163                        let snippet_generator_ref = &snippet_generator;
164
165                        let scored_documents = join_all(scored_doc_address_refs.into_iter().enumerate().map(|(position, scored_doc_address_ref)| {
166                            let doc_address = scored_doc_address_ref.doc_address();
167                            let searcher = extraction_tooling_ref.searcher.clone();
168                            async move {
169                                #[cfg(feature = "tokio-rt")]
170                                let document = tokio::task::spawn_blocking(move || searcher.doc(doc_address)).await??;
171                                #[cfg(not(feature = "tokio-rt"))]
172                                let document = searcher.doc_async(doc_address).await?;
173                                Ok(proto::ScoredDocument {
174                                    document: NamedFieldDocument::from_document(
175                                        extraction_tooling.searcher.schema(),
176                                        &extraction_tooling.query_fields,
177                                        &extraction_tooling.multi_fields,
178                                        &document,
179                                    )
180                                    .to_json_string(),
181                                    score: scored_doc_address_ref.score().clone(),
182                                    position: position as u32,
183                                    snippets: snippet_generator_ref
184                                        .as_ref()
185                                        .map(|snippet_generator_ref| {
186                                            snippet_generator_ref
187                                                .iter()
188                                                .map(|(field_name, snippet_generator)| {
189                                                    (
190                                                        field_name.to_string(),
191                                                        Wrapper::from(snippet_generator.snippet_from_doc(&document)).into_inner(),
192                                                    )
193                                                })
194                                                .collect()
195                                        })
196                                        .unwrap_or_default(),
197                                    index_alias: scored_doc_address_ref.index_alias.to_string(),
198                                })
199                            }
200                        }))
201                        .await
202                        .into_iter()
203                        .collect::<SummaResult<Vec<_>>>()?;
204                        CollectorOutput::Documents(proto::DocumentsCollectorOutput {
205                            has_next: prepared_document_references.has_next,
206                            scored_documents,
207                        })
208                    }
209                }),
210            });
211        }
212        Ok(collector_outputs)
213    }
214}