1use async_trait::async_trait;
59use rusmes_proto::{Mail, MessageId};
60use std::path::Path;
61use tantivy::{
62 collector::TopDocs,
63 query::QueryParser,
64 schema::{Field, Schema, Value, STORED, TEXT},
65 Index, IndexReader, IndexWriter, TantivyDocument,
66};
67use thiserror::Error;
68use uuid::Uuid;
69
70#[derive(Debug, Error)]
72pub enum SearchError {
73 #[error("Tantivy error: {0}")]
74 Tantivy(#[from] tantivy::TantivyError),
75
76 #[error("Query parse error: {0}")]
77 QueryParse(#[from] tantivy::query::QueryParserError),
78
79 #[error("Message not found: {0}")]
80 MessageNotFound(String),
81
82 #[error("Invalid UTF-8 in message")]
83 InvalidUtf8,
84
85 #[error("IO error: {0}")]
86 Io(#[from] std::io::Error),
87}
88
89pub type Result<T> = std::result::Result<T, SearchError>;
90
91#[derive(Debug, Clone)]
93pub struct SearchResult {
94 pub message_uuid: Uuid,
96 pub score: f32,
98}
99
100#[async_trait]
102pub trait SearchIndex: Send + Sync {
103 async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()>;
105
106 async fn delete_message(&self, message_id: &MessageId) -> Result<()>;
108
109 async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>>;
112
113 async fn commit(&self) -> Result<()>;
115}
116
117pub struct TantivySearchIndex {
119 index: Index,
120 reader: IndexReader,
121 writer: std::sync::Arc<std::sync::Mutex<IndexWriter>>,
122 schema_fields: SchemaFields,
123}
124
125#[derive(Clone)]
127struct SchemaFields {
128 message_id: Field,
129 from: Field,
130 to: Field,
131 subject: Field,
132 body: Field,
133}
134
135impl TantivySearchIndex {
136 pub fn new(index_path: impl AsRef<Path>) -> Result<Self> {
138 let (schema, fields) = Self::build_schema();
139
140 let index_path = index_path.as_ref();
141 std::fs::create_dir_all(index_path)?;
142
143 let index = Index::create_in_dir(index_path, schema.clone())?;
144 let writer = index.writer(50_000_000)?; let reader = index.reader()?;
146
147 Ok(Self {
148 index,
149 reader,
150 writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
151 schema_fields: fields,
152 })
153 }
154
155 pub fn open(index_path: impl AsRef<Path>) -> Result<Self> {
157 let index = Index::open_in_dir(index_path.as_ref())?;
158 let schema = index.schema();
159
160 let fields = SchemaFields {
161 message_id: schema.get_field("message_id")?,
162 from: schema.get_field("from")?,
163 to: schema.get_field("to")?,
164 subject: schema.get_field("subject")?,
165 body: schema.get_field("body")?,
166 };
167
168 let writer = index.writer(50_000_000)?;
169 let reader = index.reader()?;
170
171 Ok(Self {
172 index,
173 reader,
174 writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
175 schema_fields: fields,
176 })
177 }
178
179 fn build_schema() -> (Schema, SchemaFields) {
181 let mut schema_builder = Schema::builder();
182
183 let message_id = schema_builder.add_text_field("message_id", STORED);
184 let from = schema_builder.add_text_field("from", TEXT | STORED);
185 let to = schema_builder.add_text_field("to", TEXT | STORED);
186 let subject = schema_builder.add_text_field("subject", TEXT | STORED);
187 let body = schema_builder.add_text_field("body", TEXT);
188
189 let schema = schema_builder.build();
190 let fields = SchemaFields {
191 message_id,
192 from,
193 to,
194 subject,
195 body,
196 };
197
198 (schema, fields)
199 }
200
201 fn extract_mail_text(&self, mail: &Mail) -> (String, String, String, String) {
203 let message = mail.message();
204 let headers = message.headers();
205
206 let from = headers.get_first("from").unwrap_or("").to_string();
208
209 let to = headers.get_first("to").unwrap_or("").to_string();
211
212 let subject = headers.get_first("subject").unwrap_or("").to_string();
214
215 let body = message.extract_text().unwrap_or_default();
217
218 (from, to, subject, body)
219 }
220}
221
222#[async_trait]
223impl SearchIndex for TantivySearchIndex {
224 async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
225 let (from, to, subject, body) = self.extract_mail_text(mail);
226
227 let mut doc = TantivyDocument::new();
228 doc.add_text(self.schema_fields.message_id, message_id.to_string());
229 doc.add_text(self.schema_fields.from, from);
230 doc.add_text(self.schema_fields.to, to);
231 doc.add_text(self.schema_fields.subject, subject);
232 doc.add_text(self.schema_fields.body, body);
233
234 let writer = self.writer.lock().map_err(|e| {
235 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
236 "Writer mutex poisoned: {e}"
237 )))
238 })?;
239 writer.add_document(doc)?;
240
241 Ok(())
242 }
243
244 async fn delete_message(&self, message_id: &MessageId) -> Result<()> {
245 let writer = self.writer.lock().map_err(|e| {
246 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
247 "Writer mutex poisoned: {e}"
248 )))
249 })?;
250 let term =
251 tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
252 writer.delete_term(term);
253
254 Ok(())
255 }
256
257 async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
258 let searcher = self.reader.searcher();
259
260 let query_parser = QueryParser::for_index(
261 &self.index,
262 vec![
263 self.schema_fields.from,
264 self.schema_fields.to,
265 self.schema_fields.subject,
266 self.schema_fields.body,
267 ],
268 );
269
270 let query = query_parser.parse_query(query)?;
271 let top_docs = searcher.search(&query, &TopDocs::with_limit(limit))?;
272
273 let mut results = Vec::new();
274 for (score, doc_address) in top_docs {
275 let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
276
277 if let Some(message_id_value) = retrieved_doc.get_first(self.schema_fields.message_id) {
278 if let Some(message_id_str) = message_id_value.as_str() {
279 if let Ok(uuid) = message_id_str.parse::<Uuid>() {
280 results.push(SearchResult {
281 message_uuid: uuid,
282 score,
283 });
284 }
285 }
286 }
287 }
288
289 Ok(results)
290 }
291
292 async fn commit(&self) -> Result<()> {
293 let mut writer = self.writer.lock().map_err(|e| {
294 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
295 "Writer mutex poisoned: {e}"
296 )))
297 })?;
298 writer.commit()?;
299 self.reader.reload()?;
300 Ok(())
301 }
302}