Skip to main content

mxr_search/
index.rs

1use crate::schema::MxrSchema;
2use mxr_core::id::MessageId;
3use mxr_core::types::MessageFlags;
4use mxr_core::types::{Envelope, MessageBody, SortOrder};
5use mxr_core::MxrError;
6use std::path::Path;
7use tantivy::{
8    collector::TopDocs, query::Query, query::QueryParser, schema::Value, Index, IndexReader,
9    IndexWriter, Order, ReloadPolicy, TantivyDocument,
10};
11
/// Full-text search index over mail messages, backed by Tantivy.
///
/// Bundles the Tantivy [`Index`] with a long-lived reader and writer so that
/// documents can be added, deleted, committed and queried through one handle.
pub struct SearchIndex {
    // The underlying Tantivy index: on disk (`MmapDirectory`) via `open`, or in RAM via `in_memory`.
    index: Index,
    // Reader that hands out searchers; `commit` reloads it explicitly.
    reader: IndexReader,
    // Writer for adds/deletes; changes are persisted and made visible by `commit`.
    writer: IndexWriter,
    // Field handles of the mxr schema this index was opened/created with.
    schema: MxrSchema,
}
18
/// A single search hit, identified by the stored ids of the matching message.
///
/// For relevance-ordered searches `score` is the Tantivy relevance score; for
/// date-ordered searches it carries the message's sort timestamp as an `f32`.
#[derive(Clone, Debug)]
pub struct SearchResult {
    /// Stored message id of the hit (empty if the stored value is missing).
    pub message_id: String,
    /// Stored account id of the hit (empty if the stored value is missing).
    pub account_id: String,
    /// Stored thread id of the hit (empty if the stored value is missing).
    pub thread_id: String,
    /// Relevance score, or the (approximate) sort timestamp for date ordering.
    pub score: f32,
}
26
27#[derive(Debug, Clone)]
28pub struct SearchPage {
29    pub results: Vec<SearchResult>,
30    pub has_more: bool,
31}
32
33fn sane_search_sort_timestamp(timestamp: i64) -> i64 {
34    let cutoff = (chrono::Utc::now() + chrono::Duration::days(1)).timestamp();
35    if timestamp > cutoff {
36        0
37    } else {
38        timestamp
39    }
40}
41
42impl SearchIndex {
43    pub fn schema(&self) -> &MxrSchema {
44        &self.schema
45    }
46
47    pub fn open(index_path: &Path) -> Result<Self, MxrError> {
48        let (index, _) = Self::open_with_rebuild_status(index_path)?;
49        Ok(index)
50    }
51
52    pub fn open_with_rebuild_status(index_path: &Path) -> Result<(Self, bool), MxrError> {
53        let schema_def = MxrSchema::build();
54        let dir = tantivy::directory::MmapDirectory::open(index_path)
55            .map_err(|e| MxrError::Search(e.to_string()))?;
56
57        let (index, rebuilt) = match Index::open_or_create(dir, schema_def.schema.clone()) {
58            Ok(idx) => (idx, false),
59            Err(e) if e.to_string().contains("schema does not match") => {
60                tracing::warn!("Search index schema mismatch, rebuilding: {e}");
61                // Wipe and recreate
62                if index_path.exists() {
63                    std::fs::remove_dir_all(index_path)
64                        .map_err(|e| MxrError::Search(e.to_string()))?;
65                    std::fs::create_dir_all(index_path)
66                        .map_err(|e| MxrError::Search(e.to_string()))?;
67                }
68                let dir = tantivy::directory::MmapDirectory::open(index_path)
69                    .map_err(|e| MxrError::Search(e.to_string()))?;
70                (
71                    Index::open_or_create(dir, schema_def.schema.clone())
72                        .map_err(|e| MxrError::Search(e.to_string()))?,
73                    true,
74                )
75            }
76            Err(e) => return Err(MxrError::Search(e.to_string())),
77        };
78
79        let reader = index
80            .reader_builder()
81            .reload_policy(ReloadPolicy::OnCommitWithDelay)
82            .try_into()
83            .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
84
85        let writer = index
86            .writer(50_000_000)
87            .map_err(|e| MxrError::Search(e.to_string()))?;
88
89        Ok((
90            Self {
91                index,
92                reader,
93                writer,
94                schema: schema_def,
95            },
96            rebuilt,
97        ))
98    }
99
100    pub fn in_memory() -> Result<Self, MxrError> {
101        let schema_def = MxrSchema::build();
102        let index = Index::create_in_ram(schema_def.schema.clone());
103
104        let reader = index
105            .reader_builder()
106            .reload_policy(ReloadPolicy::Manual)
107            .try_into()
108            .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
109
110        let writer = index
111            .writer(15_000_000)
112            .map_err(|e| MxrError::Search(e.to_string()))?;
113
114        Ok(Self {
115            index,
116            reader,
117            writer,
118            schema: schema_def,
119        })
120    }
121
122    pub fn index_envelope(&mut self, envelope: &Envelope) -> Result<(), MxrError> {
123        let s = &self.schema;
124        let mut doc = TantivyDocument::new();
125        doc.add_text(s.message_id, envelope.id.as_str());
126        doc.add_text(s.account_id, envelope.account_id.as_str());
127        doc.add_text(s.thread_id, envelope.thread_id.as_str());
128        doc.add_text(s.subject, &envelope.subject);
129        doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
130        doc.add_text(s.from_email, &envelope.from.email);
131        for addr in &envelope.to {
132            doc.add_text(s.to_email, &addr.email);
133        }
134        for addr in &envelope.cc {
135            doc.add_text(s.cc_email, &addr.email);
136        }
137        for addr in &envelope.bcc {
138            doc.add_text(s.bcc_email, &addr.email);
139        }
140        doc.add_text(s.snippet, &envelope.snippet);
141        for label in &envelope.label_provider_ids {
142            doc.add_text(s.labels, label.to_lowercase());
143        }
144        doc.add_u64(s.size_bytes, envelope.size_bytes);
145        doc.add_u64(s.flags, envelope.flags.bits() as u64);
146        doc.add_bool(s.has_attachments, envelope.has_attachments);
147        doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
148        doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
149        doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
150        doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
151        doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
152        doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
153        doc.add_bool(
154            s.is_answered,
155            envelope.flags.contains(MessageFlags::ANSWERED),
156        );
157
158        let timestamp = envelope.date.timestamp();
159        let dt = tantivy::DateTime::from_timestamp_secs(timestamp);
160        doc.add_date(s.date, dt);
161        doc.add_i64(s.sort_date_ts, sane_search_sort_timestamp(timestamp));
162
163        self.writer
164            .add_document(doc)
165            .map_err(|e| MxrError::Search(e.to_string()))?;
166        Ok(())
167    }
168
169    pub fn index_body(&mut self, envelope: &Envelope, body: &MessageBody) -> Result<(), MxrError> {
170        let term = tantivy::Term::from_field_text(self.schema.message_id, &envelope.id.as_str());
171        self.writer.delete_term(term);
172
173        let s = &self.schema;
174        let mut doc = TantivyDocument::new();
175        doc.add_text(s.message_id, envelope.id.as_str());
176        doc.add_text(s.account_id, envelope.account_id.as_str());
177        doc.add_text(s.thread_id, envelope.thread_id.as_str());
178        doc.add_text(s.subject, &envelope.subject);
179        doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
180        doc.add_text(s.from_email, &envelope.from.email);
181        for addr in &envelope.to {
182            doc.add_text(s.to_email, &addr.email);
183        }
184        for addr in &envelope.cc {
185            doc.add_text(s.cc_email, &addr.email);
186        }
187        for addr in &envelope.bcc {
188            doc.add_text(s.bcc_email, &addr.email);
189        }
190        doc.add_text(s.snippet, &envelope.snippet);
191        for label in &envelope.label_provider_ids {
192            doc.add_text(s.labels, label.to_lowercase());
193        }
194
195        let body_text = body.text_plain.as_deref().unwrap_or("");
196        doc.add_text(s.body_text, body_text);
197        for attachment in &body.attachments {
198            doc.add_text(s.attachment_filenames, attachment.filename.to_lowercase());
199        }
200
201        doc.add_u64(s.size_bytes, envelope.size_bytes);
202        doc.add_u64(s.flags, envelope.flags.bits() as u64);
203        doc.add_bool(s.has_attachments, envelope.has_attachments);
204        doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
205        doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
206        doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
207        doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
208        doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
209        doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
210        doc.add_bool(
211            s.is_answered,
212            envelope.flags.contains(MessageFlags::ANSWERED),
213        );
214        let timestamp = envelope.date.timestamp();
215        let dt = tantivy::DateTime::from_timestamp_secs(timestamp);
216        doc.add_date(s.date, dt);
217        doc.add_i64(s.sort_date_ts, sane_search_sort_timestamp(timestamp));
218
219        self.writer
220            .add_document(doc)
221            .map_err(|e| MxrError::Search(e.to_string()))?;
222        Ok(())
223    }
224
225    pub fn remove_document(&mut self, message_id: &MessageId) {
226        let term = tantivy::Term::from_field_text(self.schema.message_id, &message_id.as_str());
227        self.writer.delete_term(term);
228    }
229
230    pub fn commit(&mut self) -> Result<(), MxrError> {
231        self.writer
232            .commit()
233            .map_err(|e| MxrError::Search(e.to_string()))?;
234        self.reader
235            .reload()
236            .map_err(|e| MxrError::Search(e.to_string()))?;
237        Ok(())
238    }
239
240    pub fn search(
241        &self,
242        query_str: &str,
243        limit: usize,
244        offset: usize,
245        sort: SortOrder,
246    ) -> Result<SearchPage, MxrError> {
247        let s = &self.schema;
248
249        let mut query_parser = QueryParser::for_index(
250            &self.index,
251            vec![
252                s.subject,
253                s.from_name,
254                s.snippet,
255                s.body_text,
256                s.attachment_filenames,
257            ],
258        );
259        query_parser.set_field_boost(s.subject, 3.0);
260        query_parser.set_field_boost(s.from_name, 2.0);
261        query_parser.set_field_boost(s.snippet, 1.0);
262        query_parser.set_field_boost(s.body_text, 0.5);
263        query_parser.set_field_boost(s.attachment_filenames, 0.75);
264
265        let query = query_parser
266            .parse_query(query_str)
267            .map_err(|e| MxrError::Search(e.to_string()))?;
268
269        let searcher = self.reader.searcher();
270        let fetch_limit = limit.saturating_add(1);
271        let top_docs = match sort {
272            SortOrder::Relevance => searcher
273                .search(&query, &TopDocs::with_limit(fetch_limit).and_offset(offset))
274                .map_err(|e| MxrError::Search(e.to_string()))?
275                .into_iter()
276                .map(|(score, doc_address)| (score, doc_address))
277                .collect::<Vec<_>>(),
278            SortOrder::DateDesc => searcher
279                .search(
280                    &query,
281                    &TopDocs::with_limit(fetch_limit)
282                        .and_offset(offset)
283                        .order_by_fast_field::<i64>("sort_date_ts", Order::Desc),
284                )
285                .map_err(|e| MxrError::Search(e.to_string()))?
286                .into_iter()
287                .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
288                .collect::<Vec<_>>(),
289            SortOrder::DateAsc => searcher
290                .search(
291                    &query,
292                    &TopDocs::with_limit(fetch_limit)
293                        .and_offset(offset)
294                        .order_by_fast_field::<i64>("sort_date_ts", Order::Asc),
295                )
296                .map_err(|e| MxrError::Search(e.to_string()))?
297                .into_iter()
298                .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
299                .collect::<Vec<_>>(),
300        };
301
302        let has_more = top_docs.len() > limit;
303        let mut results = Vec::with_capacity(top_docs.len().min(limit));
304        for (score, doc_address) in top_docs.into_iter().take(limit) {
305            let doc: TantivyDocument = searcher
306                .doc(doc_address)
307                .map_err(|e| MxrError::Search(e.to_string()))?;
308
309            let message_id = doc
310                .get_first(s.message_id)
311                .and_then(|v| v.as_str())
312                .unwrap_or("")
313                .to_string();
314            let account_id = doc
315                .get_first(s.account_id)
316                .and_then(|v| v.as_str())
317                .unwrap_or("")
318                .to_string();
319            let thread_id = doc
320                .get_first(s.thread_id)
321                .and_then(|v| v.as_str())
322                .unwrap_or("")
323                .to_string();
324
325            results.push(SearchResult {
326                message_id,
327                account_id,
328                thread_id,
329                score,
330            });
331        }
332
333        Ok(SearchPage { results, has_more })
334    }
335
336    /// Number of indexed documents.
337    pub fn num_docs(&self) -> u64 {
338        self.reader.searcher().num_docs()
339    }
340
341    /// Clear all documents and prepare for reindexing.
342    pub fn clear(&mut self) -> Result<(), MxrError> {
343        self.writer
344            .delete_all_documents()
345            .map_err(|e| MxrError::Search(e.to_string()))?;
346        self.commit()?;
347        Ok(())
348    }
349
350    pub fn search_ast(
351        &self,
352        query: Box<dyn Query>,
353        limit: usize,
354        offset: usize,
355        sort: SortOrder,
356    ) -> Result<SearchPage, MxrError> {
357        let s = &self.schema;
358        let searcher = self.reader.searcher();
359        let fetch_limit = limit.saturating_add(1);
360        let top_docs = match sort {
361            SortOrder::Relevance => searcher
362                .search(
363                    &*query,
364                    &TopDocs::with_limit(fetch_limit).and_offset(offset),
365                )
366                .map_err(|e| MxrError::Search(e.to_string()))?
367                .into_iter()
368                .map(|(score, doc_address)| (score, doc_address))
369                .collect::<Vec<_>>(),
370            SortOrder::DateDesc => searcher
371                .search(
372                    &*query,
373                    &TopDocs::with_limit(fetch_limit)
374                        .and_offset(offset)
375                        .order_by_fast_field::<i64>("sort_date_ts", Order::Desc),
376                )
377                .map_err(|e| MxrError::Search(e.to_string()))?
378                .into_iter()
379                .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
380                .collect::<Vec<_>>(),
381            SortOrder::DateAsc => searcher
382                .search(
383                    &*query,
384                    &TopDocs::with_limit(fetch_limit)
385                        .and_offset(offset)
386                        .order_by_fast_field::<i64>("sort_date_ts", Order::Asc),
387                )
388                .map_err(|e| MxrError::Search(e.to_string()))?
389                .into_iter()
390                .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
391                .collect::<Vec<_>>(),
392        };
393
394        let has_more = top_docs.len() > limit;
395        let mut results = Vec::with_capacity(top_docs.len().min(limit));
396        for (score, doc_address) in top_docs.into_iter().take(limit) {
397            let doc: TantivyDocument = searcher
398                .doc(doc_address)
399                .map_err(|e| MxrError::Search(e.to_string()))?;
400
401            let message_id = doc
402                .get_first(s.message_id)
403                .and_then(|v| v.as_str())
404                .unwrap_or("")
405                .to_string();
406            let account_id = doc
407                .get_first(s.account_id)
408                .and_then(|v| v.as_str())
409                .unwrap_or("")
410                .to_string();
411            let thread_id = doc
412                .get_first(s.thread_id)
413                .and_then(|v| v.as_str())
414                .unwrap_or("")
415                .to_string();
416
417            results.push(SearchResult {
418                message_id,
419                account_id,
420                thread_id,
421                score,
422            });
423        }
424
425        Ok(SearchPage { results, has_more })
426    }
427}