rust-memex 0.6.4

Operator CLI + MCP server: canonical corpus plus semantic index, secondary to aicx
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
//! BM25 keyword search using Tantivy.
//!
//! Provides exact keyword matching to complement vector similarity search.
//! This helps distinguish between semantically similar but distinct terms
//! like "smutny" (sad) and "melancholijny" (melancholic).
//!
//! Lock strategy: On-demand IndexWriter acquisition/release per write batch.
//! This allows multiple processes to write sequentially without permanent lock holding.

use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tantivy::{
    Index, IndexReader, TantivyDocument,
    collector::TopDocs,
    query::{AllQuery, QueryParser},
    schema::{
        Field, IndexRecordOption, STORED, STRING, Schema, TextFieldIndexing, TextOptions, Value,
    },
    tokenizer::{Language, LowerCaser, RemoveLongFilter, SimpleTokenizer, Stemmer, TextAnalyzer},
};
use tokio::sync::Mutex;

/// Supported languages for stemming
///
/// Serialized in lowercase (e.g. `"english"`, `"none"`) via `rename_all`;
/// the serde/`Default` default is `English`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StemLanguage {
    #[default]
    English,
    German,
    French,
    Spanish,
    Italian,
    Portuguese,
    Russian,
    /// No stemming (for unsupported languages like Polish)
    None,
}

impl StemLanguage {
    fn to_tantivy_language(self) -> Option<Language> {
        match self {
            StemLanguage::English => Some(Language::English),
            StemLanguage::German => Some(Language::German),
            StemLanguage::French => Some(Language::French),
            StemLanguage::Spanish => Some(Language::Spanish),
            StemLanguage::Italian => Some(Language::Italian),
            StemLanguage::Portuguese => Some(Language::Portuguese),
            StemLanguage::Russian => Some(Language::Russian),
            StemLanguage::None => None,
        }
    }
}

/// Configuration for BM25 index
///
/// Every field carries a serde default, so a partially specified (or empty)
/// config deserializes to the same values as `BM25Config::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BM25Config {
    /// Path to store the Tantivy index
    /// (the leading `~` is presumably expanded by
    /// `path_utils::sanitize_new_path` in `BM25Index::new` — confirm)
    #[serde(default = "default_bm25_path")]
    pub index_path: String,
    /// Heap size for index writer (bytes); default 50 MB
    #[serde(default = "default_heap_size")]
    pub writer_heap_size: usize,
    /// Enable stemming for better recall
    #[serde(default = "default_true")]
    pub enable_stemming: bool,
    /// Language for stemming; ignored when `enable_stemming` is false
    #[serde(default)]
    pub language: StemLanguage,
    /// Read-only mode - disables write operations entirely
    /// Use for dedicated read-only instances
    #[serde(default)]
    pub read_only: bool,
}

/// serde default: on-disk location for the Tantivy index.
fn default_bm25_path() -> String {
    String::from("~/.rmcp-servers/rust-memex/bm25")
}

/// serde default: writer heap size, 50 MB.
fn default_heap_size() -> usize {
    50 * 1_000_000
}

/// serde default for boolean fields that should be enabled unless overridden.
fn default_true() -> bool {
    true
}

impl Default for BM25Config {
    fn default() -> Self {
        Self {
            index_path: default_bm25_path(),
            writer_heap_size: default_heap_size(),
            enable_stemming: true,
            language: StemLanguage::English,
            read_only: false,
        }
    }
}

impl BM25Config {
    /// Config for multilingual content: stemming disabled, no stem language.
    pub fn multilingual() -> Self {
        Self {
            enable_stemming: false,
            language: StemLanguage::None,
            ..Default::default()
        }
    }

    /// Config with writes disabled (read-only instance).
    pub fn read_only() -> Self {
        Self {
            read_only: true,
            ..Default::default()
        }
    }

    /// Builder: override the index path.
    pub fn with_path(mut self, path: impl Into<String>) -> Self {
        self.index_path = path.into();
        self
    }

    /// Builder: override the read-only flag.
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }
}

/// BM25 keyword search index using Tantivy
///
/// Uses on-demand IndexWriter acquisition: lock acquired only during writes,
/// released immediately after commit. This allows multiple processes to write
/// sequentially without permanent lock holding.
pub struct BM25Index {
    /// Underlying Tantivy index (schema: content / id / namespace).
    index: Index,
    /// Long-lived reader; explicitly reloaded after each committed write batch.
    reader: IndexReader,
    /// Tokenized + stored full-text field.
    content_field: Field,
    /// Exact-match (STRING) stored document id field.
    id_field: Field,
    /// Exact-match (STRING) stored namespace field.
    namespace_field: Field,
    /// Heap size for writer (used when acquiring on-demand)
    writer_heap_size: usize,
    /// Read-only mode flag
    read_only: bool,
    /// Mutex to serialize write operations within this process
    /// (cross-process contention is handled by Tantivy's own directory lock)
    write_lock: Arc<Mutex<()>>,
    /// Index path for error messages
    index_path: PathBuf,
}

impl BM25Index {
    /// Create or open a BM25 index at the given path
    ///
    /// Opens the existing index when `meta.json` is already present, otherwise
    /// creates a fresh one with the content/id/namespace schema built below.
    /// The custom tokenizer is (re-)registered on every open because tokenizers
    /// are process-local state, not persisted in the index directory.
    ///
    /// NOTE(review): on the open path Tantivy uses the schema stored on disk;
    /// the locally built `schema` is only consumed by `create_in_dir`. If the
    /// on-disk schema ever diverged from this one, the `Field` handles captured
    /// here could mismatch — confirm a rebuild/migration path exists for
    /// schema changes.
    pub fn new(config: &BM25Config) -> Result<Self> {
        let path = crate::path_utils::sanitize_new_path(&config.index_path)?;

        // Create directory if it doesn't exist
        if !path.exists() {
            std::fs::create_dir_all(&path)?;
        }

        // Build schema with text analysis
        let mut schema_builder = Schema::builder();

        // Configure text field with proper tokenization
        // (positions are indexed so phrase queries are possible)
        let text_options = TextOptions::default()
            .set_indexing_options(
                TextFieldIndexing::default()
                    .set_tokenizer("custom_tokenizer")
                    .set_index_option(IndexRecordOption::WithFreqsAndPositions),
            )
            .set_stored();

        let content_field = schema_builder.add_text_field("content", text_options);
        let id_field = schema_builder.add_text_field("id", STRING | STORED);
        let namespace_field = schema_builder.add_text_field("namespace", STRING | STORED);

        let schema = schema_builder.build();

        // Open or create index
        let index = if path.join("meta.json").exists() {
            Index::open_in_dir(&path)?
        } else {
            Index::create_in_dir(&path, schema.clone())?
        };

        // Register custom tokenizer with optional stemming.
        // All three variants share the same base pipeline
        // (simple tokenizer -> drop tokens > 40 chars -> lowercase);
        // only the trailing stemmer filter differs.
        let tokenizer = if config.enable_stemming {
            if let Some(lang) = config.language.to_tantivy_language() {
                TextAnalyzer::builder(SimpleTokenizer::default())
                    .filter(RemoveLongFilter::limit(40))
                    .filter(LowerCaser)
                    .filter(Stemmer::new(lang))
                    .build()
            } else {
                // No stemming for unsupported languages
                TextAnalyzer::builder(SimpleTokenizer::default())
                    .filter(RemoveLongFilter::limit(40))
                    .filter(LowerCaser)
                    .build()
            }
        } else {
            TextAnalyzer::builder(SimpleTokenizer::default())
                .filter(RemoveLongFilter::limit(40))
                .filter(LowerCaser)
                .build()
        };

        index.tokenizers().register("custom_tokenizer", tokenizer);

        let reader = index.reader()?;

        if config.read_only {
            tracing::info!("BM25 index opened in READ-ONLY mode");
        } else {
            tracing::debug!("BM25 index opened (on-demand lock acquisition for writes)");
        }

        Ok(Self {
            index,
            reader,
            content_field,
            id_field,
            namespace_field,
            writer_heap_size: config.writer_heap_size,
            read_only: config.read_only,
            write_lock: Arc::new(Mutex::new(())),
            index_path: path,
        })
    }

    /// Check if index is in read-only mode
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }

    /// Acquire IndexWriter, perform write operation, release lock
    ///
    /// This is the core pattern: acquire lock -> write -> commit -> drop (release)
    /// Includes retry with exponential backoff for lock contention.
    ///
    /// The in-process `write_lock` (a tokio Mutex, safe to hold across `.await`)
    /// serializes writers within this process; Tantivy's own directory lock,
    /// acquired by `index.writer(...)`, handles cross-process contention.
    async fn with_writer<F, T>(&self, operation: F) -> Result<T>
    where
        F: FnOnce(&mut tantivy::IndexWriter) -> Result<T>,
    {
        if self.read_only {
            return Err(anyhow!("Cannot write: BM25 index is in read-only mode"));
        }

        // Serialize writes within this process
        let _guard = self.write_lock.lock().await;

        // Retry with exponential backoff for cross-process lock contention
        // (50ms doubling up to 2s, 5 attempts: worst case ~3.8s of waiting)
        const MAX_RETRIES: u32 = 5;
        const INITIAL_BACKOFF_MS: u64 = 50;
        const MAX_BACKOFF_MS: u64 = 2000;

        let mut attempt = 0;
        let mut backoff_ms = INITIAL_BACKOFF_MS;

        let mut writer = loop {
            match self.index.writer(self.writer_heap_size) {
                Ok(w) => break w,
                Err(e) => {
                    // NOTE(review): detecting LockBusy via string matching is
                    // brittle against Tantivy error-message changes — confirm
                    // whether matching on the error enum variant is feasible.
                    let is_lock_busy = e.to_string().contains("LockBusy");

                    if is_lock_busy && attempt < MAX_RETRIES {
                        attempt += 1;
                        tracing::debug!(
                            "BM25 lock busy, retry {}/{} in {}ms. Path: {:?}",
                            attempt,
                            MAX_RETRIES,
                            backoff_ms,
                            self.index_path
                        );
                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
                        backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
                    } else if is_lock_busy {
                        return Err(anyhow!(
                            "BM25 index locked after {} retries. Path: {:?}. \
                             Multiple processes writing simultaneously - try again.",
                            MAX_RETRIES,
                            self.index_path
                        ));
                    } else {
                        return Err(anyhow!("Failed to acquire BM25 writer: {}", e));
                    }
                }
            }
        };

        // Perform the write operation.
        // An operation error propagates here WITHOUT committing; the writer
        // (and Tantivy's lock) is still released when it is dropped on return.
        let result = operation(&mut writer)?;

        // Commit changes
        writer.commit()?;

        // Writer dropped here -> Tantivy lock released
        drop(writer);

        // Reload reader to see new data
        self.reader.reload()?;

        Ok(result)
    }

    /// Add documents to the BM25 index
    ///
    /// Lock is acquired only for the duration of this operation.
    ///
    /// # Arguments
    /// * `docs` - List of (id, namespace, content) tuples
    ///
    /// # Errors
    /// Returns error if index is in read-only mode or another process holds the lock
    pub async fn add_documents(&self, docs: &[(String, String, String)]) -> Result<()> {
        let content_field = self.content_field;
        let id_field = self.id_field;
        let namespace_field = self.namespace_field;
        let doc_count = docs.len();

        // Clone docs into the `move` closure. `with_writer` places no
        // `'static` bound on the closure, so borrowing `docs` would also
        // compile; the owned copy just keeps the closure self-contained.
        let docs = docs.to_vec();

        self.with_writer(move |writer| {
            for (id, namespace, content) in &docs {
                let mut doc = TantivyDocument::new();
                doc.add_text(content_field, content);
                doc.add_text(id_field, id);
                doc.add_text(namespace_field, namespace);
                writer.add_document(doc)?;
            }
            Ok(())
        })
        .await?;

        tracing::debug!("Added {} documents to BM25 index", doc_count);
        Ok(())
    }

    /// Search the BM25 index
    ///
    /// # Arguments
    /// * `query` - Search query string
    /// * `namespace` - Optional namespace filter
    /// * `limit` - Maximum number of results
    ///
    /// # Returns
    /// Vector of (document_id, namespace, score) tuples, sorted by score descending
    ///
    /// NOTE(review): namespace filtering is applied AFTER retrieval, over a
    /// `limit * 2` over-fetch. If more than half of the top hits belong to
    /// other namespaces, fewer than `limit` results may be returned even when
    /// more matches exist — confirm this trade-off is acceptable for callers.
    pub fn search(
        &self,
        query: &str,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<(String, String, f32)>> {
        let searcher = self.reader.searcher();

        // Build query - search in content field
        let query_parser = QueryParser::for_index(&self.index, vec![self.content_field]);

        // Escape special characters and parse query
        // (escaping means user input is always treated as plain terms,
        // never as Tantivy query syntax)
        let escaped_query = Self::escape_query(query);
        let parsed_query = query_parser
            .parse_query(&escaped_query)
            .map_err(|e| anyhow!("Query parse error: {}", e))?;

        // Execute search — over-fetch 2x to leave headroom for the
        // namespace post-filter below
        let top_docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit * 2))?;

        let mut results = Vec::with_capacity(limit);

        for (score, doc_address) in top_docs {
            let doc: TantivyDocument = searcher.doc(doc_address)?;

            // Get document ID and namespace using stored fields.
            let id = doc
                .get_first(self.id_field)
                .and_then(|v| Value::as_str(&v).map(|s| s.to_string()))
                .ok_or_else(|| anyhow!("Document missing ID field"))?;
            let doc_namespace = doc
                .get_first(self.namespace_field)
                .and_then(|v| Value::as_str(&v).map(|s| s.to_string()))
                .ok_or_else(|| anyhow!("Document missing namespace field"))?;

            // Filter by namespace if specified
            if let Some(ns) = namespace
                && doc_namespace != ns
            {
                continue;
            }

            results.push((id, doc_namespace, score));

            if results.len() >= limit {
                break;
            }
        }

        tracing::debug!("BM25 search '{}' returned {} results", query, results.len());

        Ok(results)
    }

    /// Delete documents by ID
    ///
    /// Lock is acquired only for the duration of this operation.
    ///
    /// Returns the number of IDs submitted for deletion (Tantivy does not
    /// report how many documents each term delete actually matched).
    ///
    /// # Errors
    /// Returns error if index is in read-only mode or another process holds the lock
    pub async fn delete_documents(&self, ids: &[String]) -> Result<usize> {
        let id_field = self.id_field;
        let ids = ids.to_vec();
        let count = ids.len();

        self.with_writer(move |writer| {
            for id in &ids {
                let term = tantivy::Term::from_field_text(id_field, id);
                writer.delete_term(term);
            }
            Ok(count)
        })
        .await
    }

    /// Delete all documents in a namespace
    ///
    /// Lock is acquired only for the duration of this operation.
    /// The namespace field uses the STRING (untokenized) type, so this is an
    /// exact-string match — "team:alpha" does not touch "team:beta".
    ///
    /// # Errors
    /// Returns error if index is in read-only mode or another process holds the lock
    pub async fn delete_namespace_term(&self, namespace: &str) -> Result<usize> {
        let namespace_field = self.namespace_field;
        let namespace_owned = namespace.to_string();
        let namespace_log = namespace.to_string();

        self.with_writer(move |writer| {
            let term = tantivy::Term::from_field_text(namespace_field, &namespace_owned);
            writer.delete_term(term);
            Ok(1) // Tantivy doesn't return exact count for term deletes
        })
        .await?;

        tracing::info!("Purged namespace '{}' from BM25 index", namespace_log);
        Ok(1)
    }

    /// Escape special query characters
    ///
    /// Prefixes each Tantivy query-syntax character with a backslash so the
    /// parser treats the whole input as literal terms.
    fn escape_query(query: &str) -> String {
        // Tantivy query syntax special characters
        let special_chars = [
            '+', '-', '&', '|', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':',
            '\\', '/',
        ];

        // Worst case every char needs escaping -> 2x capacity up front
        let mut escaped = String::with_capacity(query.len() * 2);
        for c in query.chars() {
            if special_chars.contains(&c) {
                escaped.push('\\');
            }
            escaped.push(c);
        }
        escaped
    }

    /// Get document count in index
    /// (reflects the last reloaded reader snapshot, not uncommitted writes)
    pub fn doc_count(&self) -> u64 {
        let searcher = self.reader.searcher();
        searcher.num_docs()
    }

    /// Return the stored `(namespace, id)` keys currently present in the index.
    ///
    /// This is intentionally an operator/debug surface used by recovery checks,
    /// not a hot-path query primitive.
    ///
    /// Scans ALL documents (O(total docs)); when `namespace` is given, keys
    /// from other namespaces are fetched but filtered out before insertion.
    pub fn document_keys(&self, namespace: Option<&str>) -> Result<HashSet<(String, String)>> {
        let searcher = self.reader.searcher();
        let total = usize::try_from(searcher.num_docs()).unwrap_or(usize::MAX);
        if total == 0 {
            return Ok(HashSet::new());
        }

        let all_query = AllQuery;
        let top_docs = searcher.search(&all_query, &TopDocs::with_limit(total))?;
        let mut keys = HashSet::with_capacity(total);

        for (_score, doc_address) in top_docs {
            let doc: TantivyDocument = searcher.doc(doc_address)?;
            let id = doc
                .get_first(self.id_field)
                .and_then(|value| Value::as_str(&value).map(|value| value.to_string()))
                .ok_or_else(|| anyhow!("Document missing ID field"))?;
            let doc_namespace = doc
                .get_first(self.namespace_field)
                .and_then(|value| Value::as_str(&value).map(|value| value.to_string()))
                .ok_or_else(|| anyhow!("Document missing namespace field"))?;

            if namespace.is_none_or(|expected| expected == doc_namespace) {
                keys.insert((doc_namespace, id));
            }
        }

        Ok(keys)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build one owned `(id, namespace, content)` tuple for `add_documents`.
    fn doc(id: &str, ns: &str, body: &str) -> (String, String, String) {
        (id.to_string(), ns.to_string(), body.to_string())
    }

    /// Open a fresh default-config index rooted in the given temp dir.
    fn open_index(dir: &TempDir) -> BM25Index {
        let cfg = BM25Config::default().with_path(dir.path().to_str().unwrap());
        BM25Index::new(&cfg).unwrap()
    }

    #[tokio::test]
    async fn test_bm25_basic() {
        let dir = TempDir::new().unwrap();
        let index = open_index(&dir);

        index
            .add_documents(&[
                doc("doc1", "test", "The quick brown fox jumps over the lazy dog"),
                doc("doc2", "test", "A quick brown dog runs in the park"),
                doc("doc3", "test", "The lazy cat sleeps all day"),
            ])
            .await
            .unwrap();

        // doc1 and doc2 contain the query terms; doc3 should not match.
        let hits = index.search("quick brown", None, 10).unwrap();
        assert_eq!(hits.len(), 2);
        let ids: Vec<&str> = hits.iter().map(|(id, _, _)| id.as_str()).collect();
        assert!(ids.contains(&"doc1"));
        assert!(ids.contains(&"doc2"));
    }

    #[tokio::test]
    async fn test_bm25_namespace_filter() {
        let dir = TempDir::new().unwrap();
        let index = open_index(&dir);

        index
            .add_documents(&[
                doc("doc1", "ns1", "hello world"),
                doc("doc2", "ns2", "hello universe"),
            ])
            .await
            .unwrap();

        // Only the ns1 document should survive the namespace filter.
        let hits = index.search("hello", Some("ns1"), 10).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, "doc1");
        assert_eq!(hits[0].1, "ns1");
    }

    #[tokio::test]
    async fn test_bm25_delete_documents_removes_exact_id_matches() {
        let dir = TempDir::new().unwrap();
        let index = open_index(&dir);

        index
            .add_documents(&[
                doc("doc1", "team:alpha", "shared search term"),
                doc("doc2", "team:alpha", "shared search term"),
            ])
            .await
            .unwrap();
        assert_eq!(index.search("shared", None, 10).unwrap().len(), 2);

        let deleted = index.delete_documents(&["doc1".to_string()]).await.unwrap();
        assert_eq!(deleted, 1);

        // Only doc2 remains searchable after deleting doc1 by id.
        let hits = index.search("shared", None, 10).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, "doc2");
    }

    #[tokio::test]
    async fn test_bm25_purge_namespace_matches_exact_string() {
        let dir = TempDir::new().unwrap();
        let index = open_index(&dir);

        index
            .add_documents(&[
                doc("doc1", "team:alpha", "shared search term"),
                doc("doc2", "team:beta", "shared search term"),
            ])
            .await
            .unwrap();
        assert_eq!(index.search("shared", None, 10).unwrap().len(), 2);

        let deleted = index.delete_namespace_term("team:alpha").await.unwrap();
        assert_eq!(deleted, 1);

        // team:alpha is purged; team:beta must be untouched.
        assert!(
            index
                .search("shared", Some("team:alpha"), 10)
                .unwrap()
                .is_empty()
        );
        let remaining = index.search("shared", None, 10).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].0, "doc2");
        assert_eq!(remaining[0].1, "team:beta");
    }

    #[tokio::test]
    async fn test_bm25_lock_release() {
        // Two sequential instances over the same path: the second must be able
        // to write (proving the first released Tantivy's lock) and must see
        // the first instance's committed data.
        let dir = TempDir::new().unwrap();
        let path = dir.path().to_str().unwrap();

        let first = BM25Index::new(&BM25Config::default().with_path(path)).unwrap();
        first
            .add_documents(&[doc("doc1", "ns", "hello world")])
            .await
            .unwrap();

        // Drop the first instance so every resource is released.
        drop(first);

        let second = BM25Index::new(&BM25Config::default().with_path(path)).unwrap();
        second
            .add_documents(&[doc("doc2", "ns", "hello there")])
            .await
            .unwrap();

        // Both documents share the keyword "hello".
        assert_eq!(second.search("hello", None, 10).unwrap().len(), 2);
    }

    #[test]
    fn test_escape_query() {
        assert_eq!(BM25Index::escape_query("hello world"), "hello world");
        assert_eq!(BM25Index::escape_query("hello+world"), "hello\\+world");
        assert_eq!(BM25Index::escape_query("test:query"), "test\\:query");
    }
}