1pub mod cache;
70pub mod query_translator;
71
72pub use query_translator::{
73 jmap_filter_to_tantivy, parse_search_term, search_query_to_tantivy, JmapSearchFilter,
74 SearchComparator, SearchCondition, SearchField, SearchQuery, TermKind,
75};
76
77use async_trait::async_trait;
78use rusmes_proto::{Mail, MessageId};
79use rusmes_storage::{StorageBackend, StorageEvent};
80use std::path::{Path, PathBuf};
81use std::str::FromStr;
82use std::sync::Arc;
83use std::time::{Duration, Instant};
use tantivy::{
    collector::TopDocs,
    indexer::LogMergePolicy,
    query::QueryParser,
    schema::{Field, NumericOptions, Schema, Value, STORED, STRING, TEXT},
    Index, IndexReader, IndexWriter, TantivyDocument,
};
91use thiserror::Error;
92use tokio::sync::broadcast;
93use tokio::task::JoinHandle;
94use uuid::Uuid;
95
96pub use cache::ResultCache;
97
/// Current on-disk schema version. Bump this whenever `build_schema` changes;
/// `open_with_merge_policy` purges and recreates any index whose stored
/// version differs from this value.
pub const SCHEMA_VERSION: u32 = 2;

/// Name of the marker file, written inside the index directory, that records
/// the [`SCHEMA_VERSION`] the index was built with.
const SCHEMA_VERSION_FILE: &str = "schema_version.txt";
107
/// Errors produced by the search index and its maintenance tasks.
#[derive(Debug, Error)]
pub enum SearchError {
    /// Any error surfaced by the tantivy engine (index I/O, writer, searcher).
    #[error("Tantivy error: {0}")]
    Tantivy(#[from] tantivy::TantivyError),

    /// The free-text query string could not be parsed by tantivy.
    #[error("Query parse error: {0}")]
    QueryParse(#[from] tantivy::query::QueryParserError),

    /// A message referenced by id was not present.
    #[error("Message not found: {0}")]
    MessageNotFound(String),

    /// Message bytes were not valid UTF-8 where text was required.
    #[error("Invalid UTF-8 in message")]
    InvalidUtf8,

    /// Filesystem error while managing the index directory.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Error reported by the storage backend, carried as a formatted string.
    #[error("Storage error: {0}")]
    Storage(String),
}
129
/// Crate-wide result alias over [`SearchError`].
pub type Result<T> = std::result::Result<T, SearchError>;
131
/// A single search hit.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// UUID of the matching message (parsed from the stored `message_id` field).
    pub message_uuid: Uuid,
    /// Relevance score reported by tantivy; results served from the cache
    /// carry a placeholder score of 0.0 because scores are not cached.
    pub score: f32,
}
140
/// Tunables forwarded to tantivy's `LogMergePolicy` (see `to_tantivy`).
#[derive(Debug, Clone)]
pub struct MergePolicyConfig {
    /// Minimum number of segments in a layer before a merge is considered.
    pub min_num_segments: usize,
    /// Segments below this document count are grouped into the same layer.
    pub min_layer_size: u32,
    /// Log-scale ratio separating merge layers.
    pub level_log_size: f64,
}
163
164impl Default for MergePolicyConfig {
165 fn default() -> Self {
166 Self {
167 min_num_segments: 8,
168 min_layer_size: 100,
169 level_log_size: 0.75,
170 }
171 }
172}
173
174impl MergePolicyConfig {
175 pub fn to_tantivy(&self) -> LogMergePolicy {
177 let mut policy = LogMergePolicy::default();
178 policy.set_min_num_segments(self.min_num_segments);
179 policy.set_min_layer_size(self.min_layer_size);
180 policy.set_level_log_size(self.level_log_size);
181 policy
182 }
183}
184
/// Abstraction over a full-text mail index.
#[async_trait]
pub trait SearchIndex: Send + Sync {
    /// Add `mail` to the index under `message_id`. Changes become visible to
    /// `search` only after `commit`.
    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()>;

    /// Remove the document indexed under `message_id`.
    async fn delete_message(&self, message_id: &MessageId) -> Result<()>;

    /// Run a free-text query, returning at most `limit` scored matches.
    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>>;

    /// Flush pending adds/deletes so they become visible to searches.
    async fn commit(&self) -> Result<()>;
}
206
/// Tantivy-backed implementation of [`SearchIndex`].
pub struct TantivySearchIndex {
    // The underlying tantivy index.
    index: Index,
    // Long-lived reader; reloaded after commits to pick up new segments.
    reader: IndexReader,
    // Single writer shared behind a std Mutex (guards are never held across
    // an `.await` in this file — lock scopes are purely synchronous).
    writer: std::sync::Arc<std::sync::Mutex<IndexWriter>>,
    // Resolved field handles for the schema built by `build_schema`.
    schema_fields: SchemaFields,
    // Directory the index lives in; used for size accounting and the
    // schema-version marker file.
    index_path: PathBuf,
    // Query-result cache, invalidated on every mutation.
    cache: Arc<ResultCache>,
}
216
/// Field handles for every column of the index schema (see `build_schema`).
#[derive(Clone)]
struct SchemaFields {
    // Stored message UUID; used to map hits back to storage.
    message_id: Field,
    from: Field,
    to: Field,
    subject: Field,
    // Indexed-only body text (not stored).
    body: Field,
    attachment_filenames: Field,
    // Indexed-only concatenation of selected header values (not stored).
    header_values: Field,
    // Unix timestamp of the Date header, as an i64.
    date: Field,
}
229
230impl TantivySearchIndex {
231 pub fn new(index_path: impl AsRef<Path>) -> Result<Self> {
235 Self::new_with_merge_policy(index_path, MergePolicyConfig::default())
236 }
237
238 pub fn new_with_merge_policy(
240 index_path: impl AsRef<Path>,
241 merge_policy: MergePolicyConfig,
242 ) -> Result<Self> {
243 let (schema, fields) = Self::build_schema();
244
245 let index_path = index_path.as_ref();
246 std::fs::create_dir_all(index_path)?;
247
248 let index = Index::create_in_dir(index_path, schema.clone())?;
249 let writer = index.writer(50_000_000)?; writer.set_merge_policy(Box::new(merge_policy.to_tantivy()));
251 let reader = index.reader()?;
252
253 write_schema_version(index_path)?;
255
256 Ok(Self {
257 index,
258 reader,
259 writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
260 schema_fields: fields,
261 index_path: index_path.to_path_buf(),
262 cache: Arc::new(ResultCache::new_default()),
263 })
264 }
265
266 pub fn open(index_path: impl AsRef<Path>) -> Result<Self> {
271 Self::open_with_merge_policy(index_path, MergePolicyConfig::default())
272 }
273
274 pub fn open_with_merge_policy(
279 index_path: impl AsRef<Path>,
280 merge_policy: MergePolicyConfig,
281 ) -> Result<Self> {
282 let path_buf = index_path.as_ref().to_path_buf();
283
284 if !schema_version_matches(&path_buf) {
286 tracing::warn!(
287 "rusmes-search: schema version mismatch at {:?} — purging and rebuilding index",
288 path_buf
289 );
290 purge_index_dir(&path_buf)?;
291 return Self::new_with_merge_policy(path_buf, merge_policy);
292 }
293
294 let index = Index::open_in_dir(&path_buf)?;
295 let schema = index.schema();
296
297 let fields = SchemaFields {
299 message_id: schema.get_field("message_id")?,
300 from: schema.get_field("from")?,
301 to: schema.get_field("to")?,
302 subject: schema.get_field("subject")?,
303 body: schema.get_field("body")?,
304 attachment_filenames: schema.get_field("attachment_filenames")?,
305 header_values: schema.get_field("header_values")?,
306 date: schema.get_field("date")?,
307 };
308
309 let writer = index.writer(50_000_000)?;
310 writer.set_merge_policy(Box::new(merge_policy.to_tantivy()));
311 let reader = index.reader()?;
312
313 Ok(Self {
314 index,
315 reader,
316 writer: std::sync::Arc::new(std::sync::Mutex::new(writer)),
317 schema_fields: fields,
318 index_path: path_buf,
319 cache: Arc::new(ResultCache::new_default()),
320 })
321 }
322
323 fn build_schema() -> (Schema, SchemaFields) {
325 let mut schema_builder = Schema::builder();
326
327 let message_id = schema_builder.add_text_field("message_id", STORED);
328 let from = schema_builder.add_text_field("from", TEXT | STORED);
329 let to = schema_builder.add_text_field("to", TEXT | STORED);
330 let subject = schema_builder.add_text_field("subject", TEXT | STORED);
331 let body = schema_builder.add_text_field("body", TEXT);
332 let attachment_filenames =
333 schema_builder.add_text_field("attachment_filenames", TEXT | STORED);
334 let header_values = schema_builder.add_text_field("header_values", TEXT);
335 let date = schema_builder
336 .add_i64_field("date", NumericOptions::default().set_indexed().set_stored());
337
338 let schema = schema_builder.build();
339 let fields = SchemaFields {
340 message_id,
341 from,
342 to,
343 subject,
344 body,
345 attachment_filenames,
346 header_values,
347 date,
348 };
349
350 (schema, fields)
351 }
352
353 fn extract_mail_text(
371 &self,
372 mail: &Mail,
373 ) -> (String, String, String, String, String, String, i64) {
374 let message = mail.message();
375 let headers = message.headers();
376
377 let from = headers.get_first("from").unwrap_or("").to_string();
379 let to = headers.get_first("to").unwrap_or("").to_string();
380 let subject = headers.get_first("subject").unwrap_or("").to_string();
381
382 let (body, attachment_filenames) = extract_body_and_attachments(message);
384
385 let header_values = build_header_values(headers);
387
388 let date_unix = parse_date_header(headers);
390
391 (
392 from,
393 to,
394 subject,
395 body,
396 attachment_filenames,
397 header_values,
398 date_unix,
399 )
400 }
401
402 pub fn cache(&self) -> Arc<ResultCache> {
405 self.cache.clone()
406 }
407
408 pub fn schema(&self) -> Schema {
413 self.index.schema()
414 }
415
416 pub fn search_by_query(
428 &self,
429 query: Box<dyn tantivy::query::Query>,
430 limit: usize,
431 ) -> Result<Vec<uuid::Uuid>> {
432 use tantivy::collector::TopDocs;
433 let searcher = self.reader.searcher();
434 let top_docs = searcher.search(query.as_ref(), &TopDocs::with_limit(limit))?;
435 let mut results = Vec::with_capacity(top_docs.len());
436 for (_score, addr) in top_docs {
437 let doc: TantivyDocument = searcher.doc(addr)?;
438 if let Some(v) = doc.get_first(self.schema_fields.message_id) {
439 if let Some(s) = v.as_str() {
440 if let Ok(uuid) = s.parse::<uuid::Uuid>() {
441 results.push(uuid);
442 }
443 }
444 }
445 }
446 Ok(results)
447 }
448
449 pub fn search_imap(
455 &self,
456 query: &query_translator::SearchQuery,
457 limit: usize,
458 ) -> Result<Vec<uuid::Uuid>> {
459 let schema = self.schema();
460 let tantivy_query = query_translator::search_query_to_tantivy(query, &schema);
461 self.search_by_query(tantivy_query, limit)
462 }
463
464 pub fn search_jmap(
471 &self,
472 filter: &query_translator::JmapSearchFilter,
473 limit: usize,
474 ) -> Result<Vec<uuid::Uuid>> {
475 let schema = self.schema();
476 let tantivy_query = query_translator::jmap_filter_to_tantivy(filter, &schema);
477 self.search_by_query(tantivy_query, limit)
478 }
479
480 pub fn index_size_bytes(&self) -> u64 {
485 let mut total: u64 = 0;
486 for entry in walkdir::WalkDir::new(&self.index_path)
487 .follow_links(false)
488 .into_iter()
489 .filter_map(std::result::Result::ok)
490 {
491 if entry.file_type().is_file() {
492 if let Ok(meta) = entry.metadata() {
493 total = total.saturating_add(meta.len());
494 }
495 }
496 }
497 total
498 }
499
500 async fn truncate(&self) -> Result<()> {
502 {
503 let mut writer = self.writer.lock().map_err(|e| {
504 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
505 "Writer mutex poisoned: {e}"
506 )))
507 })?;
508 writer.delete_all_documents()?;
509 writer.commit()?;
510 }
511 self.reader.reload()?;
512 self.cache.invalidate_all();
513 Ok(())
514 }
515
516 pub async fn rebuild(&self, store: &dyn StorageBackend) -> Result<(usize, Duration)> {
526 const BATCH_SIZE: usize = 1000;
527
528 let started = Instant::now();
529 self.truncate().await?;
530
531 let mailbox_store = store.mailbox_store();
532 let message_store = store.message_store();
533
534 let users = store
535 .list_all_users()
536 .await
537 .map_err(|e| SearchError::Storage(format!("list_all_users failed: {e}")))?;
538
539 let mut indexed = 0usize;
540 let mut since_commit = 0usize;
541
542 for user in users {
543 let mailboxes = mailbox_store
544 .list_mailboxes(&user)
545 .await
546 .map_err(|e| SearchError::Storage(format!("list_mailboxes failed: {e}")))?;
547 for mailbox in mailboxes {
548 let messages = message_store
549 .get_mailbox_messages(mailbox.id())
550 .await
551 .map_err(|e| {
552 SearchError::Storage(format!("get_mailbox_messages failed: {e}"))
553 })?;
554 for metadata in messages {
555 let mail = match message_store.get_message(metadata.message_id()).await {
556 Ok(Some(m)) => m,
557 Ok(None) => {
558 tracing::debug!(
559 "rebuild: message {} not retrievable, skipping",
560 metadata.message_id()
561 );
562 continue;
563 }
564 Err(e) => {
565 tracing::warn!(
566 "rebuild: get_message({}) failed: {}",
567 metadata.message_id(),
568 e
569 );
570 continue;
571 }
572 };
573 self.add_document_no_invalidate(metadata.message_id(), &mail)?;
574 indexed += 1;
575 since_commit += 1;
576 if since_commit >= BATCH_SIZE {
577 self.commit_writer().await?;
578 since_commit = 0;
579 }
580 }
581 }
582 }
583
584 if since_commit > 0 {
585 self.commit_writer().await?;
586 }
587 self.reader.reload()?;
589 self.cache.invalidate_all();
591
592 Ok((indexed, started.elapsed()))
593 }
594
595 #[doc(hidden)]
600 pub fn add_document_for_test(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
601 self.add_document_no_invalidate(message_id, mail)
602 }
603
604 fn add_document_no_invalidate(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
608 let (from, to, subject, body, attachment_filenames, header_values, date_unix) =
609 self.extract_mail_text(mail);
610 let mut doc = TantivyDocument::new();
611 doc.add_text(self.schema_fields.message_id, message_id.to_string());
612 doc.add_text(self.schema_fields.from, from);
613 doc.add_text(self.schema_fields.to, to);
614 doc.add_text(self.schema_fields.subject, subject);
615 doc.add_text(self.schema_fields.body, body);
616 doc.add_text(
617 self.schema_fields.attachment_filenames,
618 attachment_filenames,
619 );
620 doc.add_text(self.schema_fields.header_values, header_values);
621 doc.add_i64(self.schema_fields.date, date_unix);
622 let writer = self.writer.lock().map_err(|e| {
623 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
624 "Writer mutex poisoned: {e}"
625 )))
626 })?;
627 let term =
629 tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
630 writer.delete_term(term);
631 writer.add_document(doc)?;
632 Ok(())
633 }
634
635 async fn commit_writer(&self) -> Result<()> {
637 let mut writer = self.writer.lock().map_err(|e| {
638 SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
639 "Writer mutex poisoned: {e}"
640 )))
641 })?;
642 writer.commit()?;
643 self.reader.reload()?;
644 Ok(())
645 }
646}
647
#[async_trait]
impl SearchIndex for TantivySearchIndex {
    /// Upsert the message, then drop all cached query results. The new
    /// document only becomes searchable after `commit`.
    async fn index_message(&self, message_id: &MessageId, mail: &Mail) -> Result<()> {
        self.add_document_no_invalidate(message_id, mail)?;
        self.cache.invalidate_all();
        Ok(())
    }

    /// Queue a delete for the document carrying this message id (applied at
    /// the next commit) and drop all cached query results.
    async fn delete_message(&self, message_id: &MessageId) -> Result<()> {
        let writer = self.writer.lock().map_err(|e| {
            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
                "Writer mutex poisoned: {e}"
            )))
        })?;
        let term =
            tantivy::Term::from_field_text(self.schema_fields.message_id, &message_id.to_string());
        writer.delete_term(term);
        // Release the writer lock before touching the cache.
        drop(writer);
        self.cache.invalidate_all();
        Ok(())
    }

    /// Free-text search across from/to/subject/body/attachment/header
    /// fields, served from the result cache when possible.
    ///
    /// NOTE(review): the cache key is built from the query string only —
    /// `limit` is not part of it, so a cached result for the same query with
    /// a smaller limit would be returned as-is. Confirm whether
    /// `ResultCache::make_key`'s second argument is meant to carry this.
    async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
        let key = ResultCache::make_key(query, None);
        if let Some(ids) = self.cache.get(&key) {
            // Scores are not cached; cache hits report a 0.0 placeholder.
            return Ok(ids
                .into_iter()
                .map(|m| SearchResult {
                    message_uuid: *m.as_uuid(),
                    score: 0.0,
                })
                .collect());
        }

        let searcher = self.reader.searcher();

        // Default query parser over every text field except message_id.
        let query_parser = QueryParser::for_index(
            &self.index,
            vec![
                self.schema_fields.from,
                self.schema_fields.to,
                self.schema_fields.subject,
                self.schema_fields.body,
                self.schema_fields.attachment_filenames,
                self.schema_fields.header_values,
            ],
        );

        let parsed = query_parser.parse_query(query)?;
        let top_docs = searcher.search(&parsed, &TopDocs::with_limit(limit))?;

        let mut results = Vec::new();
        let mut ids_for_cache = Vec::new();
        for (score, doc_address) in top_docs {
            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;

            // Hits without a parseable stored UUID are silently dropped.
            if let Some(message_id_value) = retrieved_doc.get_first(self.schema_fields.message_id) {
                if let Some(message_id_str) = message_id_value.as_str() {
                    if let Ok(uuid) = message_id_str.parse::<Uuid>() {
                        results.push(SearchResult {
                            message_uuid: uuid,
                            score,
                        });
                        ids_for_cache.push(MessageId::from_uuid(uuid));
                    }
                }
            }
        }

        self.cache.put(key, ids_for_cache);

        Ok(results)
    }

    /// Commit pending operations and refresh the reader.
    async fn commit(&self) -> Result<()> {
        let mut writer = self.writer.lock().map_err(|e| {
            SearchError::Tantivy(tantivy::TantivyError::SystemError(format!(
                "Writer mutex poisoned: {e}"
            )))
        })?;
        writer.commit()?;
        self.reader.reload()?;
        Ok(())
    }
}
736
737fn schema_version_matches(dir: &Path) -> bool {
743 let path = dir.join(SCHEMA_VERSION_FILE);
744 match std::fs::read_to_string(&path) {
745 Ok(contents) => contents
746 .trim()
747 .parse::<u32>()
748 .map(|v| v == SCHEMA_VERSION)
749 .unwrap_or(false),
750 Err(_) => false,
751 }
752}
753
754fn write_schema_version(dir: &Path) -> Result<()> {
756 let path = dir.join(SCHEMA_VERSION_FILE);
757 std::fs::write(path, SCHEMA_VERSION.to_string()).map_err(SearchError::Io)
758}
759
760fn purge_index_dir(dir: &Path) -> Result<()> {
763 if !dir.exists() {
764 return Ok(());
765 }
766 for entry in std::fs::read_dir(dir)? {
767 let entry = entry?;
768 let path = entry.path();
769 if path.is_dir() {
770 std::fs::remove_dir_all(&path)?;
771 } else {
772 std::fs::remove_file(&path)?;
773 }
774 }
775 Ok(())
776}
777
/// Extract `(body_text, attachment_filenames)` from a MIME message.
///
/// Multipart messages are walked part-by-part: the first text/plain part
/// wins as the body (an earlier text/html part is used only if no plain
/// part appears), attachment filenames are collected space-separated, and
/// nested multiparts are handled by recursion. Non-multipart text bodies
/// are decoded per their transfer encoding. Anything that cannot be parsed
/// falls back to the raw (small) body bytes or empty strings.
fn extract_body_and_attachments(message: &rusmes_proto::MimeMessage) -> (String, String) {
    use rusmes_proto::mime::{split_multipart, ContentType};

    // Raw body as lossy UTF-8; large (spilled) bodies are not read here.
    let small_body_str = |msg: &rusmes_proto::MimeMessage| -> String {
        match msg.body() {
            rusmes_proto::MessageBody::Small(b) => String::from_utf8_lossy(b).into_owned(),
            rusmes_proto::MessageBody::Large(_) => String::new(),
        }
    };

    // No/invalid Content-Type: treat the whole body as plain text.
    let ct = match message.content_type() {
        Ok(Some(ct)) => ct,
        _ => {
            return (small_body_str(message), String::new());
        }
    };

    if ct.is_multipart() {
        // A multipart without a boundary cannot be split; index it raw.
        let boundary = match ct.boundary() {
            Some(b) => b.to_owned(),
            None => {
                return (small_body_str(message), String::new());
            }
        };

        let raw_body: Vec<u8> = match message.body() {
            rusmes_proto::MessageBody::Small(b) => b.to_vec(),
            rusmes_proto::MessageBody::Large(_) => {
                return (String::new(), String::new());
            }
        };

        let parts = match split_multipart(&raw_body, &boundary) {
            Ok(p) => p,
            Err(_) => {
                return (small_body_str(message), String::new());
            }
        };

        let mut plain_body: Option<String> = None;
        let mut html_body: Option<String> = None;
        let mut attachment_filenames: Vec<String> = Vec::new();

        for part in &parts {
            // Parts without a usable Content-Type default to text/plain.
            let part_ct = part
                .content_type()
                .ok()
                .flatten()
                .unwrap_or_else(|| ContentType {
                    main_type: "text".to_string(),
                    sub_type: "plain".to_string(),
                    parameters: std::collections::HashMap::new(),
                });

            // NOTE(review): lower-casing the whole header also lower-cases
            // any filename extracted from it below — confirm that is
            // acceptable for indexing.
            let disposition = part
                .headers
                .get("content-disposition")
                .map(|s| s.to_lowercase())
                .unwrap_or_default();

            let is_attachment = disposition.starts_with("attachment");
            let is_inline = disposition.starts_with("inline");

            // Collect a filename for explicit attachments and for non-body
            // parts that carry no inline disposition.
            if is_attachment || (!is_inline && !is_body_part(&part_ct)) {
                if let Some(fname) = extract_disposition_filename(&disposition) {
                    attachment_filenames.push(fname);
                } else if let Some(fname) = part_ct.parameters.get("name") {
                    attachment_filenames.push(strip_rfc2047(fname));
                }
            }

            // Attachment content itself is never indexed as body text.
            if is_attachment {
                continue;
            }

            match (part_ct.main_type.as_str(), part_ct.sub_type.as_str()) {
                // First plain part wins as the body.
                ("text", "plain") if plain_body.is_none() => {
                    if let Ok(decoded) = part.decode_body() {
                        plain_body = Some(String::from_utf8_lossy(&decoded).into_owned());
                    }
                }
                // HTML is only a fallback while no plain body has been seen.
                ("text", "html") if html_body.is_none() && plain_body.is_none() => {
                    if let Ok(decoded) = part.decode_body() {
                        html_body = Some(html_bytes_to_text(&decoded));
                    }
                }
                // Nested multipart: re-serialise the part and recurse.
                ("multipart", _) => {
                    let sub_bytes = rebuild_part_bytes(part);
                    if let Ok(sub_msg) = rusmes_proto::MimeMessage::parse_from_bytes(&sub_bytes) {
                        let (sub_body, sub_filenames) = extract_body_and_attachments(&sub_msg);
                        if !sub_body.is_empty() && plain_body.is_none() && html_body.is_none() {
                            plain_body = Some(sub_body);
                        }
                        if !sub_filenames.is_empty() {
                            attachment_filenames.push(sub_filenames);
                        }
                    }
                }
                _ => {}
            }
        }

        let body = plain_body.or(html_body).unwrap_or_default();
        (body, attachment_filenames.join(" "))
    } else if ct.main_type == "text" && ct.sub_type == "html" {
        // Single-part HTML body: decode transfer encoding, then render to text.
        match message.body() {
            rusmes_proto::MessageBody::Small(bytes) => {
                let encoding = message.content_transfer_encoding();
                let decoded = match encoding {
                    rusmes_proto::ContentTransferEncoding::Base64 => {
                        rusmes_proto::mime::decode_base64(bytes).unwrap_or_default()
                    }
                    rusmes_proto::ContentTransferEncoding::QuotedPrintable => {
                        rusmes_proto::mime::decode_quoted_printable(bytes).unwrap_or_default()
                    }
                    _ => bytes.to_vec(),
                };
                let text = html_bytes_to_text(&decoded);
                (text, String::new())
            }
            rusmes_proto::MessageBody::Large(_) => (String::new(), String::new()),
        }
    } else if ct.main_type == "text" {
        // Any other text/* body: decode transfer encoding and take as-is.
        match message.body() {
            rusmes_proto::MessageBody::Small(bytes) => {
                let encoding = message.content_transfer_encoding();
                let decoded = match encoding {
                    rusmes_proto::ContentTransferEncoding::Base64 => {
                        rusmes_proto::mime::decode_base64(bytes).unwrap_or_default()
                    }
                    rusmes_proto::ContentTransferEncoding::QuotedPrintable => {
                        rusmes_proto::mime::decode_quoted_printable(bytes).unwrap_or_default()
                    }
                    _ => bytes.to_vec(),
                };
                (
                    String::from_utf8_lossy(&decoded).into_owned(),
                    String::new(),
                )
            }
            rusmes_proto::MessageBody::Large(_) => (String::new(), String::new()),
        }
    } else {
        // Non-text, non-multipart top level: nothing indexable.
        (String::new(), String::new())
    }
}
952
953fn is_body_part(ct: &rusmes_proto::mime::ContentType) -> bool {
955 matches!(
956 (ct.main_type.as_str(), ct.sub_type.as_str()),
957 ("text", "plain") | ("text", "html") | ("multipart", _)
958 )
959}
960
961fn extract_disposition_filename(disposition: &str) -> Option<String> {
966 for segment in disposition.split(';') {
968 let seg = segment.trim();
969 if let Some(rest) = seg.strip_prefix("filename=") {
970 let value = rest.trim().trim_matches('"');
971 if !value.is_empty() {
972 return Some(strip_rfc2047(value));
973 }
974 }
975 if let Some(rest) = seg.strip_prefix("filename*=") {
977 let value = rest.trim();
978 let fname = value.split("''").last().unwrap_or(value);
980 let fname = fname.trim_matches('"');
981 if !fname.is_empty() {
982 return Some(strip_rfc2047(fname));
983 }
984 }
985 }
986 None
987}
988
989fn html_bytes_to_text(html: &[u8]) -> String {
995 match html2text::from_read(html, 1_000_000) {
996 Ok(text) => text,
997 Err(_) => String::from_utf8_lossy(html).into_owned(),
998 }
999}
1000
1001fn rebuild_part_bytes(part: &rusmes_proto::mime::MimePart) -> Vec<u8> {
1004 let mut out = Vec::new();
1005 for (name, value) in &part.headers {
1006 out.extend_from_slice(name.as_bytes());
1007 out.extend_from_slice(b": ");
1008 out.extend_from_slice(value.as_bytes());
1009 out.extend_from_slice(b"\r\n");
1010 }
1011 out.extend_from_slice(b"\r\n");
1012 out.extend_from_slice(&part.body);
1013 out
1014}
1015
1016fn build_header_values(headers: &rusmes_proto::HeaderMap) -> String {
1025 const HEADER_NAMES: &[&str] = &[
1026 "subject",
1027 "from",
1028 "to",
1029 "cc",
1030 "bcc",
1031 "reply-to",
1032 "message-id",
1033 ];
1034
1035 let mut parts: Vec<String> = Vec::new();
1036 for name in HEADER_NAMES {
1037 if let Some(values) = headers.get(name) {
1038 for value in values {
1039 let normalised = strip_rfc2047(value.trim());
1040 let normalised = normalise_whitespace(&normalised);
1041 if !normalised.is_empty() {
1042 parts.push(normalised);
1043 }
1044 }
1045 }
1046 }
1047 parts.join(" ")
1048}
1049
/// Replace every RFC 2047 encoded-word span (`=?...?=`) with a single
/// space, leaving the surrounding text untouched. An opening `=?` with no
/// closing `?=` is kept literally.
fn strip_rfc2047(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    loop {
        let Some(pos) = rest.find("=?") else {
            // No more encoded words: keep the tail verbatim.
            out.push_str(rest);
            return out;
        };
        out.push_str(&rest[..pos]);
        let tail = &rest[pos + 2..];
        match tail.find("?=") {
            Some(close) => {
                // Drop the encoded word, leave one space in its place.
                out.push(' ');
                rest = &tail[close + 2..];
            }
            None => {
                // Unterminated encoded word: preserve it as-is.
                out.push_str(&rest[pos..]);
                return out;
            }
        }
    }
}
1080
/// Collapse every run of Unicode whitespace to a single ASCII space and
/// strip leading/trailing whitespace.
fn normalise_whitespace(input: &str) -> String {
    // `split_whitespace` splits on `char::is_whitespace`, skips empty
    // tokens, and ignores edge whitespace — joining with one space
    // reproduces the collapse-and-trim behaviour exactly.
    input.split_whitespace().collect::<Vec<_>>().join(" ")
}
1103
1104fn parse_date_header(headers: &rusmes_proto::HeaderMap) -> i64 {
1111 let date_str = match headers.get_first("date") {
1112 Some(d) => d.trim(),
1113 None => return 0,
1114 };
1115
1116 match chrono::DateTime::parse_from_rfc2822(date_str) {
1118 Ok(dt) => dt.timestamp(),
1119 Err(_) => 0,
1120 }
1121}
1122
/// Spawn a background task that fully rebuilds the index from `store` every
/// `schedule`. A zero `schedule` exits immediately (manual rebuilds only).
/// Rebuild failures are logged and the loop continues; the task runs until
/// aborted via the returned handle.
pub fn spawn_reindex_worker(
    idx: Arc<TantivySearchIndex>,
    store: Arc<dyn StorageBackend>,
    schedule: Duration,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        if schedule.is_zero() {
            tracing::debug!("reindex worker: schedule is zero, exiting (manual-only mode)");
            return;
        }
        // Note: a tokio interval's first tick completes immediately, so the
        // first rebuild starts right after spawn.
        let mut interval = tokio::time::interval(schedule);
        // If a rebuild overruns the schedule, delay rather than burst-fire.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            interval.tick().await;
            match idx.rebuild(store.as_ref()).await {
                Ok((n, elapsed)) => {
                    tracing::info!("reindex worker: rebuilt {} messages in {:?}", n, elapsed);
                }
                Err(e) => {
                    tracing::warn!("reindex worker: rebuild failed: {}", e);
                }
            }
        }
    })
}
1158
/// Spawn the incremental indexer with the default commit batching
/// ([`IncrementalConfig::default`]).
pub fn spawn_incremental_indexer(
    idx: Arc<TantivySearchIndex>,
    store: Arc<dyn StorageBackend>,
) -> JoinHandle<()> {
    spawn_incremental_indexer_with_config(idx, store, IncrementalConfig::default())
}
1173
/// Commit batching knobs for the incremental indexer.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Commit once this many indexed/expunged operations are pending.
    pub commit_every_n: usize,
    /// Also commit pending operations after this much time has elapsed
    /// since the last commit.
    pub commit_every: Duration,
}
1183
1184impl Default for IncrementalConfig {
1185 fn default() -> Self {
1186 Self {
1187 commit_every_n: 100,
1188 commit_every: Duration::from_secs(5),
1189 }
1190 }
1191}
1192
/// Spawn a task that tails the storage event stream and applies stored /
/// expunged events to the index, committing in batches of
/// `cfg.commit_every_n` operations or every `cfg.commit_every`. Exits (with
/// a final commit of anything pending) when the event stream closes; lagged
/// events are logged but not replayed.
pub fn spawn_incremental_indexer_with_config(
    idx: Arc<TantivySearchIndex>,
    store: Arc<dyn StorageBackend>,
    cfg: IncrementalConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut rx = store.event_stream();
        // Operations applied to the writer but not yet committed.
        let mut pending: usize = 0;
        let mut last_commit = Instant::now();
        // A zero interval would panic/spin; poll at 100ms in that case (the
        // `last_commit.elapsed() >= cfg.commit_every` guard below is then
        // trivially true whenever work is pending).
        let tick = if cfg.commit_every.is_zero() {
            Duration::from_millis(100)
        } else {
            cfg.commit_every
        };
        let mut commit_timer = tokio::time::interval(tick);
        commit_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(StorageEvent::MessageStored { account, mailbox, uid }) => {
                            // Ok(false) means the event could not be resolved
                            // to a document (message already gone, etc.).
                            match handle_stored(&idx, store.as_ref(), &account, &mailbox, uid).await {
                                Ok(true) => pending += 1,
                                Ok(false) => {
                                    tracing::debug!(
                                        "incremental indexer: stored event for {}/{}/uid={} produced no document",
                                        account, mailbox, uid
                                    );
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "incremental indexer: failed to index stored {}/{}/uid={}: {}",
                                        account, mailbox, uid, e
                                    );
                                }
                            }
                        }
                        Ok(StorageEvent::MessageExpunged { account, mailbox, uid }) => {
                            match handle_expunged(&idx, store.as_ref(), &account, &mailbox, uid).await {
                                Ok(true) => pending += 1,
                                Ok(false) => {
                                    tracing::debug!(
                                        "incremental indexer: expunge event for {}/{}/uid={} matched no message",
                                        account, mailbox, uid
                                    );
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "incremental indexer: failed to expunge {}/{}/uid={}: {}",
                                        account, mailbox, uid, e
                                    );
                                }
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
                            // Broadcast buffer overflowed; skipped events are
                            // lost to this indexer.
                            tracing::warn!(
                                "incremental indexer: lagged behind {} events; consider a full rebuild",
                                skipped
                            );
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            tracing::info!("incremental indexer: storage event stream closed; exiting");
                            // Flush whatever is still pending before exit.
                            if pending > 0 {
                                if let Err(e) = idx.commit_writer().await {
                                    tracing::warn!(
                                        "incremental indexer: final commit failed: {}",
                                        e
                                    );
                                }
                            }
                            return;
                        }
                    }
                }
                _ = commit_timer.tick() => {
                    // Time-based commit; on failure `pending` is kept so the
                    // commit is retried on a later tick.
                    if pending > 0 && last_commit.elapsed() >= cfg.commit_every {
                        if let Err(e) = idx.commit_writer().await {
                            tracing::warn!("incremental indexer: timer commit failed: {}", e);
                        } else {
                            pending = 0;
                            last_commit = Instant::now();
                        }
                    }
                }
            }

            // Count-based commit, checked after every select arm.
            if pending >= cfg.commit_every_n {
                if let Err(e) = idx.commit_writer().await {
                    tracing::warn!("incremental indexer: batch commit failed: {}", e);
                } else {
                    pending = 0;
                    last_commit = Instant::now();
                }
            }
        }
    })
}
1293
/// Resolve a storage event's `(account, mailbox, uid)` coordinates to a
/// `MessageId`.
///
/// Returns `Ok(None)` for empty coordinates, an invalid username, an
/// unknown mailbox, or a uid with no matching message — callers treat all
/// of these as "nothing to do". NOTE(review): the mailbox is matched via
/// `path().name()` against the event's mailbox string — presumably the leaf
/// name; confirm events never carry a full path.
async fn resolve_event_message_id(
    store: &dyn StorageBackend,
    account: &str,
    mailbox: &str,
    uid: u32,
) -> Result<Option<MessageId>> {
    if account.is_empty() || mailbox.is_empty() {
        return Ok(None);
    }
    let user = match rusmes_proto::Username::from_str(account) {
        Ok(u) => u,
        Err(e) => {
            tracing::debug!(
                "resolve: invalid username '{}' in storage event: {}",
                account,
                e
            );
            return Ok(None);
        }
    };
    let mailbox_store = store.mailbox_store();
    let message_store = store.message_store();
    let mailboxes = mailbox_store
        .list_mailboxes(&user)
        .await
        .map_err(|e| SearchError::Storage(format!("list_mailboxes failed: {e}")))?;
    let mailbox_id = match mailboxes
        .iter()
        .find(|m| m.path().name().map(|n| n == mailbox).unwrap_or(false))
        .map(|m| *m.id())
    {
        Some(id) => id,
        None => return Ok(None),
    };
    // Linear scan of the mailbox's message metadata for the matching uid.
    let metas = message_store
        .get_mailbox_messages(&mailbox_id)
        .await
        .map_err(|e| SearchError::Storage(format!("get_mailbox_messages failed: {e}")))?;
    Ok(metas
        .into_iter()
        .find(|md| md.uid() == uid)
        .map(|md| *md.message_id()))
}
1343
1344async fn handle_stored(
1345 idx: &Arc<TantivySearchIndex>,
1346 store: &dyn StorageBackend,
1347 account: &str,
1348 mailbox: &str,
1349 uid: u32,
1350) -> Result<bool> {
1351 let message_id = match resolve_event_message_id(store, account, mailbox, uid).await? {
1352 Some(id) => id,
1353 None => return Ok(false),
1354 };
1355 let message_store = store.message_store();
1356 let mail = match message_store
1357 .get_message(&message_id)
1358 .await
1359 .map_err(|e| SearchError::Storage(format!("get_message failed: {e}")))?
1360 {
1361 Some(m) => m,
1362 None => return Ok(false),
1363 };
1364 idx.add_document_no_invalidate(&message_id, &mail)?;
1365 idx.cache.invalidate_all();
1366 Ok(true)
1367}
1368
1369async fn handle_expunged(
1370 idx: &Arc<TantivySearchIndex>,
1371 store: &dyn StorageBackend,
1372 account: &str,
1373 mailbox: &str,
1374 uid: u32,
1375) -> Result<bool> {
1376 let message_id = match resolve_event_message_id(store, account, mailbox, uid).await? {
1377 Some(id) => id,
1378 None => return Ok(false),
1379 };
1380 idx.delete_message(&message_id).await?;
1381 Ok(true)
1382}
1383
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use rusmes_proto::mail::Mail;
    use rusmes_proto::message::{HeaderMap, MessageBody, MessageId, MimeMessage};

    /// Parse `raw_message` into a `Mail`, falling back to a minimal
    /// plain-text wrapper if MIME parsing fails; a fresh id accompanies it.
    fn make_mail(raw_message: &str) -> (MessageId, Mail) {
        let id = MessageId::new();
        let parsed = MimeMessage::parse_from_bytes(raw_message.as_bytes()).unwrap_or_else(|_| {
            let mut headers = HeaderMap::new();
            headers.insert("content-type", "text/plain");
            MimeMessage::new(
                headers,
                MessageBody::Small(Bytes::from(raw_message.to_owned())),
            )
        });
        (id, Mail::new(None, vec![], parsed, None, None))
    }

    /// Create an index inside a fresh temp dir. The `TempDir` guard must
    /// outlive the index, so both are handed back to the caller.
    fn make_index() -> (TantivySearchIndex, tempfile::TempDir) {
        let tmp = tempfile::TempDir::new().expect("temp dir");
        let index = TantivySearchIndex::new(tmp.path()).expect("create index");
        (index, tmp)
    }

    // A message whose only body part is HTML should still yield searchable text.
    #[tokio::test]
    async fn test_html_only_message_indexed() {
        let (index, _tmp) = make_index();

        let raw_msg = concat!(
            "From: alice@example.com\r\n",
            "To: bob@example.com\r\n",
            "Subject: HTML test\r\n",
            "Content-Type: text/html\r\n",
            "\r\n",
            "<html><body><b>Tantalising</b> content here</body></html>",
        );
        let (msg_id, mail) = make_mail(raw_msg);

        index.index_message(&msg_id, &mail).await.expect("index");
        index.commit().await.expect("commit");

        // The word only appears inside HTML markup; finding it proves the
        // body was converted to visible text before indexing.
        let hits = index.search("Tantalising", 10).await.expect("search");
        assert!(
            !hits.is_empty(),
            "expected HTML body text to be indexed, got no results"
        );
        assert_eq!(hits[0].message_uuid, *msg_id.as_uuid());
    }

    // Attachment filenames from Content-Disposition must land in their own field.
    #[tokio::test]
    async fn test_attachment_filename_indexed() {
        let (index, _tmp) = make_index();

        let raw_msg = concat!(
            "From: alice@example.com\r\n",
            "To: bob@example.com\r\n",
            "Subject: Attachment test\r\n",
            "Content-Type: multipart/mixed; boundary=\"boundary42\"\r\n",
            "\r\n",
            "--boundary42\r\n",
            "Content-Type: text/plain\r\n",
            "\r\n",
            "See the attached report.\r\n",
            "--boundary42\r\n",
            "Content-Type: application/pdf\r\n",
            "Content-Disposition: attachment; filename=\"quarterly_report.pdf\"\r\n",
            "\r\n",
            "PDFBINARYDATA\r\n",
            "--boundary42--\r\n",
        );
        let (msg_id, mail) = make_mail(raw_msg);

        index.index_message(&msg_id, &mail).await.expect("index");
        index.commit().await.expect("commit");

        // Field-scoped query so the match cannot come from subject/body text.
        let hits = index
            .search("attachment_filenames:quarterly_report.pdf", 10)
            .await
            .expect("search");
        assert!(
            !hits.is_empty(),
            "expected attachment filename to be indexed, got no results"
        );
        assert_eq!(hits[0].message_uuid, *msg_id.as_uuid());
    }

    // Headers beyond From/To/Subject (here: Cc) should be searchable via
    // the catch-all header_values field.
    #[tokio::test]
    async fn test_header_values_indexed() {
        let (index, _tmp) = make_index();

        let raw_msg = concat!(
            "From: alice@example.com\r\n",
            "To: bob@example.com\r\n",
            "Cc: carol@example.com\r\n",
            "Subject: Cc test\r\n",
            "Content-Type: text/plain\r\n",
            "\r\n",
            "Check the Cc header.\r\n",
        );
        let (msg_id, mail) = make_mail(raw_msg);

        index.index_message(&msg_id, &mail).await.expect("index");
        index.commit().await.expect("commit");

        // "carol" appears only in the Cc header, never in the body.
        let hits = index
            .search("header_values:carol", 10)
            .await
            .expect("search");
        assert!(
            !hits.is_empty(),
            "expected Cc header to be indexed in header_values, got no results"
        );
        assert_eq!(hits[0].message_uuid, *msg_id.as_uuid());
    }

    // The Date header must be stored as an i64 timestamp usable in range queries.
    #[tokio::test]
    async fn test_date_field_range_query() {
        let (index, _tmp) = make_index();

        let raw_msg = concat!(
            "From: alice@example.com\r\n",
            "To: bob@example.com\r\n",
            "Date: Thu, 1 Jan 2026 12:00:00 +0000\r\n",
            "Subject: Date test\r\n",
            "Content-Type: text/plain\r\n",
            "\r\n",
            "Happy new year.\r\n",
        );
        let (msg_id, mail) = make_mail(raw_msg);

        index.index_message(&msg_id, &mail).await.expect("index");
        index.commit().await.expect("commit");

        use tantivy::query::RangeQuery;

        let date_field = index.schema_fields.date;
        let searcher = index.reader.searcher();

        // 2025-01-01T00:00:00Z — strictly before the message's Date header,
        // so an open-ended range from here must include the message.
        let lower: i64 = 1_735_689_600;
        let range_query = RangeQuery::new(
            std::ops::Bound::Included(tantivy::Term::from_field_i64(date_field, lower)),
            std::ops::Bound::Unbounded,
        );

        let top_docs = searcher
            .search(&range_query, &TopDocs::with_limit(10))
            .expect("range search");
        assert!(
            !top_docs.is_empty(),
            "expected date range query to return the message"
        );

        let doc: TantivyDocument = searcher.doc(top_docs[0].1).expect("fetch doc");

        // Sanity-check the stored id and timestamp when present; the fields
        // are optional in the fetched document, so both checks are guarded.
        if let Some(s) = doc
            .get_first(index.schema_fields.message_id)
            .and_then(|v| v.as_str())
        {
            assert_eq!(s, msg_id.to_string().as_str());
        }

        if let Some(ts) = doc.get_first(date_field).and_then(|v| v.as_i64()) {
            assert!(
                ts >= lower,
                "stored timestamp {ts} should be >= lower bound {lower}"
            );
        }
    }

    // Creating a brand-new index must drop the version sidecar file.
    #[test]
    fn test_schema_version_sentinel_written_on_new() {
        let tmp = tempfile::TempDir::new().expect("temp dir");
        let _idx = TantivySearchIndex::new(tmp.path()).expect("create index");
        assert!(
            schema_version_matches(tmp.path()),
            "schema_version.txt should be written by new()"
        );
    }

    // A stale sidecar version must be detected, and open() must rebuild and
    // rewrite the sidecar so the mismatch is resolved.
    #[test]
    fn test_schema_version_mismatch_triggers_rebuild() {
        let tmp = tempfile::TempDir::new().expect("temp dir");
        let _idx = TantivySearchIndex::new(tmp.path()).expect("create index");

        // Simulate an index built by an older schema.
        std::fs::write(tmp.path().join(SCHEMA_VERSION_FILE), "1").expect("write old version");
        assert!(
            !schema_version_matches(tmp.path()),
            "should detect stale version"
        );

        let _idx2 = TantivySearchIndex::open(tmp.path()).expect("open after purge");
        assert!(
            schema_version_matches(tmp.path()),
            "schema_version.txt should be updated after purge+recreate"
        );
    }

    // RFC 2047 encoded-words (both Q and B encodings) should be removed
    // while plain text survives.
    #[test]
    fn test_strip_rfc2047_removes_encoded_words() {
        let input = "=?UTF-8?Q?Hello=20World?= plain text =?ISO-8859-1?B?SGVsbG8=?=";
        let result = strip_rfc2047(input);
        assert!(result.contains("plain text"), "got: {result}");
        assert!(
            !result.contains("=?"),
            "encoded word not stripped: {result}"
        );
    }

    // Whitespace collapsing: trim the ends and squeeze interior runs to one space.
    #[test]
    fn test_normalise_whitespace() {
        assert_eq!(normalise_whitespace(" hello world "), "hello world");
        assert_eq!(normalise_whitespace("a\t\tb"), "a b");
        assert_eq!(normalise_whitespace(""), "");
    }

    // HTML-to-text conversion should surface the visible words, not the tags.
    #[test]
    fn test_html_bytes_to_text_extracts_visible_text() {
        let html = b"<html><body><h1>Report</h1><p>Some <b>bold</b> text.</p></body></html>";
        let text = html_bytes_to_text(html);
        assert!(
            text.contains("Report") || text.contains("bold") || text.contains("text"),
            "expected visible text extraction, got: {text}"
        );
    }

    // Quoted filename parameter: quotes stripped, interior spaces preserved.
    #[test]
    fn test_extract_disposition_filename_quoted() {
        let disp = "attachment; filename=\"my report.pdf\"";
        assert_eq!(
            extract_disposition_filename(disp).as_deref(),
            Some("my report.pdf")
        );
    }

    // Unquoted filename parameter is also accepted.
    #[test]
    fn test_extract_disposition_filename_unquoted() {
        let disp = "attachment; filename=report.csv";
        assert_eq!(
            extract_disposition_filename(disp).as_deref(),
            Some("report.csv")
        );
    }
}