Skip to main content

mxr_search/
index.rs

1use crate::schema::MxrSchema;
2use mxr_core::id::MessageId;
3use mxr_core::types::MessageFlags;
4use mxr_core::types::{Envelope, MessageBody};
5use mxr_core::MxrError;
6use std::path::Path;
7use tantivy::{
8    collector::TopDocs, query::Query, query::QueryParser, schema::Value, Index, IndexReader,
9    IndexWriter, ReloadPolicy, TantivyDocument,
10};
11
12pub struct SearchIndex {
13    index: Index,
14    reader: IndexReader,
15    writer: IndexWriter,
16    schema: MxrSchema,
17}
18
/// One hit produced by a search.
///
/// The three identifiers are read back from the matching document's stored
/// fields; an identifier that cannot be read as text is returned as an
/// empty string.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Value of the document's `message_id` field.
    pub message_id: String,
    /// Value of the document's `account_id` field.
    pub account_id: String,
    /// Value of the document's `thread_id` field.
    pub thread_id: String,
    /// Relevance score reported for this hit.
    pub score: f32,
}
26
27impl SearchIndex {
28    pub fn schema(&self) -> &MxrSchema {
29        &self.schema
30    }
31
32    pub fn open(index_path: &Path) -> Result<Self, MxrError> {
33        let schema_def = MxrSchema::build();
34        let dir = tantivy::directory::MmapDirectory::open(index_path)
35            .map_err(|e| MxrError::Search(e.to_string()))?;
36
37        let index = match Index::open_or_create(dir, schema_def.schema.clone()) {
38            Ok(idx) => idx,
39            Err(e) if e.to_string().contains("schema does not match") => {
40                tracing::warn!("Search index schema mismatch, rebuilding: {e}");
41                // Wipe and recreate
42                if index_path.exists() {
43                    std::fs::remove_dir_all(index_path)
44                        .map_err(|e| MxrError::Search(e.to_string()))?;
45                    std::fs::create_dir_all(index_path)
46                        .map_err(|e| MxrError::Search(e.to_string()))?;
47                }
48                let dir = tantivy::directory::MmapDirectory::open(index_path)
49                    .map_err(|e| MxrError::Search(e.to_string()))?;
50                Index::open_or_create(dir, schema_def.schema.clone())
51                    .map_err(|e| MxrError::Search(e.to_string()))?
52            }
53            Err(e) => return Err(MxrError::Search(e.to_string())),
54        };
55
56        let reader = index
57            .reader_builder()
58            .reload_policy(ReloadPolicy::OnCommitWithDelay)
59            .try_into()
60            .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
61
62        let writer = index
63            .writer(50_000_000)
64            .map_err(|e| MxrError::Search(e.to_string()))?;
65
66        Ok(Self {
67            index,
68            reader,
69            writer,
70            schema: schema_def,
71        })
72    }
73
74    pub fn in_memory() -> Result<Self, MxrError> {
75        let schema_def = MxrSchema::build();
76        let index = Index::create_in_ram(schema_def.schema.clone());
77
78        let reader = index
79            .reader_builder()
80            .reload_policy(ReloadPolicy::Manual)
81            .try_into()
82            .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
83
84        let writer = index
85            .writer(15_000_000)
86            .map_err(|e| MxrError::Search(e.to_string()))?;
87
88        Ok(Self {
89            index,
90            reader,
91            writer,
92            schema: schema_def,
93        })
94    }
95
96    pub fn index_envelope(&mut self, envelope: &Envelope) -> Result<(), MxrError> {
97        let s = &self.schema;
98        let mut doc = TantivyDocument::new();
99        doc.add_text(s.message_id, envelope.id.as_str());
100        doc.add_text(s.account_id, envelope.account_id.as_str());
101        doc.add_text(s.thread_id, envelope.thread_id.as_str());
102        doc.add_text(s.subject, &envelope.subject);
103        doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
104        doc.add_text(s.from_email, &envelope.from.email);
105        for addr in &envelope.to {
106            doc.add_text(s.to_email, &addr.email);
107        }
108        for addr in &envelope.cc {
109            doc.add_text(s.cc_email, &addr.email);
110        }
111        for addr in &envelope.bcc {
112            doc.add_text(s.bcc_email, &addr.email);
113        }
114        doc.add_text(s.snippet, &envelope.snippet);
115        for label in &envelope.label_provider_ids {
116            doc.add_text(s.labels, label.to_lowercase());
117        }
118        doc.add_u64(s.size_bytes, envelope.size_bytes);
119        doc.add_u64(s.flags, envelope.flags.bits() as u64);
120        doc.add_bool(s.has_attachments, envelope.has_attachments);
121        doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
122        doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
123        doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
124        doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
125        doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
126        doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
127        doc.add_bool(
128            s.is_answered,
129            envelope.flags.contains(MessageFlags::ANSWERED),
130        );
131
132        let dt = tantivy::DateTime::from_timestamp_secs(envelope.date.timestamp());
133        doc.add_date(s.date, dt);
134
135        self.writer
136            .add_document(doc)
137            .map_err(|e| MxrError::Search(e.to_string()))?;
138        Ok(())
139    }
140
141    pub fn index_body(&mut self, envelope: &Envelope, body: &MessageBody) -> Result<(), MxrError> {
142        let term = tantivy::Term::from_field_text(self.schema.message_id, &envelope.id.as_str());
143        self.writer.delete_term(term);
144
145        let s = &self.schema;
146        let mut doc = TantivyDocument::new();
147        doc.add_text(s.message_id, envelope.id.as_str());
148        doc.add_text(s.account_id, envelope.account_id.as_str());
149        doc.add_text(s.thread_id, envelope.thread_id.as_str());
150        doc.add_text(s.subject, &envelope.subject);
151        doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
152        doc.add_text(s.from_email, &envelope.from.email);
153        for addr in &envelope.to {
154            doc.add_text(s.to_email, &addr.email);
155        }
156        for addr in &envelope.cc {
157            doc.add_text(s.cc_email, &addr.email);
158        }
159        for addr in &envelope.bcc {
160            doc.add_text(s.bcc_email, &addr.email);
161        }
162        doc.add_text(s.snippet, &envelope.snippet);
163        for label in &envelope.label_provider_ids {
164            doc.add_text(s.labels, label.to_lowercase());
165        }
166
167        let body_text = body.text_plain.as_deref().unwrap_or("");
168        doc.add_text(s.body_text, body_text);
169        for attachment in &body.attachments {
170            doc.add_text(s.attachment_filenames, attachment.filename.to_lowercase());
171        }
172
173        doc.add_u64(s.size_bytes, envelope.size_bytes);
174        doc.add_u64(s.flags, envelope.flags.bits() as u64);
175        doc.add_bool(s.has_attachments, envelope.has_attachments);
176        doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
177        doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
178        doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
179        doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
180        doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
181        doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
182        doc.add_bool(
183            s.is_answered,
184            envelope.flags.contains(MessageFlags::ANSWERED),
185        );
186        let dt = tantivy::DateTime::from_timestamp_secs(envelope.date.timestamp());
187        doc.add_date(s.date, dt);
188
189        self.writer
190            .add_document(doc)
191            .map_err(|e| MxrError::Search(e.to_string()))?;
192        Ok(())
193    }
194
195    pub fn remove_document(&mut self, message_id: &MessageId) {
196        let term = tantivy::Term::from_field_text(self.schema.message_id, &message_id.as_str());
197        self.writer.delete_term(term);
198    }
199
200    pub fn commit(&mut self) -> Result<(), MxrError> {
201        self.writer
202            .commit()
203            .map_err(|e| MxrError::Search(e.to_string()))?;
204        self.reader
205            .reload()
206            .map_err(|e| MxrError::Search(e.to_string()))?;
207        Ok(())
208    }
209
210    pub fn search(&self, query_str: &str, limit: usize) -> Result<Vec<SearchResult>, MxrError> {
211        let s = &self.schema;
212
213        let mut query_parser = QueryParser::for_index(
214            &self.index,
215            vec![
216                s.subject,
217                s.from_name,
218                s.snippet,
219                s.body_text,
220                s.attachment_filenames,
221            ],
222        );
223        query_parser.set_field_boost(s.subject, 3.0);
224        query_parser.set_field_boost(s.from_name, 2.0);
225        query_parser.set_field_boost(s.snippet, 1.0);
226        query_parser.set_field_boost(s.body_text, 0.5);
227        query_parser.set_field_boost(s.attachment_filenames, 0.75);
228
229        let query = query_parser
230            .parse_query(query_str)
231            .map_err(|e| MxrError::Search(e.to_string()))?;
232
233        let searcher = self.reader.searcher();
234        let top_docs = searcher
235            .search(&query, &TopDocs::with_limit(limit))
236            .map_err(|e| MxrError::Search(e.to_string()))?;
237
238        let mut results = Vec::with_capacity(top_docs.len());
239        for (score, doc_address) in top_docs {
240            let doc: TantivyDocument = searcher
241                .doc(doc_address)
242                .map_err(|e| MxrError::Search(e.to_string()))?;
243
244            let message_id = doc
245                .get_first(s.message_id)
246                .and_then(|v| v.as_str())
247                .unwrap_or("")
248                .to_string();
249            let account_id = doc
250                .get_first(s.account_id)
251                .and_then(|v| v.as_str())
252                .unwrap_or("")
253                .to_string();
254            let thread_id = doc
255                .get_first(s.thread_id)
256                .and_then(|v| v.as_str())
257                .unwrap_or("")
258                .to_string();
259
260            results.push(SearchResult {
261                message_id,
262                account_id,
263                thread_id,
264                score,
265            });
266        }
267
268        Ok(results)
269    }
270
271    /// Number of indexed documents.
272    pub fn num_docs(&self) -> u64 {
273        self.reader.searcher().num_docs()
274    }
275
276    /// Clear all documents and prepare for reindexing.
277    pub fn clear(&mut self) -> Result<(), MxrError> {
278        self.writer
279            .delete_all_documents()
280            .map_err(|e| MxrError::Search(e.to_string()))?;
281        self.commit()?;
282        Ok(())
283    }
284
285    pub fn search_ast(
286        &self,
287        query: Box<dyn Query>,
288        limit: usize,
289    ) -> Result<Vec<SearchResult>, MxrError> {
290        let s = &self.schema;
291        let searcher = self.reader.searcher();
292        let top_docs = searcher
293            .search(&*query, &TopDocs::with_limit(limit))
294            .map_err(|e| MxrError::Search(e.to_string()))?;
295
296        let mut results = Vec::with_capacity(top_docs.len());
297        for (score, doc_address) in top_docs {
298            let doc: TantivyDocument = searcher
299                .doc(doc_address)
300                .map_err(|e| MxrError::Search(e.to_string()))?;
301
302            let message_id = doc
303                .get_first(s.message_id)
304                .and_then(|v| v.as_str())
305                .unwrap_or("")
306                .to_string();
307            let account_id = doc
308                .get_first(s.account_id)
309                .and_then(|v| v.as_str())
310                .unwrap_or("")
311                .to_string();
312            let thread_id = doc
313                .get_first(s.thread_id)
314                .and_then(|v| v.as_str())
315                .unwrap_or("")
316                .to_string();
317
318            results.push(SearchResult {
319                message_id,
320                account_id,
321                thread_id,
322                score,
323            });
324        }
325
326        Ok(results)
327    }
328}