// rusmes_search/lib.rs

1//! Full-text search for RusMES
2//!
3//! This crate provides a Tantivy-backed full-text search index for mail messages.
4//! It exposes a trait-based abstraction so that the rest of the RusMES system can
5//! remain decoupled from the underlying search engine.
6//!
7//! # Key Features
8//!
9//! - **Tantivy back-end** — uses [Tantivy](https://github.com/quickwit-oss/tantivy) for
10//!   on-disk inverted-index full-text search with BM25 ranking.
11//! - **Async interface** — the [`SearchIndex`] trait is fully `async` and `Send + Sync`,
12//!   making it easy to integrate into Tokio-based protocol handlers.
13//! - **Schema**: indexes `from`, `to`, `subject` (TEXT + STORED) and `body` (TEXT) fields
14//!   extracted from [`rusmes_proto::Mail`] envelopes, plus a stored `message_id` field
15//!   for result correlation. Additional fields: `attachment_filenames` (TEXT+STORED),
16//!   `header_values` (TEXT), and `date` (i64, indexed+stored) for date-range queries.
17//! - **Ranked results** — [`search`][SearchIndex::search] returns [`SearchResult`] items
18//!   sorted by Tantivy relevance score.
19//! - **Atomic commits** — pending index changes are buffered in memory and flushed to disk
20//!   only when [`commit`][SearchIndex::commit] is called, enabling batched indexing.
21//! - **Mutex-guarded writer** — the [`IndexWriter`] is protected by a `std::sync::Mutex`
22//!   so that multiple async tasks can share the same index safely.
23//! - **Result caching** — see [`cache::ResultCache`]. Lookups are short-circuited by an
24//!   LRU cache (capacity 256 by default); writes bump a global version stamp that
25//!   invalidates all entries.
26//! - **Maintenance APIs** — [`TantivySearchIndex::rebuild`] re-indexes every message in a
27//!   storage backend; [`spawn_reindex_worker`] runs that on a tokio interval;
28//!   [`spawn_incremental_indexer`] subscribes to the storage event stream and indexes
29//!   messages as they arrive; [`TantivySearchIndex::index_size_bytes`] reports the on-disk
30//!   footprint.
31//! - **Schema versioning** — a `schema_version.txt` sidecar file gates schema compatibility.
32//!   On version mismatch, the index directory is purged and rebuilt with the current schema.
33//!
34//! # Usage
35//!
36//! ```rust,no_run
37//! use rusmes_search::{TantivySearchIndex, SearchIndex};
38//! use std::path::Path;
39//!
40//! # async fn example() -> rusmes_search::Result<()> {
41//! // Create or open an index at the given path
42//! let index = TantivySearchIndex::new("/var/lib/rusmes/search")?;
43//!
44//! // Search (returns up to `limit` results ranked by relevance)
45//! let results = index.search("quarterly report", 10).await?;
46//! for r in &results {
47//!     println!("message_uuid={} score={}", r.message_uuid, r.score);
48//! }
49//!
50//! // Commit pending writes before querying new documents
51//! index.commit().await?;
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! # Opening vs Creating
57//!
58//! Use [`TantivySearchIndex::new`] when creating an index for the first time.
59//! Use [`TantivySearchIndex::open`] to reopen an existing index after a restart.
60//! Both paths fail with a [`SearchError`] if the directory cannot be accessed or the
61//! schema is incompatible.
62//!
63//! # Error Handling
64//!
65//! All fallible operations return [`Result<T>`][Result] which aliases
66//! `std::result::Result<T, SearchError>`. The [`SearchError`] enum covers Tantivy
67//! engine errors, query parse failures, I/O errors, and missing-message conditions.
68
69pub mod cache;
70pub mod query_translator;
71
72pub use query_translator::{
73    jmap_filter_to_tantivy, parse_search_term, search_query_to_tantivy, JmapSearchFilter,
74    SearchComparator, SearchCondition, SearchField, SearchQuery, TermKind,
75};
76
77use async_trait::async_trait;
78use rusmes_proto::{Mail, MessageId};
79use rusmes_storage::{StorageBackend, StorageEvent};
80use std::path::{Path, PathBuf};
81use std::str::FromStr;
82use std::sync::Arc;
83use std::time::{Duration, Instant};
84use tantivy::{
85    collector::TopDocs,
86    indexer::LogMergePolicy,
87    query::QueryParser,
88    schema::{Field, NumericOptions, Schema, Value, STORED, TEXT},
89    Index, IndexReader, IndexWriter, TantivyDocument,
90};
91use thiserror::Error;
92use tokio::sync::broadcast;
93use tokio::task::JoinHandle;
94use uuid::Uuid;
95
96pub use cache::ResultCache;
97
/// Current schema version — increment whenever the Tantivy schema changes.
///
/// On index open, this constant is compared against the persisted
/// `schema_version.txt` sidecar. A mismatch triggers a full index rebuild.
pub const SCHEMA_VERSION: u32 = 2;

/// Name of the sidecar file written alongside the Tantivy index that records
/// the schema version that was used to create it.
///
/// Read by [`schema_version_matches`] and written by [`write_schema_version`].
const SCHEMA_VERSION_FILE: &str = "schema_version.txt";
107
/// Search index errors.
///
/// Covers the failure modes of every fallible operation in this crate:
/// Tantivy engine errors, query parsing, I/O on the index directory and
/// schema sidecar, and storage-layer failures surfaced during rebuild.
#[derive(Debug, Error)]
pub enum SearchError {
    /// Error raised by the underlying Tantivy engine (create/open, commit,
    /// search, document retrieval, …).
    #[error("Tantivy error: {0}")]
    Tantivy(#[from] tantivy::TantivyError),

    /// The query string could not be parsed by Tantivy's `QueryParser`.
    #[error("Query parse error: {0}")]
    QueryParse(#[from] tantivy::query::QueryParserError),

    /// A message referenced by ID could not be found.
    #[error("Message not found: {0}")]
    MessageNotFound(String),

    /// Message content was not valid UTF-8.
    #[error("Invalid UTF-8 in message")]
    InvalidUtf8,

    /// Filesystem error (index directory, schema sidecar, purge, …).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Error reported by the storage backend during rebuild/reindex.
    /// Stringified because the storage error type lives in another crate.
    #[error("Storage error: {0}")]
    Storage(String),
}

/// Crate-wide result alias: `std::result::Result<T, SearchError>`.
pub type Result<T> = std::result::Result<T, SearchError>;
131
/// Search result containing message ID information
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// The UUID of the message (parsed from the stored `message_id` field)
    pub message_uuid: Uuid,
    /// Relevance score. Genuine BM25 scores are strictly positive; `0.0`
    /// marks a result served from the cache (scores are not cached).
    pub score: f32,
}
140
/// Tunable parameters for the segment-merge policy.
///
/// Convert to a tantivy policy via [`MergePolicyConfig::to_tantivy`].
///
/// The defaults match the Cluster 9 plan: minimum 8 segments per merge,
/// `level_log_size = 0.75` (tantivy default — restated explicitly so tests can
/// assert the policy was set), and `min_layer_size` set to `100` documents.
///
/// # Note on `min_layer_size`
///
/// Tantivy's `LogMergePolicy::min_layer_size` is denominated in **document
/// count**, not bytes. The Cluster 9 plan asked for "10 MiB" which has no
/// direct mapping in tantivy 0.25; we substitute `100` documents as a
/// conservative proxy. Operators can override via [`MergePolicyConfig::min_layer_size`].
#[derive(Debug, Clone)]
pub struct MergePolicyConfig {
    /// Minimum number of segments that may be merged together.
    pub min_num_segments: usize,
    /// Minimum segment size (in **documents**) under which all segments are
    /// considered to belong to the same level.
    pub min_layer_size: u32,
    /// Ratio between two consecutive levels.
    pub level_log_size: f64,
}
163
164impl Default for MergePolicyConfig {
165    fn default() -> Self {
166        Self {
167            min_num_segments: 8,
168            min_layer_size: 100,
169            level_log_size: 0.75,
170        }
171    }
172}
173
174impl MergePolicyConfig {
175    /// Build a tantivy `LogMergePolicy` from this configuration.
176    pub fn to_tantivy(&self) -> LogMergePolicy {
177        let mut policy = LogMergePolicy::default();
178        policy.set_min_num_segments(self.min_num_segments);
179        policy.set_min_layer_size(self.min_layer_size);
180        policy.set_level_log_size(self.level_log_size);
181        policy
182    }
183}
184
/// Search index trait for message indexing and querying.
///
/// `async` and `Send + Sync` so implementations can be shared across
/// Tokio-based protocol handlers (see the module docs).
#[async_trait]
pub trait SearchIndex: Send + Sync {
    /// Index a message.
    ///
    /// The change is buffered; call [`SearchIndex::commit`] to make it
    /// visible to subsequent searches (see "Atomic commits" in the module docs).
    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()>;

    /// Delete a message from the index.
    ///
    /// Like indexing, the deletion is buffered until [`SearchIndex::commit`].
    async fn delete_message(&self, message_id: &MessageId) -> Result<()>;

    /// Search for messages matching a query.
    ///
    /// Returns a vector of search results ranked by relevance. Implementations
    /// may serve repeat queries from a result cache; cached results are
    /// distinguishable by [`SearchResult::score`] = `0.0` (genuine BM25 scores
    /// are always strictly positive). Callers that require ranking should
    /// invalidate or bypass the cache before issuing the query.
    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>>;

    /// Commit pending changes to the index.
    async fn commit(&self) -> Result<()>;
}
206
/// Tantivy-based search index implementation
pub struct TantivySearchIndex {
    // Underlying Tantivy index (schema + segment storage).
    index: Index,
    // Long-lived reader; explicitly reloaded after commits so searches see
    // newly committed documents.
    reader: IndexReader,
    // Single shared writer, guarded by a std::sync::Mutex so multiple async
    // tasks can share the index (see module docs); guards are never held
    // across an `.await` in this file.
    writer: std::sync::Arc<std::sync::Mutex<IndexWriter>>,
    // Field handles resolved once in `build_schema`/`open`.
    schema_fields: SchemaFields,
    // Directory the index lives in; used by `index_size_bytes`.
    index_path: PathBuf,
    // LRU result cache; writes bump a version stamp that invalidates entries.
    cache: Arc<ResultCache>,
}
216
/// Schema field handles
///
/// Resolved once at create/open time so the indexing and search paths do not
/// repeatedly look fields up by name.
#[derive(Clone)]
struct SchemaFields {
    // Stored UUID string; used for result correlation and delete-by-term.
    message_id: Field,
    from: Field,
    to: Field,
    subject: Field,
    body: Field,
    attachment_filenames: Field,
    header_values: Field,
    // Date header as a Unix timestamp (i64, indexed + stored).
    date: Field,
}
229
230impl TantivySearchIndex {
231    /// Create a new Tantivy search index at the specified path.
232    ///
233    /// Uses the default [`MergePolicyConfig`].
234    pub fn new(index_path: impl AsRef<Path>) -> Result<Self> {
235        Self::new_with_merge_policy(index_path, MergePolicyConfig::default())
236    }
237
    /// Create a new Tantivy search index with a custom merge policy.
    ///
    /// Creates the directory (and parents) if needed, builds the current
    /// schema, and records [`SCHEMA_VERSION`] in the sidecar file so later
    /// opens can detect schema drift.
    ///
    /// # Errors
    ///
    /// Fails with [`SearchError::Io`] if the directory cannot be created or
    /// the sidecar cannot be written, or [`SearchError::Tantivy`] if the
    /// index/writer/reader cannot be constructed.
    pub fn new_with_merge_policy(
        index_path: impl AsRef<Path>,
        merge_policy: MergePolicyConfig,
    ) -> Result<Self> {
        let (schema, fields) = Self::build_schema();

        let index_path = index_path.as_ref();
        std::fs::create_dir_all(index_path)?;

        let index = Index::create_in_dir(index_path, schema.clone())?;
        let writer = index.writer(50_000_000)?; // 50MB heap
        writer.set_merge_policy(Box::new(merge_policy.to_tantivy()));
        let reader = index.reader()?;

        // Write the schema version sidecar so future `open()` calls can detect mismatches.
        write_schema_version(index_path)?;

        Ok(Self {
            index,
            reader,
            writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
            schema_fields: fields,
            index_path: index_path.to_path_buf(),
            cache: Arc::new(ResultCache::new_default()),
        })
    }
265
266    /// Open an existing Tantivy search index.
267    ///
268    /// If the persisted schema version does not match [`SCHEMA_VERSION`], the
269    /// index directory is purged and a fresh index is created.
270    pub fn open(index_path: impl AsRef<Path>) -> Result<Self> {
271        Self::open_with_merge_policy(index_path, MergePolicyConfig::default())
272    }
273
    /// Open an existing Tantivy search index with a custom merge policy.
    ///
    /// If the persisted schema version does not match [`SCHEMA_VERSION`], the
    /// index directory is purged and a fresh index is created.
    ///
    /// # Errors
    ///
    /// Fails with [`SearchError::Tantivy`] if the index cannot be opened or a
    /// required field is missing from its schema, or [`SearchError::Io`] if a
    /// purge/recreate is needed and the filesystem operations fail.
    pub fn open_with_merge_policy(
        index_path: impl AsRef<Path>,
        merge_policy: MergePolicyConfig,
    ) -> Result<Self> {
        let path_buf = index_path.as_ref().to_path_buf();

        // Check schema version. If mismatched (or sidecar absent), purge and recreate.
        if !schema_version_matches(&path_buf) {
            tracing::warn!(
                "rusmes-search: schema version mismatch at {:?} — purging and rebuilding index",
                path_buf
            );
            purge_index_dir(&path_buf)?;
            return Self::new_with_merge_policy(path_buf, merge_policy);
        }

        let index = Index::open_in_dir(&path_buf)?;
        let schema = index.schema();

        // Verify all required fields exist (guards against partial corruption).
        let fields = SchemaFields {
            message_id: schema.get_field("message_id")?,
            from: schema.get_field("from")?,
            to: schema.get_field("to")?,
            subject: schema.get_field("subject")?,
            body: schema.get_field("body")?,
            attachment_filenames: schema.get_field("attachment_filenames")?,
            header_values: schema.get_field("header_values")?,
            date: schema.get_field("date")?,
        };

        let writer = index.writer(50_000_000)?; // 50MB heap, same as `new`
        writer.set_merge_policy(Box::new(merge_policy.to_tantivy()));
        let reader = index.reader()?;

        Ok(Self {
            index,
            reader,
            writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
            schema_fields: fields,
            index_path: path_buf,
            cache: Arc::new(ResultCache::new_default()),
        })
    }
322
    /// Build the search schema.
    ///
    /// Field options:
    /// - `message_id`: STORED only — retrieved for result correlation, never
    ///   searched as free text.
    /// - `from` / `to` / `subject` / `attachment_filenames`: tokenized
    ///   (`TEXT`) and STORED.
    /// - `body` / `header_values`: tokenized only (not stored).
    /// - `date`: `i64`, indexed + stored, for date-range queries.
    ///
    /// Returns the built [`Schema`] together with the resolved field handles.
    fn build_schema() -> (Schema, SchemaFields) {
        let mut schema_builder = Schema::builder();

        let message_id = schema_builder.add_text_field("message_id", STORED);
        let from = schema_builder.add_text_field("from", TEXT | STORED);
        let to = schema_builder.add_text_field("to", TEXT | STORED);
        let subject = schema_builder.add_text_field("subject", TEXT | STORED);
        let body = schema_builder.add_text_field("body", TEXT);
        let attachment_filenames =
            schema_builder.add_text_field("attachment_filenames", TEXT | STORED);
        let header_values = schema_builder.add_text_field("header_values", TEXT);
        let date = schema_builder
            .add_i64_field("date", NumericOptions::default().set_indexed().set_stored());

        let schema = schema_builder.build();
        let fields = SchemaFields {
            message_id,
            from,
            to,
            subject,
            body,
            attachment_filenames,
            header_values,
            date,
        };

        (schema, fields)
    }
352
353    /// Extract text content from mail for indexing.
354    ///
355    /// Returns `(from, to, subject, body, attachment_filenames, header_values, date_unix)`.
356    ///
357    /// Body extraction performs a recursive MIME walk:
358    /// - If `text/plain` is found, use it as body.
359    /// - Else if `text/html` is found, convert to plain text via `html2text`.
360    /// - Falls back to the raw `extract_text()` for non-MIME messages.
361    ///
362    /// Attachment filenames are collected from non-inline parts that have a
363    /// `Content-Disposition: attachment; filename=…` or `Content-Type: …; name=…`.
364    ///
365    /// Header values concatenates Subject, From, To, Cc, Bcc, Reply-To, and
366    /// Message-ID (after fold-whitespace normalisation) for full-header searching.
367    ///
368    /// The `date` field stores the `Date:` header as a Unix timestamp (`i64`).
369    /// Returns `0` if the header is absent or unparseable.
370    fn extract_mail_text(
371        &self,
372        mail: &Mail,
373    ) -> (String, String, String, String, String, String, i64) {
374        let message = mail.message();
375        let headers = message.headers();
376
377        // Extract standard envelope headers.
378        let from = headers.get_first("from").unwrap_or("").to_string();
379        let to = headers.get_first("to").unwrap_or("").to_string();
380        let subject = headers.get_first("subject").unwrap_or("").to_string();
381
382        // Attempt a MIME walk for body and attachments.
383        let (body, attachment_filenames) = extract_body_and_attachments(message);
384
385        // Build the header_values field from the key searchable headers.
386        let header_values = build_header_values(headers);
387
388        // Parse the Date header into a Unix timestamp.
389        let date_unix = parse_date_header(headers);
390
391        (
392            from,
393            to,
394            subject,
395            body,
396            attachment_filenames,
397            header_values,
398            date_unix,
399        )
400    }
401
402    /// Return a clone of the shared `ResultCache`. Useful for tests and for
403    /// metrics that want to expose cache statistics.
404    pub fn cache(&self) -> Arc<ResultCache> {
405        self.cache.clone()
406    }
407
    /// Return the Tantivy [`Schema`] used by this index.
    ///
    /// Callers (e.g. `query_translator`) need the schema to resolve field
    /// handles by name when building programmatic queries.
    pub fn schema(&self) -> Schema {
        // Delegates to the underlying index; nothing is cached here.
        self.index.schema()
    }
415
416    /// Execute a pre-built Tantivy `Query` against this index and return the
417    /// UUIDs of the matching messages (up to `limit` results).
418    ///
419    /// This is the low-level search path used by the query translation layer.
420    /// The higher-level [`search`][SearchIndex::search] method accepts a query
421    /// string and uses the built-in `QueryParser`; this method accepts an
422    /// already-constructed `Box<dyn Query>` instead.
423    ///
424    /// Results are returned in descending relevance order (Tantivy BM25 score).
425    /// Documents that do not carry a valid UUID in their `message_id` field are
426    /// silently skipped.
427    pub fn search_by_query(
428        &self,
429        query: Box<dyn tantivy::query::Query>,
430        limit: usize,
431    ) -> Result<Vec<uuid::Uuid>> {
432        use tantivy::collector::TopDocs;
433        let searcher = self.reader.searcher();
434        let top_docs = searcher.search(query.as_ref(), &TopDocs::with_limit(limit))?;
435        let mut results = Vec::with_capacity(top_docs.len());
436        for (_score, addr) in top_docs {
437            let doc: TantivyDocument = searcher.doc(addr)?;
438            if let Some(v) = doc.get_first(self.schema_fields.message_id) {
439                if let Some(s) = v.as_str() {
440                    if let Ok(uuid) = s.parse::<uuid::Uuid>() {
441                        results.push(uuid);
442                    }
443                }
444            }
445        }
446        Ok(results)
447    }
448
449    /// IMAP search fast-path: translate a [`SearchQuery`] into a Tantivy query
450    /// and execute it against the index.
451    ///
452    /// Returns the UUIDs of matching messages. If the index is not available,
453    /// the IMAP handler can fall back to a linear scan.
454    pub fn search_imap(
455        &self,
456        query: &query_translator::SearchQuery,
457        limit: usize,
458    ) -> Result<Vec<uuid::Uuid>> {
459        let schema = self.schema();
460        let tantivy_query = query_translator::search_query_to_tantivy(query, &schema);
461        self.search_by_query(tantivy_query, limit)
462    }
463
464    /// JMAP Email/query fast-path: translate a [`JmapSearchFilter`] into a
465    /// Tantivy query and execute it against the index.
466    ///
467    /// Returns the UUIDs of matching messages. Callers build a
468    /// [`JmapSearchFilter`] from the JMAP `EmailFilterCondition` fields that
469    /// map to searchable text/date fields.
470    pub fn search_jmap(
471        &self,
472        filter: &query_translator::JmapSearchFilter,
473        limit: usize,
474    ) -> Result<Vec<uuid::Uuid>> {
475        let schema = self.schema();
476        let tantivy_query = query_translator::jmap_filter_to_tantivy(filter, &schema);
477        self.search_by_query(tantivy_query, limit)
478    }
479
480    /// Total on-disk size, in bytes, of the index directory.
481    ///
482    /// Walks `index_path` recursively and sums every file's length. Returns 0
483    /// if the path does not exist or any individual file cannot be stat'd.
484    pub fn index_size_bytes(&self) -> u64 {
485        let mut total: u64 = 0;
486        for entry in walkdir::WalkDir::new(&self.index_path)
487            .follow_links(false)
488            .into_iter()
489            .filter_map(std::result::Result::ok)
490        {
491            if entry.file_type().is_file() {
492                if let Ok(meta) = entry.metadata() {
493                    total = total.saturating_add(meta.len());
494                }
495            }
496        }
497        total
498    }
499
    /// Drop every document in the index and commit. Used by [`Self::rebuild`].
    async fn truncate(&self) -> Result<()> {
        {
            // Inner scope releases the writer guard before the reader reload.
            let mut writer = self.writer.lock().map_err(|e| {
                SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
                    "Writer mutex poisoned: {e}"
                )))
            })?;
            writer.delete_all_documents()?;
            writer.commit()?;
        }
        // Make the now-empty index visible to searchers, then drop any cached
        // query results that could still reference deleted documents.
        self.reader.reload()?;
        self.cache.invalidate_all();
        Ok(())
    }
515
    /// Re-index every message reachable through `store`.
    ///
    /// Drops the existing index first (idempotent), then walks every user via
    /// [`StorageBackend::list_all_users`], every mailbox via
    /// `MailboxStore::list_mailboxes`, and every message via
    /// `MessageStore::get_mailbox_messages` + `MessageStore::get_message`.
    /// Commits in batches of 1000 messages.
    ///
    /// Individual unreadable messages are logged and skipped; failures while
    /// enumerating users/mailboxes/messages abort with [`SearchError::Storage`].
    ///
    /// Returns `(messages_indexed, elapsed)`.
    pub async fn rebuild(&self, store: &dyn StorageBackend) -> Result<(usize, Duration)> {
        const BATCH_SIZE: usize = 1000;

        let started = Instant::now();
        self.truncate().await?;

        let mailbox_store = store.mailbox_store();
        let message_store = store.message_store();

        let users = store
            .list_all_users()
            .await
            .map_err(|e| SearchError::Storage(format!("list_all_users failed: {e}")))?;

        let mut indexed = 0usize;
        // Documents added since the last commit; drives the batch flush below.
        let mut since_commit = 0usize;

        for user in users {
            let mailboxes = mailbox_store
                .list_mailboxes(&user)
                .await
                .map_err(|e| SearchError::Storage(format!("list_mailboxes failed: {e}")))?;
            for mailbox in mailboxes {
                let messages = message_store
                    .get_mailbox_messages(mailbox.id())
                    .await
                    .map_err(|e| {
                        SearchError::Storage(format!("get_mailbox_messages failed: {e}"))
                    })?;
                for metadata in messages {
                    // Best effort: a message that vanished or fails to load is
                    // logged and skipped rather than aborting the whole rebuild.
                    let mail = match message_store.get_message(metadata.message_id()).await {
                        Ok(Some(m)) => m,
                        Ok(None) => {
                            tracing::debug!(
                                "rebuild: message {} not retrievable, skipping",
                                metadata.message_id()
                            );
                            continue;
                        }
                        Err(e) => {
                            tracing::warn!(
                                "rebuild: get_message({}) failed: {}",
                                metadata.message_id(),
                                e
                            );
                            continue;
                        }
                    };
                    self.add_document_no_invalidate(metadata.message_id(), &mail)?;
                    indexed += 1;
                    since_commit += 1;
                    if since_commit >= BATCH_SIZE {
                        self.commit_writer().await?;
                        since_commit = 0;
                    }
                }
            }
        }

        if since_commit > 0 {
            self.commit_writer().await?;
        }
        // Even if nothing was added, make sure readers see the truncate.
        self.reader.reload()?;
        // Bump cache once after the bulk operation completes.
        self.cache.invalidate_all();

        Ok((indexed, started.elapsed()))
    }
594
    /// Test-only door: exposes [`Self::add_document_no_invalidate`] under a
    /// `#[doc(hidden)]` name so integration tests can verify cache behavior
    /// (insert a document without bumping the cache version stamp). Not part
    /// of the public API; stability of this symbol is not guaranteed.
    #[doc(hidden)]
    pub fn add_document_for_test(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
        self.add_document_no_invalidate(message_id, mail)
    }
603
    /// Lower-level helper: build + add a document without bumping cache.
    /// Caller is responsible for cache invalidation (used by `rebuild` to
    /// invalidate exactly once after the batch).
    ///
    /// The write is buffered in the Tantivy writer; it becomes visible only
    /// after a commit and reader reload.
    fn add_document_no_invalidate(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
        let (from, to, subject, body, attachment_filenames, header_values, date_unix) =
            self.extract_mail_text(mail);
        let mut doc = TantivyDocument::new();
        doc.add_text(self.schema_fields.message_id, message_id.to_string());
        doc.add_text(self.schema_fields.from, from);
        doc.add_text(self.schema_fields.to, to);
        doc.add_text(self.schema_fields.subject, subject);
        doc.add_text(self.schema_fields.body, body);
        doc.add_text(
            self.schema_fields.attachment_filenames,
            attachment_filenames,
        );
        doc.add_text(self.schema_fields.header_values, header_values);
        doc.add_i64(self.schema_fields.date, date_unix);
        // Map a poisoned mutex (a prior panic while the lock was held) onto a
        // Tantivy system error instead of panicking here too.
        let writer = self.writer.lock().map_err(|e| {
            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
                "Writer mutex poisoned: {e}"
            )))
        })?;
        // Replace any prior document for the same message_id (idempotent).
        // Order matters: the delete must be queued before the add.
        let term =
            tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
        writer.delete_term(term);
        writer.add_document(doc)?;
        Ok(())
    }
634
    /// Commit the writer (helper used by batched paths).
    ///
    /// Locks the writer, commits buffered changes, then reloads the reader so
    /// subsequent searches see the committed documents. There is no `.await`
    /// in the body, so the `std::sync::Mutex` guard is never held across a
    /// suspension point.
    async fn commit_writer(&self) -> Result<()> {
        let mut writer = self.writer.lock().map_err(|e| {
            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
                "Writer mutex poisoned: {e}"
            )))
        })?;
        writer.commit()?;
        self.reader.reload()?;
        Ok(())
    }
646}
647
648#[async_trait]
649impl SearchIndex for TantivySearchIndex {
    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
        // Build and buffer the document (replaces any prior doc for this id).
        self.add_document_no_invalidate(message_id, mail)?;
        // Any cached query result may now be stale; bump the version stamp.
        self.cache.invalidate_all();
        Ok(())
    }
656
657    async fn delete_message(&self, message_id: &MessageId) -> Result<()> {
658        let writer = self.writer.lock().map_err(|e| {
659            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
660                "Writer mutex poisoned: {e}"
661            )))
662        })?;
663        let term =
664            tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
665        writer.delete_term(term);
666        drop(writer);
667        self.cache.invalidate_all();
668        Ok(())
669    }
670
671    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
672        let key = ResultCache::make_key(query, None);
673        if let Some(ids) = self.cache.get(&key) {
674            // Cache hits return zero-score results — we don't store the score
675            // so callers that care about ranking should bypass the cache.
676            return Ok(ids
677                .into_iter()
678                .map(|m| SearchResult {
679                    message_uuid: *m.as_uuid(),
680                    score: 0.0,
681                })
682                .collect());
683        }
684
685        let searcher = self.reader.searcher();
686
687        let query_parser = QueryParser::for_index(
688            &self.index,
689            vec![
690                self.schema_fields.from,
691                self.schema_fields.to,
692                self.schema_fields.subject,
693                self.schema_fields.body,
694                self.schema_fields.attachment_filenames,
695                self.schema_fields.header_values,
696            ],
697        );
698
699        let parsed = query_parser.parse_query(query)?;
700        let top_docs = searcher.search(&parsed, &TopDocs::with_limit(limit))?;
701
702        let mut results = Vec::new();
703        let mut ids_for_cache = Vec::new();
704        for (score, doc_address) in top_docs {
705            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
706
707            if let Some(message_id_value) = retrieved_doc.get_first(self.schema_fields.message_id) {
708                if let Some(message_id_str) = message_id_value.as_str() {
709                    if let Ok(uuid) = message_id_str.parse::<Uuid>() {
710                        results.push(SearchResult {
711                            message_uuid: uuid,
712                            score,
713                        });
714                        ids_for_cache.push(MessageId::from_uuid(uuid));
715                    }
716                }
717            }
718        }
719
720        self.cache.put(key, ids_for_cache);
721
722        Ok(results)
723    }
724
725    async fn commit(&self) -> Result<()> {
726        let mut writer = self.writer.lock().map_err(|e| {
727            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
728                "Writer mutex poisoned: {e}"
729            )))
730        })?;
731        writer.commit()?;
732        self.reader.reload()?;
733        Ok(())
734    }
735}
736
737// ─── Schema version helpers ──────────────────────────────────────────────────
738
739/// Return `true` if `<dir>/schema_version.txt` exists and contains the current
740/// [`SCHEMA_VERSION`]. Returns `false` if the file is absent, unreadable, or
741/// contains a different version number.
742fn schema_version_matches(dir: &Path) -> bool {
743    let path = dir.join(SCHEMA_VERSION_FILE);
744    match std::fs::read_to_string(&path) {
745        Ok(contents) => contents
746            .trim()
747            .parse::<u32>()
748            .map(|v| v == SCHEMA_VERSION)
749            .unwrap_or(false),
750        Err(_) => false,
751    }
752}
753
754/// Write `<dir>/schema_version.txt` with the current [`SCHEMA_VERSION`].
755fn write_schema_version(dir: &Path) -> Result<()> {
756    let path = dir.join(SCHEMA_VERSION_FILE);
757    std::fs::write(path, SCHEMA_VERSION.to_string()).map_err(SearchError::Io)
758}
759
760/// Remove all files and subdirectories inside `dir` (but keep `dir` itself so
761/// the caller can call `Index::create_in_dir` on it).
762fn purge_index_dir(dir: &Path) -> Result<()> {
763    if !dir.exists() {
764        return Ok(());
765    }
766    for entry in std::fs::read_dir(dir)? {
767        let entry = entry?;
768        let path = entry.path();
769        if path.is_dir() {
770            std::fs::remove_dir_all(&path)?;
771        } else {
772            std::fs::remove_file(&path)?;
773        }
774    }
775    Ok(())
776}
777
778// ─── MIME walk helpers ───────────────────────────────────────────────────────
779
780/// Recursively walk a MIME message and collect the best plain-text body and
781/// all attachment filenames.
782///
783/// Priority:
784/// 1. `text/plain` part → use as body (first one found wins).
785/// 2. `text/html` part (if no `text/plain`) → convert via `html2text`.
786/// 3. Non-MIME message → `extract_text()` fallback.
787///
788/// Attachment filenames are concatenated (space-separated). Inline parts
789/// (`Content-Disposition: inline`) are excluded.
fn extract_body_and_attachments(message: &rusmes_proto::MimeMessage) -> (String, String) {
    use rusmes_proto::mime::{split_multipart, ContentType};

    // Inline helper: return body bytes for Small variant only.
    // Large bodies require async I/O and cannot be processed in this sync context.
    let small_body_str = |msg: &rusmes_proto::MimeMessage| -> String {
        match msg.body() {
            rusmes_proto::MessageBody::Small(b) => String::from_utf8_lossy(b).into_owned(),
            rusmes_proto::MessageBody::Large(_) => String::new(),
        }
    };

    // Get the top-level Content-Type.
    let ct = match message.content_type() {
        Ok(Some(ct)) => ct,
        _ => {
            // No Content-Type — treat as plain text.
            // For Small bodies extract directly; Large bodies not available in sync context.
            return (small_body_str(message), String::new());
        }
    };

    if ct.is_multipart() {
        // Recursively walk multipart.
        let boundary = match ct.boundary() {
            Some(b) => b.to_owned(),
            None => {
                // Malformed multipart (no boundary parameter) — fall back to
                // indexing the raw Small body as-is.
                return (small_body_str(message), String::new());
            }
        };

        // Large bodies cannot be processed synchronously; skip them.
        let raw_body: Vec<u8> = match message.body() {
            rusmes_proto::MessageBody::Small(b) => b.to_vec(),
            rusmes_proto::MessageBody::Large(_) => {
                return (String::new(), String::new());
            }
        };

        let parts = match split_multipart(&raw_body, &boundary) {
            Ok(p) => p,
            Err(_) => {
                // Boundary split failed — index the undivided body rather than
                // dropping the message entirely.
                return (small_body_str(message), String::new());
            }
        };

        // First text/plain part wins; text/html is kept only as a fallback.
        let mut plain_body: Option<String> = None;
        let mut html_body: Option<String> = None;
        let mut attachment_filenames: Vec<String> = Vec::new();

        for part in &parts {
            // Parts without a parseable Content-Type default to text/plain
            // (mirrors the RFC 2045 default for message bodies).
            let part_ct = part
                .content_type()
                .ok()
                .flatten()
                .unwrap_or_else(|| ContentType {
                    main_type: "text".to_string(),
                    sub_type: "plain".to_string(),
                    parameters: std::collections::HashMap::new(),
                });

            // Lowercased so prefix checks and filename extraction below are
            // case-insensitive.
            let disposition = part
                .headers
                .get("content-disposition")
                .map(|s| s.to_lowercase())
                .unwrap_or_default();

            // Check if this is an attachment part.
            let is_attachment = disposition.starts_with("attachment");
            let is_inline = disposition.starts_with("inline");

            // Collect attachment filenames (exclude inline parts).
            // Non-inline parts whose type is not a body type (text/plain,
            // text/html, multipart/*) are treated as attachments even without
            // an explicit Content-Disposition: attachment.
            if is_attachment || (!is_inline && !is_body_part(&part_ct)) {
                // Try Content-Disposition filename= first.
                if let Some(fname) = extract_disposition_filename(&disposition) {
                    attachment_filenames.push(fname);
                } else if let Some(fname) = part_ct.parameters.get("name") {
                    // Legacy Content-Type name= parameter as a fallback.
                    attachment_filenames.push(strip_rfc2047(fname));
                }
            }

            if is_attachment {
                // Don't use attachment body for text.
                continue;
            }

            match (part_ct.main_type.as_str(), part_ct.sub_type.as_str()) {
                ("text", "plain") if plain_body.is_none() => {
                    if let Ok(decoded) = part.decode_body() {
                        plain_body = Some(String::from_utf8_lossy(&decoded).into_owned());
                    }
                }
                // HTML is only considered while no text/plain has been seen.
                ("text", "html") if html_body.is_none() && plain_body.is_none() => {
                    if let Ok(decoded) = part.decode_body() {
                        html_body = Some(html_bytes_to_text(&decoded));
                    }
                }
                ("multipart", _) => {
                    // Recurse one level into nested multipart using a synthetic message.
                    // We build a sub-message from the part headers + body.
                    let sub_bytes = rebuild_part_bytes(part);
                    if let Ok(sub_msg) = rusmes_proto::MimeMessage::parse_from_bytes(&sub_bytes) {
                        let (sub_body, sub_filenames) = extract_body_and_attachments(&sub_msg);
                        if !sub_body.is_empty() && plain_body.is_none() && html_body.is_none() {
                            plain_body = Some(sub_body);
                        }
                        if !sub_filenames.is_empty() {
                            // sub_filenames is already space-joined; pushed as
                            // one element, joined again below — still yields a
                            // flat space-separated list.
                            attachment_filenames.push(sub_filenames);
                        }
                    }
                }
                _ => {}
            }
        }

        let body = plain_body.or(html_body).unwrap_or_default();
        (body, attachment_filenames.join(" "))
    } else if ct.main_type == "text" && ct.sub_type == "html" {
        // Single-part HTML message — decode CTE from Small body; skip Large.
        match message.body() {
            rusmes_proto::MessageBody::Small(bytes) => {
                let encoding = message.content_transfer_encoding();
                let decoded = match encoding {
                    rusmes_proto::ContentTransferEncoding::Base64 => {
                        rusmes_proto::mime::decode_base64(bytes).unwrap_or_default()
                    }
                    rusmes_proto::ContentTransferEncoding::QuotedPrintable => {
                        rusmes_proto::mime::decode_quoted_printable(bytes).unwrap_or_default()
                    }
                    // 7bit/8bit/binary etc. — bytes are already the payload.
                    _ => bytes.to_vec(),
                };
                let text = html_bytes_to_text(&decoded);
                (text, String::new())
            }
            rusmes_proto::MessageBody::Large(_) => (String::new(), String::new()),
        }
    } else if ct.main_type == "text" {
        // Single-part text/plain (or other text subtype) — decode from Small body; skip Large.
        match message.body() {
            rusmes_proto::MessageBody::Small(bytes) => {
                let encoding = message.content_transfer_encoding();
                let decoded = match encoding {
                    rusmes_proto::ContentTransferEncoding::Base64 => {
                        rusmes_proto::mime::decode_base64(bytes).unwrap_or_default()
                    }
                    rusmes_proto::ContentTransferEncoding::QuotedPrintable => {
                        rusmes_proto::mime::decode_quoted_printable(bytes).unwrap_or_default()
                    }
                    _ => bytes.to_vec(),
                };
                (
                    String::from_utf8_lossy(&decoded).into_owned(),
                    String::new(),
                )
            }
            rusmes_proto::MessageBody::Large(_) => (String::new(), String::new()),
        }
    } else {
        // Non-text, non-multipart (e.g. a bare application/pdf) — no indexable text.
        (String::new(), String::new())
    }
}
952
953/// Returns `true` if this MIME part is a body content type rather than an attachment.
954fn is_body_part(ct: &rusmes_proto::mime::ContentType) -> bool {
955    matches!(
956        (ct.main_type.as_str(), ct.sub_type.as_str()),
957        ("text", "plain") | ("text", "html") | ("multipart", _)
958    )
959}
960
961/// Extract a `filename=` value from a Content-Disposition header string
962/// (already lowercased).
963///
964/// Handles both `filename="foo"` and `filename=foo` forms.
965fn extract_disposition_filename(disposition: &str) -> Option<String> {
966    // Split on ';' and scan for the filename parameter.
967    for segment in disposition.split(';') {
968        let seg = segment.trim();
969        if let Some(rest) = seg.strip_prefix("filename=") {
970            let value = rest.trim().trim_matches('"');
971            if !value.is_empty() {
972                return Some(strip_rfc2047(value));
973            }
974        }
975        // Also handle filename* (RFC 5987 extended value) — best-effort fallback.
976        if let Some(rest) = seg.strip_prefix("filename*=") {
977            let value = rest.trim();
978            // Strip encoding prefix like "utf-8''filename.pdf"
979            let fname = value.split("''").last().unwrap_or(value);
980            let fname = fname.trim_matches('"');
981            if !fname.is_empty() {
982                return Some(strip_rfc2047(fname));
983            }
984        }
985    }
986    None
987}
988
989/// Convert raw HTML bytes to a plain-text string using `html2text`.
990///
991/// Uses a width of 1_000_000 columns so no line-wrapping occurs (we want the
992/// full text for indexing, not formatted output). Falls back to lossy UTF-8
993/// if `html2text` fails.
994fn html_bytes_to_text(html: &[u8]) -> String {
995    match html2text::from_read(html, 1_000_000) {
996        Ok(text) => text,
997        Err(_) => String::from_utf8_lossy(html).into_owned(),
998    }
999}
1000
1001/// Rebuild the raw byte form of a `MimePart` so it can be re-parsed by
1002/// `MimeMessage::parse_from_bytes`. This is used for nested multipart recursion.
1003fn rebuild_part_bytes(part: &rusmes_proto::mime::MimePart) -> Vec<u8> {
1004    let mut out = Vec::new();
1005    for (name, value) in &part.headers {
1006        out.extend_from_slice(name.as_bytes());
1007        out.extend_from_slice(b": ");
1008        out.extend_from_slice(value.as_bytes());
1009        out.extend_from_slice(b"\r\n");
1010    }
1011    out.extend_from_slice(b"\r\n");
1012    out.extend_from_slice(&part.body);
1013    out
1014}
1015
1016// ─── Header helpers ───────────────────────────────────────────────────────────
1017
1018/// Concatenate key header values for the `header_values` field.
1019///
1020/// Headers included: Subject, From, To, Cc, Bcc, Reply-To, Message-ID.
1021/// Each value is fold-whitespace normalised (consecutive whitespace → single
1022/// space). RFC 2047 encoded-word sequences are stripped to plain ASCII using a
1023/// best-effort approach.
1024fn build_header_values(headers: &rusmes_proto::HeaderMap) -> String {
1025    const HEADER_NAMES: &[&str] = &[
1026        "subject",
1027        "from",
1028        "to",
1029        "cc",
1030        "bcc",
1031        "reply-to",
1032        "message-id",
1033    ];
1034
1035    let mut parts: Vec<String> = Vec::new();
1036    for name in HEADER_NAMES {
1037        if let Some(values) = headers.get(name) {
1038            for value in values {
1039                let normalised = strip_rfc2047(value.trim());
1040                let normalised = normalise_whitespace(&normalised);
1041                if !normalised.is_empty() {
1042                    parts.push(normalised);
1043                }
1044            }
1045        }
1046    }
1047    parts.join(" ")
1048}
1049
1050/// Best-effort RFC 2047 encoded-word stripping.
1051///
1052/// Replaces `=?charset?encoding?payload?=` sequences with a space. This is
1053/// intentionally minimal — the goal is to avoid indexing base64 blobs, not to
1054/// produce a perfectly decoded string (a full RFC 2047 decoder would require an
1055/// additional dependency).
fn strip_rfc2047(input: &str) -> String {
    // Walk the string, copying plain text and replacing every complete
    // `=? … ?=` span with a single space. An unterminated `=?` is copied
    // through verbatim.
    let mut out = String::with_capacity(input.len());
    let mut rest = input;

    loop {
        match rest.find("=?") {
            None => {
                // No more encoded words — emit the tail and finish.
                out.push_str(rest);
                return out;
            }
            Some(start) => {
                // Emit the text before the encoded-word start.
                out.push_str(&rest[..start]);
                let tail = &rest[start + 2..];
                match tail.find("?=") {
                    Some(end) => {
                        // Skip the entire encoded word; insert a space separator.
                        out.push(' ');
                        rest = &tail[end + 2..];
                    }
                    None => {
                        // Malformed — emit literally.
                        out.push_str(&rest[start..]);
                        return out;
                    }
                }
            }
        }
    }
}
1080
/// Collapse consecutive whitespace characters into a single space and trim
/// any leading/trailing whitespace from the result.
///
/// Equivalent to the previous hand-rolled char loop: `split_whitespace`
/// splits on the same `char::is_whitespace` predicate, skips empty runs
/// (which also drops leading/trailing whitespace), and the tokens are
/// re-joined with single spaces.
fn normalise_whitespace(input: &str) -> String {
    input.split_whitespace().collect::<Vec<_>>().join(" ")
}
1103
1104// ─── Date parsing ─────────────────────────────────────────────────────────────
1105
1106/// Parse the RFC 5322 `Date:` header and return a Unix timestamp (`i64`).
1107///
1108/// Uses `chrono::DateTime::parse_from_rfc2822` which handles the most common
1109/// email date formats. Returns `0` if the header is absent or unparseable.
1110fn parse_date_header(headers: &rusmes_proto::HeaderMap) -> i64 {
1111    let date_str = match headers.get_first("date") {
1112        Some(d) => d.trim(),
1113        None => return 0,
1114    };
1115
1116    // chrono's parse_from_rfc2822 handles RFC 5322 date-time strings.
1117    match chrono::DateTime::parse_from_rfc2822(date_str) {
1118        Ok(dt) => dt.timestamp(),
1119        Err(_) => 0,
1120    }
1121}
1122
1123// ─── Spawn helpers ────────────────────────────────────────────────────────────
1124
1125/// Spawn a tokio task that periodically calls [`TantivySearchIndex::rebuild`].
1126///
1127/// Pass `Duration::ZERO` (or any duration <= 0) for "manual only" semantics —
1128/// the task is not started and the returned `JoinHandle` resolves immediately.
1129/// Otherwise, the task runs `rebuild` once per `schedule`.
1130///
1131/// Errors during rebuild are logged but do not stop the loop.
1132pub fn spawn_reindex_worker(
1133    idx: Arc<TantivySearchIndex>,
1134    store: Arc<dyn StorageBackend>,
1135    schedule: Duration,
1136) -> JoinHandle<()> {
1137    tokio::spawn(async move {
1138        if schedule.is_zero() {
1139            tracing::debug!("reindex worker: schedule is zero, exiting (manual-only mode)");
1140            return;
1141        }
1142        let mut interval = tokio::time::interval(schedule);
1143        // Skip the immediate-fire on first tick — wait the full interval first.
1144        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1145        loop {
1146            interval.tick().await;
1147            match idx.rebuild(store.as_ref()).await {
1148                Ok((n, elapsed)) => {
1149                    tracing::info!("reindex worker: rebuilt {} messages in {:?}", n, elapsed);
1150                }
1151                Err(e) => {
1152                    tracing::warn!("reindex worker: rebuild failed: {}", e);
1153                }
1154            }
1155        }
1156    })
1157}
1158
1159/// Spawn a tokio task that subscribes to `store.event_stream()` and indexes
1160/// (or deletes) messages as `MessageStored` / `MessageExpunged` events arrive.
1161///
1162/// Commits are debounced: every 100 messages OR every 5 seconds, whichever
1163/// comes first.
1164///
1165/// The task exits cleanly when the storage event stream is dropped (i.e. the
1166/// backend was dropped).
1167pub fn spawn_incremental_indexer(
1168    idx: Arc<TantivySearchIndex>,
1169    store: Arc<dyn StorageBackend>,
1170) -> JoinHandle<()> {
1171    spawn_incremental_indexer_with_config(idx, store, IncrementalConfig::default())
1172}
1173
/// Tunables for [`spawn_incremental_indexer_with_config`].
///
/// The [`Default`] impl commits every 100 messages or every 5 seconds,
/// whichever comes first. A zero `commit_every` is treated by the indexer
/// as "commit as soon as possible" (it polls on a short internal tick).
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Commit after this many indexed/deleted messages.
    pub commit_every_n: usize,
    /// Commit after this much time has elapsed since the last commit, even if
    /// `commit_every_n` has not been reached.
    pub commit_every: Duration,
}
1183
1184impl Default for IncrementalConfig {
1185    fn default() -> Self {
1186        Self {
1187            commit_every_n: 100,
1188            commit_every: Duration::from_secs(5),
1189        }
1190    }
1191}
1192
1193/// As [`spawn_incremental_indexer`] but with a custom debounce config.
pub fn spawn_incremental_indexer_with_config(
    idx: Arc<TantivySearchIndex>,
    store: Arc<dyn StorageBackend>,
    cfg: IncrementalConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut rx = store.event_stream();
        // Number of index mutations since the last successful commit.
        let mut pending: usize = 0;
        let mut last_commit = Instant::now();
        // A zero commit interval would make `tokio::time::interval` panic, so
        // fall back to a short 100 ms poll tick in that case.
        let tick = if cfg.commit_every.is_zero() {
            Duration::from_millis(100)
        } else {
            cfg.commit_every
        };
        let mut commit_timer = tokio::time::interval(tick);
        commit_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(StorageEvent::MessageStored { account, mailbox, uid }) => {
                            // Ok(false) means the event could not be resolved to a
                            // document (e.g. race with deletion) — logged, not fatal.
                            match handle_stored(&idx, store.as_ref(), &account, &mailbox, uid).await {
                                Ok(true) => pending += 1,
                                Ok(false) => {
                                    tracing::debug!(
                                        "incremental indexer: stored event for {}/{}/uid={} produced no document",
                                        account, mailbox, uid
                                    );
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "incremental indexer: failed to index stored {}/{}/uid={}: {}",
                                        account, mailbox, uid, e
                                    );
                                }
                            }
                        }
                        Ok(StorageEvent::MessageExpunged { account, mailbox, uid }) => {
                            match handle_expunged(&idx, store.as_ref(), &account, &mailbox, uid).await {
                                Ok(true) => pending += 1,
                                Ok(false) => {
                                    tracing::debug!(
                                        "incremental indexer: expunge event for {}/{}/uid={} matched no message",
                                        account, mailbox, uid
                                    );
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "incremental indexer: failed to expunge {}/{}/uid={}: {}",
                                        account, mailbox, uid, e
                                    );
                                }
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
                            // Broadcast channel overflow: events were dropped, so the
                            // index may now be missing documents until a full rebuild.
                            tracing::warn!(
                                "incremental indexer: lagged behind {} events; consider a full rebuild",
                                skipped
                            );
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            tracing::info!("incremental indexer: storage event stream closed; exiting");
                            // Final commit before exit.
                            if pending > 0 {
                                if let Err(e) = idx.commit_writer().await {
                                    tracing::warn!(
                                        "incremental indexer: final commit failed: {}",
                                        e
                                    );
                                }
                            }
                            return;
                        }
                    }
                }
                _ = commit_timer.tick() => {
                    // Time-based debounce. `pending` is reset only on success so a
                    // failed commit is retried on the next trigger.
                    if pending > 0 && last_commit.elapsed() >= cfg.commit_every {
                        if let Err(e) = idx.commit_writer().await {
                            tracing::warn!("incremental indexer: timer commit failed: {}", e);
                        } else {
                            pending = 0;
                            last_commit = Instant::now();
                        }
                    }
                }
            }

            // Count-based debounce, checked after every select arm so a burst of
            // events commits without waiting for the timer.
            if pending >= cfg.commit_every_n {
                if let Err(e) = idx.commit_writer().await {
                    tracing::warn!("incremental indexer: batch commit failed: {}", e);
                } else {
                    pending = 0;
                    last_commit = Instant::now();
                }
            }
        }
    })
}
1293
1294/// Resolve `(account, mailbox, uid)` to a `MessageId` via the storage trait API.
1295///
1296/// This walks `mailbox_store.list_mailboxes(user) -> message_store.get_mailbox_messages(mbox) -> match uid`.
1297/// Returns `Ok(None)` if any step yields no match.
1298async fn resolve_event_message_id(
1299    store: &dyn StorageBackend,
1300    account: &str,
1301    mailbox: &str,
1302    uid: u32,
1303) -> Result<Option<MessageId>> {
1304    if account.is_empty() || mailbox.is_empty() {
1305        // Cluster 3's filesystem `delete_messages` fires expunge with empty
1306        // account/mailbox/uid=0; nothing we can do but skip.
1307        return Ok(None);
1308    }
1309    let user = match rusmes_proto::Username::from_str(account) {
1310        Ok(u) => u,
1311        Err(e) => {
1312            tracing::debug!(
1313                "resolve: invalid username '{}' in storage event: {}",
1314                account,
1315                e
1316            );
1317            return Ok(None);
1318        }
1319    };
1320    let mailbox_store = store.mailbox_store();
1321    let message_store = store.message_store();
1322    let mailboxes = mailbox_store
1323        .list_mailboxes(&user)
1324        .await
1325        .map_err(|e| SearchError::Storage(format!("list_mailboxes failed: {e}")))?;
1326    let mailbox_id = match mailboxes
1327        .iter()
1328        .find(|m| m.path().name().map(|n| n == mailbox).unwrap_or(false))
1329        .map(|m| *m.id())
1330    {
1331        Some(id) => id,
1332        None => return Ok(None),
1333    };
1334    let metas = message_store
1335        .get_mailbox_messages(&mailbox_id)
1336        .await
1337        .map_err(|e| SearchError::Storage(format!("get_mailbox_messages failed: {e}")))?;
1338    Ok(metas
1339        .into_iter()
1340        .find(|md| md.uid() == uid)
1341        .map(|md| *md.message_id()))
1342}
1343
1344async fn handle_stored(
1345    idx: &Arc<TantivySearchIndex>,
1346    store: &dyn StorageBackend,
1347    account: &str,
1348    mailbox: &str,
1349    uid: u32,
1350) -> Result<bool> {
1351    let message_id = match resolve_event_message_id(store, account, mailbox, uid).await? {
1352        Some(id) => id,
1353        None => return Ok(false),
1354    };
1355    let message_store = store.message_store();
1356    let mail = match message_store
1357        .get_message(&message_id)
1358        .await
1359        .map_err(|e| SearchError::Storage(format!("get_message failed: {e}")))?
1360    {
1361        Some(m) => m,
1362        None => return Ok(false),
1363    };
1364    idx.add_document_no_invalidate(&message_id, &mail)?;
1365    idx.cache.invalidate_all();
1366    Ok(true)
1367}
1368
1369async fn handle_expunged(
1370    idx: &Arc<TantivySearchIndex>,
1371    store: &dyn StorageBackend,
1372    account: &str,
1373    mailbox: &str,
1374    uid: u32,
1375) -> Result<bool> {
1376    let message_id = match resolve_event_message_id(store, account, mailbox, uid).await? {
1377        Some(id) => id,
1378        None => return Ok(false),
1379    };
1380    idx.delete_message(&message_id).await?;
1381    Ok(true)
1382}
1383
1384// ─── Tests ───────────────────────────────────────────────────────────────────
1385
1386#[cfg(test)]
1387mod tests {
1388    use super::*;
1389    use bytes::Bytes;
1390    use rusmes_proto::mail::Mail;
1391    use rusmes_proto::message::{HeaderMap, MessageBody, MessageId, MimeMessage};
1392
1393    /// Helper: build a minimal Mail wrapping a raw message body string.
1394    fn make_mail(raw_message: &str) -> (MessageId, Mail) {
1395        let message_id = MessageId::new();
1396        let data = raw_message.as_bytes();
1397        let message = MimeMessage::parse_from_bytes(data).unwrap_or_else(|_| {
1398            // Fallback: treat the whole thing as a plain body.
1399            let mut hdr = HeaderMap::new();
1400            hdr.insert("content-type", "text/plain");
1401            MimeMessage::new(hdr, MessageBody::Small(Bytes::from(raw_message.to_owned())))
1402        });
1403        let mail = Mail::new(None, vec![], message, None, None);
1404        (message_id, mail)
1405    }
1406
1407    /// Helper: create a `TantivySearchIndex` in a fresh temp dir.
1408    fn make_index() -> (TantivySearchIndex, tempfile::TempDir) {
1409        let dir = tempfile::TempDir::new().expect("temp dir");
1410        let idx = TantivySearchIndex::new(dir.path()).expect("create index");
1411        (idx, dir)
1412    }
1413
1414    // ── HTML-only message indexing ────────────────────────────────────────────
1415
1416    #[tokio::test]
1417    async fn test_html_only_message_indexed() {
1418        let (idx, _dir) = make_index();
1419
1420        // Build a message with only a text/html body.
1421        let raw = concat!(
1422            "From: alice@example.com\r\n",
1423            "To: bob@example.com\r\n",
1424            "Subject: HTML test\r\n",
1425            "Content-Type: text/html\r\n",
1426            "\r\n",
1427            "<html><body><b>Tantalising</b> content here</body></html>",
1428        );
1429        let (mid, mail) = make_mail(raw);
1430
1431        idx.index_message(&mid, &mail).await.expect("index");
1432        idx.commit().await.expect("commit");
1433
1434        // "Tantalising" appears in the HTML visible text — should be findable.
1435        let results = idx.search("Tantalising", 10).await.expect("search");
1436        assert!(
1437            !results.is_empty(),
1438            "expected HTML body text to be indexed, got no results"
1439        );
1440        assert_eq!(results[0].message_uuid, *mid.as_uuid());
1441    }
1442
1443    // ── Attachment filename indexing ─────────────────────────────────────────
1444
1445    #[tokio::test]
1446    async fn test_attachment_filename_indexed() {
1447        let (idx, _dir) = make_index();
1448
1449        let raw = concat!(
1450            "From: alice@example.com\r\n",
1451            "To: bob@example.com\r\n",
1452            "Subject: Attachment test\r\n",
1453            "Content-Type: multipart/mixed; boundary=\"boundary42\"\r\n",
1454            "\r\n",
1455            "--boundary42\r\n",
1456            "Content-Type: text/plain\r\n",
1457            "\r\n",
1458            "See the attached report.\r\n",
1459            "--boundary42\r\n",
1460            "Content-Type: application/pdf\r\n",
1461            "Content-Disposition: attachment; filename=\"quarterly_report.pdf\"\r\n",
1462            "\r\n",
1463            "PDFBINARYDATA\r\n",
1464            "--boundary42--\r\n",
1465        );
1466        let (mid, mail) = make_mail(raw);
1467
1468        idx.index_message(&mid, &mail).await.expect("index");
1469        idx.commit().await.expect("commit");
1470
1471        // Search for the attachment filename.
1472        let results = idx
1473            .search("attachment_filenames:quarterly_report.pdf", 10)
1474            .await
1475            .expect("search");
1476        assert!(
1477            !results.is_empty(),
1478            "expected attachment filename to be indexed, got no results"
1479        );
1480        assert_eq!(results[0].message_uuid, *mid.as_uuid());
1481    }
1482
1483    // ── Header value indexing (Cc field) ──────────────────────────────────────
1484
1485    #[tokio::test]
1486    async fn test_header_values_indexed() {
1487        let (idx, _dir) = make_index();
1488
1489        let raw = concat!(
1490            "From: alice@example.com\r\n",
1491            "To: bob@example.com\r\n",
1492            "Cc: carol@example.com\r\n",
1493            "Subject: Cc test\r\n",
1494            "Content-Type: text/plain\r\n",
1495            "\r\n",
1496            "Check the Cc header.\r\n",
1497        );
1498        let (mid, mail) = make_mail(raw);
1499
1500        idx.index_message(&mid, &mail).await.expect("index");
1501        idx.commit().await.expect("commit");
1502
1503        // "carol" appears in the Cc header → header_values field.
1504        let results = idx.search("header_values:carol", 10).await.expect("search");
1505        assert!(
1506            !results.is_empty(),
1507            "expected Cc header to be indexed in header_values, got no results"
1508        );
1509        assert_eq!(results[0].message_uuid, *mid.as_uuid());
1510    }
1511
1512    // ── Date field range query ────────────────────────────────────────────────
1513
    #[tokio::test]
    async fn test_date_field_range_query() {
        let (idx, _dir) = make_index();

        // 2026-01-01T12:00:00+00:00 = 1767067200
        let raw = concat!(
            "From: alice@example.com\r\n",
            "To: bob@example.com\r\n",
            "Date: Thu, 1 Jan 2026 12:00:00 +0000\r\n",
            "Subject: Date test\r\n",
            "Content-Type: text/plain\r\n",
            "\r\n",
            "Happy new year.\r\n",
        );
        let (mid, mail) = make_mail(raw);

        idx.index_message(&mid, &mail).await.expect("index");
        idx.commit().await.expect("commit");

        // Verify via Tantivy RangeQuery on the date field.
        // Goes through the raw Tantivy API because the public `search`
        // interface takes query strings, not typed range bounds.
        use tantivy::query::RangeQuery;

        let searcher = idx.reader.searcher();
        let date_field = idx.schema_fields.date;

        // Lower-bound: 2025-01-01T00:00:00+00:00 = 1735689600
        let lower: i64 = 1_735_689_600;
        // Half-open range [lower, +inf): the indexed message's Date (2026)
        // falls inside it.
        let range_query = RangeQuery::new(
            std::ops::Bound::Included(tantivy::Term::from_field_i64(date_field, lower)),
            std::ops::Bound::Unbounded,
        );

        let top_docs = searcher
            .search(&range_query, &TopDocs::with_limit(10))
            .expect("range search");

        assert!(
            !top_docs.is_empty(),
            "expected date range query to return the message"
        );

        // Verify the returned doc carries the correct message_id.
        // (Inner `if let`s are best-effort: the asserts only fire when the
        // stored field is present and of the expected type.)
        let doc: TantivyDocument = searcher.doc(top_docs[0].1).expect("fetch doc");
        if let Some(v) = doc.get_first(idx.schema_fields.message_id) {
            if let Some(s) = v.as_str() {
                assert_eq!(s, mid.to_string().as_str());
            }
        }

        // Verify date value was stored correctly (>= 2026-01-01).
        if let Some(date_val) = doc.get_first(date_field) {
            if let Some(ts) = date_val.as_i64() {
                assert!(
                    ts >= lower,
                    "stored timestamp {ts} should be >= lower bound {lower}"
                );
            }
        }
    }
1573
1574    // ── Schema version sentinel ───────────────────────────────────────────────
1575
1576    #[test]
1577    fn test_schema_version_sentinel_written_on_new() {
1578        let dir = tempfile::TempDir::new().expect("temp dir");
1579        let _idx = TantivySearchIndex::new(dir.path()).expect("create index");
1580        assert!(
1581            schema_version_matches(dir.path()),
1582            "schema_version.txt should be written by new()"
1583        );
1584    }
1585
1586    #[test]
1587    fn test_schema_version_mismatch_triggers_rebuild() {
1588        let dir = tempfile::TempDir::new().expect("temp dir");
1589        // Create with current version.
1590        let _idx = TantivySearchIndex::new(dir.path()).expect("create index");
1591
1592        // Overwrite the sidecar with an old version.
1593        let sidecar = dir.path().join(SCHEMA_VERSION_FILE);
1594        std::fs::write(&sidecar, "1").expect("write old version");
1595        assert!(
1596            !schema_version_matches(dir.path()),
1597            "should detect stale version"
1598        );
1599
1600        // open() should purge and recreate without error.
1601        let _idx2 = TantivySearchIndex::open(dir.path()).expect("open after purge");
1602        assert!(
1603            schema_version_matches(dir.path()),
1604            "schema_version.txt should be updated after purge+recreate"
1605        );
1606    }
1607
1608    // ── strip_rfc2047 unit test ───────────────────────────────────────────────
1609
1610    #[test]
1611    fn test_strip_rfc2047_removes_encoded_words() {
1612        let input = "=?UTF-8?Q?Hello=20World?= plain text =?ISO-8859-1?B?SGVsbG8=?=";
1613        let result = strip_rfc2047(input);
1614        // The encoded-word runs should be replaced with spaces; plain text preserved.
1615        assert!(result.contains("plain text"), "got: {result}");
1616        assert!(
1617            !result.contains("=?"),
1618            "encoded word not stripped: {result}"
1619        );
1620    }
1621
1622    // ── normalise_whitespace unit test ────────────────────────────────────────
1623
1624    #[test]
1625    fn test_normalise_whitespace() {
1626        assert_eq!(normalise_whitespace("  hello   world  "), "hello world");
1627        assert_eq!(normalise_whitespace("a\t\tb"), "a b");
1628        assert_eq!(normalise_whitespace(""), "");
1629    }
1630
1631    // ── HTML body extraction ──────────────────────────────────────────────────
1632
1633    #[test]
1634    fn test_html_bytes_to_text_extracts_visible_text() {
1635        let html = b"<html><body><h1>Report</h1><p>Some <b>bold</b> text.</p></body></html>";
1636        let text = html_bytes_to_text(html);
1637        assert!(
1638            text.contains("Report") || text.contains("bold") || text.contains("text"),
1639            "expected visible text extraction, got: {text}"
1640        );
1641    }
1642
1643    // ── extract_disposition_filename unit test ────────────────────────────────
1644
1645    #[test]
1646    fn test_extract_disposition_filename_quoted() {
1647        let disp = "attachment; filename=\"my report.pdf\"";
1648        let result = extract_disposition_filename(disp);
1649        assert_eq!(result.as_deref(), Some("my report.pdf"));
1650    }
1651
1652    #[test]
1653    fn test_extract_disposition_filename_unquoted() {
1654        let disp = "attachment; filename=report.csv";
1655        let result = extract_disposition_filename(disp);
1656        assert_eq!(result.as_deref(), Some("report.csv"));
1657    }
1658}