1use crate::schema::MxrSchema;
2use mxr_core::id::MessageId;
3use mxr_core::types::MessageFlags;
4use mxr_core::types::{Envelope, MessageBody, SortOrder};
5use mxr_core::MxrError;
6use std::path::Path;
7use tantivy::{
8 collector::TopDocs, query::Query, query::QueryParser, schema::Value, Index, IndexReader,
9 IndexWriter, Order, ReloadPolicy, TantivyDocument,
10};
11
/// Full-text search index over mail messages, backed by tantivy.
///
/// Holds one long-lived writer; additions and deletions are buffered in it and only
/// become visible to searches after `SearchIndex::commit`.
pub struct SearchIndex {
    /// The underlying tantivy index (an mmap'd directory on disk, or RAM-backed).
    index: Index,
    /// Reader that hands out searchers for queries; reloaded after each commit.
    reader: IndexReader,
    /// The single writer used for every add/delete on this index.
    writer: IndexWriter,
    /// Field handles for the mxr schema this index was created with.
    schema: MxrSchema,
}
18
/// A single hit returned by a search.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Stored message id of the hit (empty string if the document has none stored).
    pub message_id: String,
    /// Stored account id of the hit (empty string if the document has none stored).
    pub account_id: String,
    /// Stored thread id of the hit (empty string if the document has none stored).
    pub thread_id: String,
    /// Relevance score for relevance-ordered searches. For date-ordered searches this
    /// is the `sort_date_ts` timestamp converted to `f32`, which is approximate.
    pub score: f32,
}
26
/// One page of search hits.
#[derive(Debug, Clone)]
pub struct SearchPage {
    /// The hits on this page, at most the requested `limit` of them.
    pub results: Vec<SearchResult>,
    /// `true` when at least one further hit exists beyond this page.
    pub has_more: bool,
}
32
33fn sane_search_sort_timestamp(timestamp: i64) -> i64 {
34 let cutoff = (chrono::Utc::now() + chrono::Duration::days(1)).timestamp();
35 if timestamp > cutoff {
36 0
37 } else {
38 timestamp
39 }
40}
41
42impl SearchIndex {
43 pub fn schema(&self) -> &MxrSchema {
44 &self.schema
45 }
46
47 pub fn open(index_path: &Path) -> Result<Self, MxrError> {
48 let (index, _) = Self::open_with_rebuild_status(index_path)?;
49 Ok(index)
50 }
51
52 pub fn open_with_rebuild_status(index_path: &Path) -> Result<(Self, bool), MxrError> {
53 let schema_def = MxrSchema::build();
54 let dir = tantivy::directory::MmapDirectory::open(index_path)
55 .map_err(|e| MxrError::Search(e.to_string()))?;
56
57 let (index, rebuilt) = match Index::open_or_create(dir, schema_def.schema.clone()) {
58 Ok(idx) => (idx, false),
59 Err(e) if e.to_string().contains("schema does not match") => {
60 tracing::warn!("Search index schema mismatch, rebuilding: {e}");
61 if index_path.exists() {
63 std::fs::remove_dir_all(index_path)
64 .map_err(|e| MxrError::Search(e.to_string()))?;
65 std::fs::create_dir_all(index_path)
66 .map_err(|e| MxrError::Search(e.to_string()))?;
67 }
68 let dir = tantivy::directory::MmapDirectory::open(index_path)
69 .map_err(|e| MxrError::Search(e.to_string()))?;
70 (
71 Index::open_or_create(dir, schema_def.schema.clone())
72 .map_err(|e| MxrError::Search(e.to_string()))?,
73 true,
74 )
75 }
76 Err(e) => return Err(MxrError::Search(e.to_string())),
77 };
78
79 let reader = index
80 .reader_builder()
81 .reload_policy(ReloadPolicy::OnCommitWithDelay)
82 .try_into()
83 .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
84
85 let writer = index
86 .writer(50_000_000)
87 .map_err(|e| MxrError::Search(e.to_string()))?;
88
89 Ok((
90 Self {
91 index,
92 reader,
93 writer,
94 schema: schema_def,
95 },
96 rebuilt,
97 ))
98 }
99
100 pub fn in_memory() -> Result<Self, MxrError> {
101 let schema_def = MxrSchema::build();
102 let index = Index::create_in_ram(schema_def.schema.clone());
103
104 let reader = index
105 .reader_builder()
106 .reload_policy(ReloadPolicy::Manual)
107 .try_into()
108 .map_err(|e: tantivy::TantivyError| MxrError::Search(e.to_string()))?;
109
110 let writer = index
111 .writer(15_000_000)
112 .map_err(|e| MxrError::Search(e.to_string()))?;
113
114 Ok(Self {
115 index,
116 reader,
117 writer,
118 schema: schema_def,
119 })
120 }
121
122 pub fn index_envelope(&mut self, envelope: &Envelope) -> Result<(), MxrError> {
123 let s = &self.schema;
124 let mut doc = TantivyDocument::new();
125 doc.add_text(s.message_id, envelope.id.as_str());
126 doc.add_text(s.account_id, envelope.account_id.as_str());
127 doc.add_text(s.thread_id, envelope.thread_id.as_str());
128 doc.add_text(s.subject, &envelope.subject);
129 doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
130 doc.add_text(s.from_email, &envelope.from.email);
131 for addr in &envelope.to {
132 doc.add_text(s.to_email, &addr.email);
133 }
134 for addr in &envelope.cc {
135 doc.add_text(s.cc_email, &addr.email);
136 }
137 for addr in &envelope.bcc {
138 doc.add_text(s.bcc_email, &addr.email);
139 }
140 doc.add_text(s.snippet, &envelope.snippet);
141 for label in &envelope.label_provider_ids {
142 doc.add_text(s.labels, label.to_lowercase());
143 }
144 doc.add_u64(s.size_bytes, envelope.size_bytes);
145 doc.add_u64(s.flags, envelope.flags.bits() as u64);
146 doc.add_bool(s.has_attachments, envelope.has_attachments);
147 doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
148 doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
149 doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
150 doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
151 doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
152 doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
153 doc.add_bool(
154 s.is_answered,
155 envelope.flags.contains(MessageFlags::ANSWERED),
156 );
157
158 let timestamp = envelope.date.timestamp();
159 let dt = tantivy::DateTime::from_timestamp_secs(timestamp);
160 doc.add_date(s.date, dt);
161 doc.add_i64(s.sort_date_ts, sane_search_sort_timestamp(timestamp));
162
163 self.writer
164 .add_document(doc)
165 .map_err(|e| MxrError::Search(e.to_string()))?;
166 Ok(())
167 }
168
169 pub fn index_body(&mut self, envelope: &Envelope, body: &MessageBody) -> Result<(), MxrError> {
170 let term = tantivy::Term::from_field_text(self.schema.message_id, &envelope.id.as_str());
171 self.writer.delete_term(term);
172
173 let s = &self.schema;
174 let mut doc = TantivyDocument::new();
175 doc.add_text(s.message_id, envelope.id.as_str());
176 doc.add_text(s.account_id, envelope.account_id.as_str());
177 doc.add_text(s.thread_id, envelope.thread_id.as_str());
178 doc.add_text(s.subject, &envelope.subject);
179 doc.add_text(s.from_name, envelope.from.name.as_deref().unwrap_or(""));
180 doc.add_text(s.from_email, &envelope.from.email);
181 for addr in &envelope.to {
182 doc.add_text(s.to_email, &addr.email);
183 }
184 for addr in &envelope.cc {
185 doc.add_text(s.cc_email, &addr.email);
186 }
187 for addr in &envelope.bcc {
188 doc.add_text(s.bcc_email, &addr.email);
189 }
190 doc.add_text(s.snippet, &envelope.snippet);
191 for label in &envelope.label_provider_ids {
192 doc.add_text(s.labels, label.to_lowercase());
193 }
194
195 let body_text = body.text_plain.as_deref().unwrap_or("");
196 doc.add_text(s.body_text, body_text);
197 for attachment in &body.attachments {
198 doc.add_text(s.attachment_filenames, attachment.filename.to_lowercase());
199 }
200
201 doc.add_u64(s.size_bytes, envelope.size_bytes);
202 doc.add_u64(s.flags, envelope.flags.bits() as u64);
203 doc.add_bool(s.has_attachments, envelope.has_attachments);
204 doc.add_bool(s.is_read, envelope.flags.contains(MessageFlags::READ));
205 doc.add_bool(s.is_starred, envelope.flags.contains(MessageFlags::STARRED));
206 doc.add_bool(s.is_draft, envelope.flags.contains(MessageFlags::DRAFT));
207 doc.add_bool(s.is_sent, envelope.flags.contains(MessageFlags::SENT));
208 doc.add_bool(s.is_trash, envelope.flags.contains(MessageFlags::TRASH));
209 doc.add_bool(s.is_spam, envelope.flags.contains(MessageFlags::SPAM));
210 doc.add_bool(
211 s.is_answered,
212 envelope.flags.contains(MessageFlags::ANSWERED),
213 );
214 let timestamp = envelope.date.timestamp();
215 let dt = tantivy::DateTime::from_timestamp_secs(timestamp);
216 doc.add_date(s.date, dt);
217 doc.add_i64(s.sort_date_ts, sane_search_sort_timestamp(timestamp));
218
219 self.writer
220 .add_document(doc)
221 .map_err(|e| MxrError::Search(e.to_string()))?;
222 Ok(())
223 }
224
225 pub fn remove_document(&mut self, message_id: &MessageId) {
226 let term = tantivy::Term::from_field_text(self.schema.message_id, &message_id.as_str());
227 self.writer.delete_term(term);
228 }
229
230 pub fn commit(&mut self) -> Result<(), MxrError> {
231 self.writer
232 .commit()
233 .map_err(|e| MxrError::Search(e.to_string()))?;
234 self.reader
235 .reload()
236 .map_err(|e| MxrError::Search(e.to_string()))?;
237 Ok(())
238 }
239
240 pub fn search(
241 &self,
242 query_str: &str,
243 limit: usize,
244 offset: usize,
245 sort: SortOrder,
246 ) -> Result<SearchPage, MxrError> {
247 let s = &self.schema;
248
249 let mut query_parser = QueryParser::for_index(
250 &self.index,
251 vec![
252 s.subject,
253 s.from_name,
254 s.snippet,
255 s.body_text,
256 s.attachment_filenames,
257 ],
258 );
259 query_parser.set_field_boost(s.subject, 3.0);
260 query_parser.set_field_boost(s.from_name, 2.0);
261 query_parser.set_field_boost(s.snippet, 1.0);
262 query_parser.set_field_boost(s.body_text, 0.5);
263 query_parser.set_field_boost(s.attachment_filenames, 0.75);
264
265 let query = query_parser
266 .parse_query(query_str)
267 .map_err(|e| MxrError::Search(e.to_string()))?;
268
269 let searcher = self.reader.searcher();
270 let fetch_limit = limit.saturating_add(1);
271 let top_docs = match sort {
272 SortOrder::Relevance => searcher
273 .search(&query, &TopDocs::with_limit(fetch_limit).and_offset(offset))
274 .map_err(|e| MxrError::Search(e.to_string()))?
275 .into_iter()
276 .map(|(score, doc_address)| (score, doc_address))
277 .collect::<Vec<_>>(),
278 SortOrder::DateDesc => searcher
279 .search(
280 &query,
281 &TopDocs::with_limit(fetch_limit)
282 .and_offset(offset)
283 .order_by_fast_field::<i64>("sort_date_ts", Order::Desc),
284 )
285 .map_err(|e| MxrError::Search(e.to_string()))?
286 .into_iter()
287 .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
288 .collect::<Vec<_>>(),
289 SortOrder::DateAsc => searcher
290 .search(
291 &query,
292 &TopDocs::with_limit(fetch_limit)
293 .and_offset(offset)
294 .order_by_fast_field::<i64>("sort_date_ts", Order::Asc),
295 )
296 .map_err(|e| MxrError::Search(e.to_string()))?
297 .into_iter()
298 .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
299 .collect::<Vec<_>>(),
300 };
301
302 let has_more = top_docs.len() > limit;
303 let mut results = Vec::with_capacity(top_docs.len().min(limit));
304 for (score, doc_address) in top_docs.into_iter().take(limit) {
305 let doc: TantivyDocument = searcher
306 .doc(doc_address)
307 .map_err(|e| MxrError::Search(e.to_string()))?;
308
309 let message_id = doc
310 .get_first(s.message_id)
311 .and_then(|v| v.as_str())
312 .unwrap_or("")
313 .to_string();
314 let account_id = doc
315 .get_first(s.account_id)
316 .and_then(|v| v.as_str())
317 .unwrap_or("")
318 .to_string();
319 let thread_id = doc
320 .get_first(s.thread_id)
321 .and_then(|v| v.as_str())
322 .unwrap_or("")
323 .to_string();
324
325 results.push(SearchResult {
326 message_id,
327 account_id,
328 thread_id,
329 score,
330 });
331 }
332
333 Ok(SearchPage { results, has_more })
334 }
335
336 pub fn num_docs(&self) -> u64 {
338 self.reader.searcher().num_docs()
339 }
340
341 pub fn clear(&mut self) -> Result<(), MxrError> {
343 self.writer
344 .delete_all_documents()
345 .map_err(|e| MxrError::Search(e.to_string()))?;
346 self.commit()?;
347 Ok(())
348 }
349
350 pub fn search_ast(
351 &self,
352 query: Box<dyn Query>,
353 limit: usize,
354 offset: usize,
355 sort: SortOrder,
356 ) -> Result<SearchPage, MxrError> {
357 let s = &self.schema;
358 let searcher = self.reader.searcher();
359 let fetch_limit = limit.saturating_add(1);
360 let top_docs = match sort {
361 SortOrder::Relevance => searcher
362 .search(
363 &*query,
364 &TopDocs::with_limit(fetch_limit).and_offset(offset),
365 )
366 .map_err(|e| MxrError::Search(e.to_string()))?
367 .into_iter()
368 .map(|(score, doc_address)| (score, doc_address))
369 .collect::<Vec<_>>(),
370 SortOrder::DateDesc => searcher
371 .search(
372 &*query,
373 &TopDocs::with_limit(fetch_limit)
374 .and_offset(offset)
375 .order_by_fast_field::<i64>("sort_date_ts", Order::Desc),
376 )
377 .map_err(|e| MxrError::Search(e.to_string()))?
378 .into_iter()
379 .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
380 .collect::<Vec<_>>(),
381 SortOrder::DateAsc => searcher
382 .search(
383 &*query,
384 &TopDocs::with_limit(fetch_limit)
385 .and_offset(offset)
386 .order_by_fast_field::<i64>("sort_date_ts", Order::Asc),
387 )
388 .map_err(|e| MxrError::Search(e.to_string()))?
389 .into_iter()
390 .map(|(sort_score, doc_address)| (sort_score as f32, doc_address))
391 .collect::<Vec<_>>(),
392 };
393
394 let has_more = top_docs.len() > limit;
395 let mut results = Vec::with_capacity(top_docs.len().min(limit));
396 for (score, doc_address) in top_docs.into_iter().take(limit) {
397 let doc: TantivyDocument = searcher
398 .doc(doc_address)
399 .map_err(|e| MxrError::Search(e.to_string()))?;
400
401 let message_id = doc
402 .get_first(s.message_id)
403 .and_then(|v| v.as_str())
404 .unwrap_or("")
405 .to_string();
406 let account_id = doc
407 .get_first(s.account_id)
408 .and_then(|v| v.as_str())
409 .unwrap_or("")
410 .to_string();
411 let thread_id = doc
412 .get_first(s.thread_id)
413 .and_then(|v| v.as_str())
414 .unwrap_or("")
415 .to_string();
416
417 results.push(SearchResult {
418 message_id,
419 account_id,
420 thread_id,
421 score,
422 });
423 }
424
425 Ok(SearchPage { results, has_more })
426 }
427}