summa_core/components/
index_registry.rs1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::sync::Arc;
4
5use futures::future::join_all;
6use summa_proto::proto;
7use summa_proto::proto::collector_output::CollectorOutput;
8use summa_proto::proto::Score;
9use tantivy::DocAddress;
10use tokio::sync::RwLock;
11use tracing::{debug, info, trace};
12
13use super::IndexHolder;
14use crate::components::custom_serializer::NamedFieldDocument;
15use crate::components::fruit_extractors::{IntermediateExtractionResult, ReadyCollectorOutput, ScoredDocAddress};
16use crate::configs::{ConfigProxy, DirectProxy};
17use crate::errors::{SummaResult, ValidationError};
18use crate::proto_traits::Wrapper;
19use crate::utils::sync::{Handler, OwningHandler};
20use crate::Error;
21
22#[derive(Clone)]
24pub struct IndexRegistry {
25 core_config_holder: Arc<dyn ConfigProxy<crate::configs::core::Config>>,
26 index_holders: Arc<RwLock<HashMap<String, OwningHandler<IndexHolder>>>>,
27}
28
29impl Default for IndexRegistry {
30 fn default() -> Self {
31 IndexRegistry {
32 core_config_holder: Arc::new(DirectProxy::default()),
33 index_holders: Arc::default(),
34 }
35 }
36}
37
38impl Debug for IndexRegistry {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("IndexRegistry").field("index_holders", &self.index_holders).finish()
41 }
42}
43
44struct ScoredDocAddressRefWithAlias<'a> {
45 index_alias: &'a str,
46 scored_doc_address: &'a ScoredDocAddress,
47}
48
49impl ScoredDocAddressRefWithAlias<'_> {
50 pub fn doc_address(&self) -> DocAddress {
51 self.scored_doc_address.doc_address
52 }
53
54 pub fn score(&self) -> &Option<Score> {
55 &self.scored_doc_address.score
56 }
57}
58
59impl IndexRegistry {
60 pub fn new(core_config: &Arc<dyn ConfigProxy<crate::configs::core::Config>>) -> IndexRegistry {
61 IndexRegistry {
62 core_config_holder: core_config.clone(),
63 index_holders: Arc::default(),
64 }
65 }
66
67 pub fn index_holders(&self) -> &Arc<RwLock<HashMap<String, OwningHandler<IndexHolder>>>> {
69 &self.index_holders
70 }
71
72 pub async fn index_holders_cloned(&self) -> HashMap<String, Handler<IndexHolder>> {
73 self.index_holders()
74 .read()
75 .await
76 .iter()
77 .map(|(index_name, handler)| (index_name.to_string(), handler.handler()))
78 .collect()
79 }
80
81 pub async fn clear(self) {
82 debug!(action = "acquiring_index_holders_for_write");
83 self.index_holders().write().await.clear();
84 }
85
86 pub async fn get_index_holder(&self, index_alias: &str) -> SummaResult<Handler<IndexHolder>> {
91 self.get_index_holder_by_name(
92 self.core_config_holder
93 .read()
94 .await
95 .get()
96 .resolve_index_alias(index_alias)
97 .as_deref()
98 .unwrap_or(index_alias),
99 )
100 .await
101 }
102
103 pub async fn get_index_holder_by_name(&self, index_name: &str) -> SummaResult<Handler<IndexHolder>> {
105 Ok(self
106 .index_holders()
107 .read()
108 .await
109 .get(index_name)
110 .ok_or_else(|| Error::Validation(Box::new(ValidationError::MissingIndex(index_name.to_owned()))))?
111 .handler())
112 }
113
114 pub async fn add(&self, index_holder: IndexHolder) -> SummaResult<Handler<IndexHolder>> {
116 let index_holder = OwningHandler::new(index_holder);
117 let index_holder_handler = index_holder.handler();
118 debug!(action = "acquiring_index_holders_for_write");
119 self.index_holders().write().await.insert(index_holder.index_name().to_string(), index_holder);
120 info!(action = "added");
121 Ok(index_holder_handler)
122 }
123
124 pub async fn delete(&self, index_name: &str) {
126 debug!(action = "acquiring_index_holders_for_write");
127 self.index_holders().write().await.remove(index_name);
128 }
129
130 pub async fn finalize_extraction(&self, ie_results: Vec<IntermediateExtractionResult>) -> SummaResult<Vec<proto::CollectorOutput>> {
132 if ie_results.is_empty() {
133 return Ok(vec![]);
134 }
135 let mut collector_outputs = vec![];
136
137 for ie_result in ie_results.into_iter() {
138 collector_outputs.push(proto::CollectorOutput {
139 collector_output: Some(match ie_result {
140 IntermediateExtractionResult::Ready(ReadyCollectorOutput::Aggregation(aggregation_collector_output)) => {
141 CollectorOutput::Aggregation(aggregation_collector_output)
142 }
143 IntermediateExtractionResult::Ready(ReadyCollectorOutput::Count(count_collector_output)) => CollectorOutput::Count(count_collector_output),
144 IntermediateExtractionResult::Ready(ReadyCollectorOutput::Facet(facet_collector_output)) => CollectorOutput::Facet(facet_collector_output),
145 IntermediateExtractionResult::PreparedDocumentReferences(prepared_document_references) => {
146 trace!(action = "prepared_documents_finalization");
147 let extraction_tooling = &prepared_document_references.extraction_tooling;
148 let snippet_generator = if let Some(snippet_generator_config) = prepared_document_references.snippet_generator_config {
149 Some(snippet_generator_config.as_tantivy_async().await)
150 } else {
151 None
152 };
153
154 let scored_doc_address_refs =
155 prepared_document_references
156 .scored_doc_addresses
157 .iter()
158 .map(|scored_doc_address| ScoredDocAddressRefWithAlias {
159 index_alias: prepared_document_references.index_alias.as_str(),
160 scored_doc_address,
161 });
162 let extraction_tooling_ref = &extraction_tooling;
163 let snippet_generator_ref = &snippet_generator;
164
165 let scored_documents = join_all(scored_doc_address_refs.into_iter().enumerate().map(|(position, scored_doc_address_ref)| {
166 let doc_address = scored_doc_address_ref.doc_address();
167 let searcher = extraction_tooling_ref.searcher.clone();
168 async move {
169 #[cfg(feature = "tokio-rt")]
170 let document = tokio::task::spawn_blocking(move || searcher.doc(doc_address)).await??;
171 #[cfg(not(feature = "tokio-rt"))]
172 let document = searcher.doc_async(doc_address).await?;
173 Ok(proto::ScoredDocument {
174 document: NamedFieldDocument::from_document(
175 extraction_tooling.searcher.schema(),
176 &extraction_tooling.query_fields,
177 &extraction_tooling.multi_fields,
178 &document,
179 )
180 .to_json_string(),
181 score: scored_doc_address_ref.score().clone(),
182 position: position as u32,
183 snippets: snippet_generator_ref
184 .as_ref()
185 .map(|snippet_generator_ref| {
186 snippet_generator_ref
187 .iter()
188 .map(|(field_name, snippet_generator)| {
189 (
190 field_name.to_string(),
191 Wrapper::from(snippet_generator.snippet_from_doc(&document)).into_inner(),
192 )
193 })
194 .collect()
195 })
196 .unwrap_or_default(),
197 index_alias: scored_doc_address_ref.index_alias.to_string(),
198 })
199 }
200 }))
201 .await
202 .into_iter()
203 .collect::<SummaResult<Vec<_>>>()?;
204 CollectorOutput::Documents(proto::DocumentsCollectorOutput {
205 has_next: prepared_document_references.has_next,
206 scored_documents,
207 })
208 }
209 }),
210 });
211 }
212 Ok(collector_outputs)
213 }
214}