1use crate::schema::MxrSchema;
2use mxr_core::id::MessageId;
3use mxr_core::types::MessageFlags;
4use mxr_core::types::{Envelope, MessageBody};
5use mxr_core::MxrError;
6use std::path::Path;
7use tantivy::{
8 collector::TopDocs, query::Query, query::QueryParser, schema::Value, Index, IndexReader,
9 IndexWriter, ReloadPolicy, TantivyDocument,
10};
11
/// Full-text search index over message envelopes and bodies, backed by tantivy.
///
/// All additions and deletions go through a single owned writer; they become
/// visible to searches after `SearchIndex::commit`, which commits the writer and
/// reloads the reader.
pub struct SearchIndex {
    /// The underlying tantivy index (memory-mapped on disk via `open`, or in RAM via
    /// `in_memory`).
    index: Index,
    /// Reader used to obtain searchers; explicitly reloaded by `commit`.
    reader: IndexReader,
    /// Writer through which every document addition and deletion is issued.
    writer: IndexWriter,
    /// The mxr schema: the tantivy `Schema` plus the field handles used to build
    /// and read documents.
    schema: MxrSchema,
}
18
/// A single search hit, identifying the matched message and its relevance score.
///
/// `PartialEq` is derived so results can be compared directly; `Eq` is not possible
/// because `score` is an `f32`.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Id of the matched message (empty if the stored document lacked one).
    pub message_id: String,
    /// Id of the account the message belongs to (empty if not stored).
    pub account_id: String,
    /// Id of the thread the message belongs to (empty if not stored).
    pub thread_id: String,
    /// Relevance score assigned by the search engine; higher is more relevant.
    pub score: f32,
}
26
27impl SearchIndex {
28 pub fn schema(&self) -> &MxrSchema {
29 &self.schema
30 }
31
32 pub fn open(index_path: &Path) -> Result<Self, MxrError> {
33 let schema_def = MxrSchema::build();
34 let dir = tantivy::directory::MmapDirectory::open(index_path)
35 .map_err(|e| MxrError::Search(e.to_string()))?;
36
37 let index = match Index::open_or_create(dir, schema_def.schema.clone()) {
38 Ok(idx) => idx,
39 Err(e) if e.to_string().contains("schema does not match") => {
40 tracing::warn!("Search index schema mismatch, rebuilding: {e}");
41 if index_path.exists() {
43 std::fs::remove_dir_all(index_path)
44 .map_err(|e| MxrError::Search(e.to_string()))?;
45 std::fs::create_dir_all(index_path)
46 .map_err(|e| MxrError::Search(e.to_string()))?;
47 }
48 let dir = tantivy::directory::MmapDirectory::open(index_path)
49 .map_err(|e| MxrError::Search(e.to_string()))?;
50 Index::open_or_create(dir, schema_def.schema.clone())
51 .map_err(|e| MxrError::Search(e.to_string()))?
52 }
53 Err(e) => return Err(MxrError::Search(e.to_string())),
54 };
55
56 let reader = index
57 .reader_builder()
58 .reload_policy(ReloadPolicy::OnCommitWithDelay)
59 .try_into()
60 .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
61
62 let writer = index
63 .writer(50_000_000)
64 .map_err(|e| MxrError::Search(e.to_string()))?;
65
66 Ok(Self {
67 index,
68 reader,
69 writer,
70 schema: schema_def,
71 })
72 }
73
74 pub fn in_memory() -> Result<Self, MxrError> {
75 let schema_def = MxrSchema::build();
76 let index = Index::create_in_ram(schema_def.schema.clone());
77
78 let reader = index
79 .reader_builder()
80 .reload_policy(ReloadPolicy::Manual)
81 .try_into()
82 .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
83
84 let writer = index
85 .writer(15_000_000)
86 .map_err(|e| MxrError::Search(e.to_string()))?;
87
88 Ok(Self {
89 index,
90 reader,
91 writer,
92 schema: schema_def,
93 })
94 }
95
96 pub fn index_envelope(&mut self, envelope: &Envelope) -> Result<(), MxrError> {
97 let s = &self.schema;
98 let mut doc = TantivyDocument::new();
99 doc.add_text(s.message_id, envelope.id.as_str());
100 doc.add_text(s.account_id, envelope.account_id.as_str());
101 doc.add_text(s.thread_id, envelope.thread_id.as_str());
102 doc.add_text(s.subject, &envelope.subject);
103 doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
104 doc.add_text(s.from_email, &envelope.from.email);
105 for addr in &envelope.to {
106 doc.add_text(s.to_email, &addr.email);
107 }
108 for addr in &envelope.cc {
109 doc.add_text(s.cc_email, &addr.email);
110 }
111 for addr in &envelope.bcc {
112 doc.add_text(s.bcc_email, &addr.email);
113 }
114 doc.add_text(s.snippet, &envelope.snippet);
115 for label in &envelope.label_provider_ids {
116 doc.add_text(s.labels, label.to_lowercase());
117 }
118 doc.add_u64(s.size_bytes, envelope.size_bytes);
119 doc.add_u64(s.flags, envelope.flags.bits() as u64);
120 doc.add_bool(s.has_attachments, envelope.has_attachments);
121 doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
122 doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
123 doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
124 doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
125 doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
126 doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
127 doc.add_bool(
128 s.is_answered,
129 envelope.flags.contains(MessageFlags::ANSWERED),
130 );
131
132 let dt = tantivy::DateTime::from_timestamp_secs(envelope.date.timestamp());
133 doc.add_date(s.date, dt);
134
135 self.writer
136 .add_document(doc)
137 .map_err(|e| MxrError::Search(e.to_string()))?;
138 Ok(())
139 }
140
141 pub fn index_body(&mut self, envelope: &Envelope, body: &MessageBody) -> Result<(), MxrError> {
142 let term = tantivy::Term::from_field_text(self.schema.message_id, &envelope.id.as_str());
143 self.writer.delete_term(term);
144
145 let s = &self.schema;
146 let mut doc = TantivyDocument::new();
147 doc.add_text(s.message_id, envelope.id.as_str());
148 doc.add_text(s.account_id, envelope.account_id.as_str());
149 doc.add_text(s.thread_id, envelope.thread_id.as_str());
150 doc.add_text(s.subject, &envelope.subject);
151 doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
152 doc.add_text(s.from_email, &envelope.from.email);
153 for addr in &envelope.to {
154 doc.add_text(s.to_email, &addr.email);
155 }
156 for addr in &envelope.cc {
157 doc.add_text(s.cc_email, &addr.email);
158 }
159 for addr in &envelope.bcc {
160 doc.add_text(s.bcc_email, &addr.email);
161 }
162 doc.add_text(s.snippet, &envelope.snippet);
163 for label in &envelope.label_provider_ids {
164 doc.add_text(s.labels, label.to_lowercase());
165 }
166
167 let body_text = body.text_plain.as_deref().unwrap_or("");
168 doc.add_text(s.body_text, body_text);
169 for attachment in &body.attachments {
170 doc.add_text(s.attachment_filenames, attachment.filename.to_lowercase());
171 }
172
173 doc.add_u64(s.size_bytes, envelope.size_bytes);
174 doc.add_u64(s.flags, envelope.flags.bits() as u64);
175 doc.add_bool(s.has_attachments, envelope.has_attachments);
176 doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
177 doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
178 doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
179 doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
180 doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
181 doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
182 doc.add_bool(
183 s.is_answered,
184 envelope.flags.contains(MessageFlags::ANSWERED),
185 );
186 let dt = tantivy::DateTime::from_timestamp_secs(envelope.date.timestamp());
187 doc.add_date(s.date, dt);
188
189 self.writer
190 .add_document(doc)
191 .map_err(|e| MxrError::Search(e.to_string()))?;
192 Ok(())
193 }
194
195 pub fn remove_document(&mut self, message_id: &MessageId) {
196 let term = tantivy::Term::from_field_text(self.schema.message_id, &message_id.as_str());
197 self.writer.delete_term(term);
198 }
199
200 pub fn commit(&mut self) -> Result<(), MxrError> {
201 self.writer
202 .commit()
203 .map_err(|e| MxrError::Search(e.to_string()))?;
204 self.reader
205 .reload()
206 .map_err(|e| MxrError::Search(e.to_string()))?;
207 Ok(())
208 }
209
210 pub fn search(&self, query_str: &str, limit: usize) -> Result<Vec<SearchResult>, MxrError> {
211 let s = &self.schema;
212
213 let mut query_parser = QueryParser::for_index(
214 &self.index,
215 vec![
216 s.subject,
217 s.from_name,
218 s.snippet,
219 s.body_text,
220 s.attachment_filenames,
221 ],
222 );
223 query_parser.set_field_boost(s.subject, 3.0);
224 query_parser.set_field_boost(s.from_name, 2.0);
225 query_parser.set_field_boost(s.snippet, 1.0);
226 query_parser.set_field_boost(s.body_text, 0.5);
227 query_parser.set_field_boost(s.attachment_filenames, 0.75);
228
229 let query = query_parser
230 .parse_query(query_str)
231 .map_err(|e| MxrError::Search(e.to_string()))?;
232
233 let searcher = self.reader.searcher();
234 let top_docs = searcher
235 .search(&query, &TopDocs::with_limit(limit))
236 .map_err(|e| MxrError::Search(e.to_string()))?;
237
238 let mut results = Vec::with_capacity(top_docs.len());
239 for (score, doc_address) in top_docs {
240 let doc: TantivyDocument = searcher
241 .doc(doc_address)
242 .map_err(|e| MxrError::Search(e.to_string()))?;
243
244 let message_id = doc
245 .get_first(s.message_id)
246 .and_then(|v| v.as_str())
247 .unwrap_or("")
248 .to_string();
249 let account_id = doc
250 .get_first(s.account_id)
251 .and_then(|v| v.as_str())
252 .unwrap_or("")
253 .to_string();
254 let thread_id = doc
255 .get_first(s.thread_id)
256 .and_then(|v| v.as_str())
257 .unwrap_or("")
258 .to_string();
259
260 results.push(SearchResult {
261 message_id,
262 account_id,
263 thread_id,
264 score,
265 });
266 }
267
268 Ok(results)
269 }
270
271 pub fn num_docs(&self) -> u64 {
273 self.reader.searcher().num_docs()
274 }
275
276 pub fn clear(&mut self) -> Result<(), MxrError> {
278 self.writer
279 .delete_all_documents()
280 .map_err(|e| MxrError::Search(e.to_string()))?;
281 self.commit()?;
282 Ok(())
283 }
284
285 pub fn search_ast(
286 &self,
287 query: Box<dyn Query>,
288 limit: usize,
289 ) -> Result<Vec<SearchResult>, MxrError> {
290 let s = &self.schema;
291 let searcher = self.reader.searcher();
292 let top_docs = searcher
293 .search(&*query, &TopDocs::with_limit(limit))
294 .map_err(|e| MxrError::Search(e.to_string()))?;
295
296 let mut results = Vec::with_capacity(top_docs.len());
297 for (score, doc_address) in top_docs {
298 let doc: TantivyDocument = searcher
299 .doc(doc_address)
300 .map_err(|e| MxrError::Search(e.to_string()))?;
301
302 let message_id = doc
303 .get_first(s.message_id)
304 .and_then(|v| v.as_str())
305 .unwrap_or("")
306 .to_string();
307 let account_id = doc
308 .get_first(s.account_id)
309 .and_then(|v| v.as_str())
310 .unwrap_or("")
311 .to_string();
312 let thread_id = doc
313 .get_first(s.thread_id)
314 .and_then(|v| v.as_str())
315 .unwrap_or("")
316 .to_string();
317
318 results.push(SearchResult {
319 message_id,
320 account_id,
321 thread_id,
322 score,
323 });
324 }
325
326 Ok(results)
327 }
328}