Skip to main content

rusmes_search/
lib.rs

1//! Full-text search for RusMES
2//!
3//! This crate provides a Tantivy-backed full-text search index for mail messages.
4//! It exposes a trait-based abstraction so that the rest of the RusMES system can
5//! remain decoupled from the underlying search engine.
6//!
7//! # Key Features
8//!
9//! - **Tantivy back-end** — uses [Tantivy](https://github.com/quickwit-oss/tantivy) for
10//!   on-disk inverted-index full-text search with BM25 ranking.
11//! - **Async interface** — the [`SearchIndex`] trait is fully `async` and `Send + Sync`,
12//!   making it easy to integrate into Tokio-based protocol handlers.
13//! - **Schema**: indexes `from`, `to`, `subject` (TEXT + STORED) and `body` (TEXT) fields
14//!   extracted from [`rusmes_proto::Mail`] envelopes, plus a stored `message_id` field
15//!   for result correlation.
16//! - **Ranked results** — [`search`][SearchIndex::search] returns [`SearchResult`] items
17//!   sorted by Tantivy relevance score.
18//! - **Atomic commits** — pending index changes are buffered in memory and flushed to disk
19//!   only when [`commit`][SearchIndex::commit] is called, enabling batched indexing.
20//! - **Mutex-guarded writer** — the [`IndexWriter`] is protected by a `std::sync::Mutex`
21//!   so that multiple async tasks can share the same index safely.
22//!
23//! # Usage
24//!
25//! ```rust,no_run
26//! use rusmes_search::{TantivySearchIndex, SearchIndex};
27//! use std::path::Path;
28//!
29//! # async fn example() -> rusmes_search::Result<()> {
//! // Create a new index at the given path (use `TantivySearchIndex::open` for an existing one)
31//! let index = TantivySearchIndex::new("/var/lib/rusmes/search")?;
32//!
33//! // Search (returns up to `limit` results ranked by relevance)
34//! let results = index.search("quarterly report", 10).await?;
35//! for r in &results {
36//!     println!("message_uuid={} score={}", r.message_uuid, r.score);
37//! }
38//!
//! // Commit pending writes so that newly indexed documents become searchable
40//! index.commit().await?;
41//! # Ok(())
42//! # }
43//! ```
44//!
45//! # Opening vs Creating
46//!
//! Use [`TantivySearchIndex::new`] when creating an index for the first time; it fails
//! if the directory already contains an index.
//! Use [`TantivySearchIndex::open`] to reopen an existing index after a restart.
//! Both paths fail with a [`SearchError`] if the directory cannot be accessed; `open`
//! additionally fails if the stored schema lacks any of the expected fields.
51//!
52//! # Error Handling
53//!
54//! All fallible operations return [`Result<T>`][Result] which aliases
55//! `std::result::Result<T, SearchError>`. The [`SearchError`] enum covers Tantivy
56//! engine errors, query parse failures, I/O errors, and missing-message conditions.
57
58use async_trait::async_trait;
59use rusmes_proto::{Mail, MessageId};
60use std::path::Path;
61use tantivy::{
62    collector::TopDocs,
63    query::QueryParser,
64    schema::{Field, Schema, Value, STORED, TEXT},
65    Index, IndexReader, IndexWriter, TantivyDocument,
66};
67use thiserror::Error;
68use uuid::Uuid;
69
70/// Search index errors
71#[derive(Debug, Error)]
72pub enum SearchError {
73    #[error("Tantivy error: {0}")]
74    Tantivy(#[from] tantivy::TantivyError),
75
76    #[error("Query parse error: {0}")]
77    QueryParse(#[from] tantivy::query::QueryParserError),
78
79    #[error("Message not found: {0}")]
80    MessageNotFound(String),
81
82    #[error("Invalid UTF-8 in message")]
83    InvalidUtf8,
84
85    #[error("IO error: {0}")]
86    Io(#[from] std::io::Error),
87}
88
89pub type Result<T> = std::result::Result<T, SearchError>;
90
91/// Search result containing message ID information
92#[derive(Debug, Clone)]
93pub struct SearchResult {
94    /// The UUID of the message
95    pub message_uuid: Uuid,
96    /// Relevance score
97    pub score: f32,
98}
99
100/// Search index trait for message indexing and querying
101#[async_trait]
102pub trait SearchIndex: Send + Sync {
103    /// Index a message
104    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()>;
105
106    /// Delete a message from the index
107    async fn delete_message(&self, message_id: &MessageId) -> Result<()>;
108
109    /// Search for messages matching a query
110    /// Returns a vector of search results ranked by relevance
111    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>>;
112
113    /// Commit pending changes to the index
114    async fn commit(&self) -> Result<()>;
115}
116
117/// Tantivy-based search index implementation
118pub struct TantivySearchIndex {
119    index: Index,
120    reader: IndexReader,
121    writer: std::sync::Arc<std::sync::Mutex<IndexWriter>>,
122    schema_fields: SchemaFields,
123}
124
125/// Schema field handles
126#[derive(Clone)]
127struct SchemaFields {
128    message_id: Field,
129    from: Field,
130    to: Field,
131    subject: Field,
132    body: Field,
133}
134
135impl TantivySearchIndex {
136    /// Create a new Tantivy search index at the specified path
137    pub fn new(index_path: impl AsRef<Path>) -> Result<Self> {
138        let (schema, fields) = Self::build_schema();
139
140        let index_path = index_path.as_ref();
141        std::fs::create_dir_all(index_path)?;
142
143        let index = Index::create_in_dir(index_path, schema.clone())?;
144        let writer = index.writer(50_000_000)?; // 50MB heap
145        let reader = index.reader()?;
146
147        Ok(Self {
148            index,
149            reader,
150            writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
151            schema_fields: fields,
152        })
153    }
154
155    /// Open an existing Tantivy search index
156    pub fn open(index_path: impl AsRef<Path>) -> Result<Self> {
157        let index = Index::open_in_dir(index_path.as_ref())?;
158        let schema = index.schema();
159
160        let fields = SchemaFields {
161            message_id: schema.get_field("message_id")?,
162            from: schema.get_field("from")?,
163            to: schema.get_field("to")?,
164            subject: schema.get_field("subject")?,
165            body: schema.get_field("body")?,
166        };
167
168        let writer = index.writer(50_000_000)?;
169        let reader = index.reader()?;
170
171        Ok(Self {
172            index,
173            reader,
174            writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
175            schema_fields: fields,
176        })
177    }
178
179    /// Build the search schema
180    fn build_schema() -> (Schema, SchemaFields) {
181        let mut schema_builder = Schema::builder();
182
183        let message_id = schema_builder.add_text_field("message_id", STORED);
184        let from = schema_builder.add_text_field("from", TEXT | STORED);
185        let to = schema_builder.add_text_field("to", TEXT | STORED);
186        let subject = schema_builder.add_text_field("subject", TEXT | STORED);
187        let body = schema_builder.add_text_field("body", TEXT);
188
189        let schema = schema_builder.build();
190        let fields = SchemaFields {
191            message_id,
192            from,
193            to,
194            subject,
195            body,
196        };
197
198        (schema, fields)
199    }
200
201    /// Extract text content from mail for indexing
202    fn extract_mail_text(&self, mail: &Mail) -> (String, String, String, String) {
203        let message = mail.message();
204        let headers = message.headers();
205
206        // Extract From header
207        let from = headers.get_first("from").unwrap_or("").to_string();
208
209        // Extract To header
210        let to = headers.get_first("to").unwrap_or("").to_string();
211
212        // Extract Subject header
213        let subject = headers.get_first("subject").unwrap_or("").to_string();
214
215        // Extract body text
216        let body = message.extract_text().unwrap_or_default();
217
218        (from, to, subject, body)
219    }
220}
221
222#[async_trait]
223impl SearchIndex for TantivySearchIndex {
224    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
225        let (from, to, subject, body) = self.extract_mail_text(mail);
226
227        let mut doc = TantivyDocument::new();
228        doc.add_text(self.schema_fields.message_id, message_id.to_string());
229        doc.add_text(self.schema_fields.from, from);
230        doc.add_text(self.schema_fields.to, to);
231        doc.add_text(self.schema_fields.subject, subject);
232        doc.add_text(self.schema_fields.body, body);
233
234        let writer = self.writer.lock().map_err(|e| {
235            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
236                "Writer mutex poisoned: {e}"
237            )))
238        })?;
239        writer.add_document(doc)?;
240
241        Ok(())
242    }
243
244    async fn delete_message(&self, message_id: &MessageId) -> Result<()> {
245        let writer = self.writer.lock().map_err(|e| {
246            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
247                "Writer mutex poisoned: {e}"
248            )))
249        })?;
250        let term =
251            tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
252        writer.delete_term(term);
253
254        Ok(())
255    }
256
257    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
258        let searcher = self.reader.searcher();
259
260        let query_parser = QueryParser::for_index(
261            &self.index,
262            vec![
263                self.schema_fields.from,
264                self.schema_fields.to,
265                self.schema_fields.subject,
266                self.schema_fields.body,
267            ],
268        );
269
270        let query = query_parser.parse_query(query)?;
271        let top_docs = searcher.search(&query, &TopDocs::with_limit(limit))?;
272
273        let mut results = Vec::new();
274        for (score, doc_address) in top_docs {
275            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
276
277            if let Some(message_id_value) = retrieved_doc.get_first(self.schema_fields.message_id) {
278                if let Some(message_id_str) = message_id_value.as_str() {
279                    if let Ok(uuid) = message_id_str.parse::<Uuid>() {
280                        results.push(SearchResult {
281                            message_uuid: uuid,
282                            score,
283                        });
284                    }
285                }
286            }
287        }
288
289        Ok(results)
290    }
291
292    async fn commit(&self) -> Result<()> {
293        let mut writer = self.writer.lock().map_err(|e| {
294            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
295                "Writer mutex poisoned: {e}"
296            )))
297        })?;
298        writer.commit()?;
299        self.reader.reload()?;
300        Ok(())
301    }
302}