// nodedb_fts/index/writer.rs
1//! Core FtsIndex: indexing and document management over any backend.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use tracing::debug;
7
8use crate::backend::FtsBackend;
9use crate::block::CompactPosting;
10use crate::codec::DocIdMap;
11use crate::codec::smallfloat;
12use crate::lsm::compaction;
13use crate::lsm::memtable::{Memtable, MemtableConfig};
14use crate::lsm::segment::writer as seg_writer;
15use crate::posting::Bm25Params;
16
/// Full-text search index generic over storage backend.
///
/// Provides identical indexing, search, and highlighting logic
/// for Origin (redb), Lite (in-memory), and WASM deployments.
///
/// Writes accumulate in an in-memory `Memtable`. When the memtable
/// exceeds its threshold, it is flushed to an immutable segment
/// stored via the backend. Queries merge the active memtable with
/// all persisted segments.
pub struct FtsIndex<B: FtsBackend> {
    /// Storage backend: holds metadata (DocIdMap, stats, doc lengths,
    /// fieldnorms) and flushed immutable segments — not live postings.
    pub(crate) backend: B,
    /// BM25 ranking parameters. Not read in this file; presumably consumed
    /// by query-side scoring — confirm against the search implementation.
    pub(crate) bm25_params: Bm25Params,
    /// Active in-memory write buffer, shared across collections
    /// (terms are scoped as `"{collection}:{term}"`).
    pub(crate) memtable: Memtable,
    /// Monotonic segment ID counter.
    next_segment_id: AtomicU64,
}
33
34impl<B: FtsBackend> FtsIndex<B> {
35    /// Create a new FTS index with the given backend and default BM25 params.
36    pub fn new(backend: B) -> Self {
37        Self {
38            backend,
39            bm25_params: Bm25Params::default(),
40            memtable: Memtable::new(MemtableConfig::default()),
41            next_segment_id: AtomicU64::new(1),
42        }
43    }
44
45    /// Create a new FTS index with custom BM25 parameters.
46    pub fn with_params(backend: B, params: Bm25Params) -> Self {
47        Self {
48            backend,
49            bm25_params: params,
50            memtable: Memtable::new(MemtableConfig::default()),
51            next_segment_id: AtomicU64::new(1),
52        }
53    }
54
55    /// Access the underlying backend.
56    pub fn backend(&self) -> &B {
57        &self.backend
58    }
59
60    /// Mutable access to the underlying backend.
61    pub fn backend_mut(&mut self) -> &mut B {
62        &mut self.backend
63    }
64
65    /// Access the active memtable (for LSM query merging).
66    pub fn memtable(&self) -> &Memtable {
67        &self.memtable
68    }
69
70    /// Load the DocIdMap for a collection from backend metadata.
71    pub fn load_doc_id_map(&self, collection: &str) -> Result<DocIdMap, B::Error> {
72        let key = format!("{collection}:docmap");
73        match self.backend.read_meta(&key)? {
74            Some(bytes) => Ok(DocIdMap::from_bytes(&bytes).unwrap_or_default()),
75            None => Ok(DocIdMap::new()),
76        }
77    }
78
79    /// Persist the DocIdMap for a collection to backend metadata.
80    fn save_doc_id_map(&self, collection: &str, map: &DocIdMap) -> Result<(), B::Error> {
81        let key = format!("{collection}:docmap");
82        self.backend.write_meta(&key, &map.to_bytes())
83    }
84
85    /// Index a document's text content.
86    ///
87    /// Analyzes `text` into tokens, writes postings to the LSM memtable,
88    /// and flushes to an immutable segment when the threshold is reached.
89    pub fn index_document(
90        &self,
91        collection: &str,
92        doc_id: &str,
93        text: &str,
94    ) -> Result<(), B::Error> {
95        let tokens = self.analyze_for_collection(collection, text)?;
96        if tokens.is_empty() {
97            return Ok(());
98        }
99
100        // Assign u32 ID and persist map.
101        let mut doc_map = self.load_doc_id_map(collection)?;
102        let int_id = doc_map.get_or_assign(doc_id);
103        self.save_doc_id_map(collection, &doc_map)?;
104
105        // Build per-term frequency and position data.
106        let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
107        for (pos, token) in tokens.iter().enumerate() {
108            let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
109            entry.0 += 1;
110            entry.1.push(pos as u32);
111        }
112
113        let doc_len = tokens.len() as u32;
114
115        // Write to LSM memtable (CompactPosting with u32 doc ID).
116        for (term, (freq, positions)) in &term_data {
117            let compact = CompactPosting {
118                doc_id: int_id,
119                term_freq: *freq,
120                fieldnorm: smallfloat::encode(doc_len),
121                positions: positions.clone(),
122            };
123            let scoped_term = format!("{collection}:{term}");
124            self.memtable.insert(&scoped_term, compact);
125        }
126        self.memtable.record_doc(int_id, doc_len);
127
128        // Write document length, fieldnorm, and update incremental stats.
129        // Note: postings are NOT written to the backend — they live in the LSM
130        // memtable (and segments after flush). The backend stores only metadata
131        // (doc lengths, stats, DocIdMap, fieldnorms). Origin's transaction-based
132        // writes bypass FtsIndex entirely and write directly to redb tables.
133        self.backend.write_doc_length(collection, doc_id, doc_len)?;
134        self.write_fieldnorm(collection, int_id, doc_len)?;
135        self.backend.increment_stats(collection, doc_len)?;
136
137        // Check if memtable needs flushing.
138        if self.memtable.should_flush() {
139            self.flush_memtable(collection)?;
140        }
141
142        debug!(%collection, %doc_id, int_id, tokens = tokens.len(), terms = term_data.len(), "indexed document");
143        Ok(())
144    }
145
146    /// Flush the active memtable to an immutable segment.
147    fn flush_memtable(&self, collection: &str) -> Result<(), B::Error> {
148        let drained = self.memtable.drain();
149        if drained.is_empty() {
150            return Ok(());
151        }
152
153        let segment_bytes = seg_writer::flush_to_segment(drained);
154        let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
155        let key = compaction::segment_key(collection, seg_id, 0);
156        self.backend.write_segment(&key, &segment_bytes)?;
157
158        debug!(%collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
159        Ok(())
160    }
161
162    /// Remove a document from the index.
163    ///
164    /// Tombstones the document in the DocIdMap, removes from the LSM memtable,
165    /// and decrements backend stats. Segment postings are filtered at query time
166    /// via the DocIdMap tombstone.
167    pub fn remove_document(&self, collection: &str, doc_id: &str) -> Result<(), B::Error> {
168        // Read doc length before removing (needed for stats decrement).
169        let doc_len = self.backend.read_doc_length(collection, doc_id)?;
170
171        // Tombstone in DocIdMap and remove from memtable.
172        let mut doc_map = self.load_doc_id_map(collection)?;
173        if let Some(int_id) = doc_map.to_u32(doc_id) {
174            self.memtable.remove_doc(int_id);
175        }
176        doc_map.remove(doc_id);
177        self.save_doc_id_map(collection, &doc_map)?;
178
179        self.backend.remove_doc_length(collection, doc_id)?;
180
181        if let Some(len) = doc_len {
182            self.backend.decrement_stats(collection, len)?;
183        }
184
185        Ok(())
186    }
187
188    /// Purge all entries for a collection. Returns count of removed entries.
189    pub fn purge_collection(&self, collection: &str) -> Result<usize, B::Error> {
190        // Selectively remove only this collection's terms from the shared memtable.
191        self.memtable.drain_collection(collection);
192        self.backend.purge_collection(collection)
193    }
194}
195
#[cfg(test)]
mod tests {
    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Index over the in-memory backend with default memtable config.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    #[test]
    fn index_writes_to_memtable() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();

        // Postings land in the active memtable, not the backend.
        assert!(index.memtable.posting_count() > 0);
        assert!(!index.memtable.is_empty());
    }

    #[test]
    fn memtable_flush_on_threshold() {
        // A tiny posting cap forces a flush on the first document.
        let mem_backend = MemoryBackend::new();
        let index = FtsIndex {
            backend: mem_backend,
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 5,
                max_terms: 100,
            }),
            next_segment_id: AtomicU64::new(1),
        };

        // Six distinct terms > 5 postings, so the threshold is crossed.
        index
            .index_document("docs", "d1", "alpha bravo charlie delta echo foxtrot")
            .unwrap();

        // Flush empties the memtable and persists a segment.
        let segments = index.backend.list_segments("docs").unwrap();
        assert!(!segments.is_empty(), "segment should have been written");
        assert!(index.memtable.is_empty());
    }

    #[test]
    fn index_assigns_doc_ids() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();
        index
            .index_document("docs", "d2", "hello rust language")
            .unwrap();

        // IDs are handed out sequentially from zero.
        let id_map = index.load_doc_id_map("docs").unwrap();
        assert_eq!(id_map.to_u32("d1"), Some(0));
        assert_eq!(id_map.to_u32("d2"), Some(1));
    }

    #[test]
    fn remove_tombstones_docmap() {
        let index = make_index();
        index.index_document("docs", "d1", "hello world").unwrap();
        index.index_document("docs", "d2", "hello rust").unwrap();

        index.remove_document("docs", "d1").unwrap();

        // d1 is gone; d2 keeps its original ID.
        let id_map = index.load_doc_id_map("docs").unwrap();
        assert_eq!(id_map.to_u32("d2"), Some(1));
        assert_eq!(id_map.to_u32("d1"), None);
    }

    #[test]
    fn index_updates_stats() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();
        index
            .index_document("docs", "d2", "hello rust language")
            .unwrap();

        let (doc_count, total_len) = index.backend.collection_stats("docs").unwrap();
        assert!(total_len > 0);
        assert_eq!(doc_count, 2);
    }

    #[test]
    fn remove_decrements_stats() {
        let index = make_index();
        index.index_document("docs", "d1", "hello world").unwrap();
        index.index_document("docs", "d2", "hello rust").unwrap();

        index.remove_document("docs", "d1").unwrap();

        let (doc_count, _) = index.backend.collection_stats("docs").unwrap();
        assert_eq!(doc_count, 1);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let index = make_index();
        index.index_document("col_a", "d1", "alpha bravo").unwrap();
        index.index_document("col_b", "d1", "delta echo").unwrap();

        index.purge_collection("col_a").unwrap();
        assert_eq!(index.backend.collection_stats("col_a").unwrap(), (0, 0));
        assert!(index.backend.collection_stats("col_b").unwrap().0 > 0);

        // col_b's memtable postings should still be queryable.
        assert!(!index.memtable.get_postings("col_b:delta").is_empty());
        // col_a's memtable postings should be gone.
        assert!(index.memtable.get_postings("col_a:alpha").is_empty());
    }

    #[test]
    fn empty_text_is_noop() {
        let index = make_index();
        // All stopwords: the analyzer yields no tokens, so nothing is written.
        index.index_document("docs", "d1", "the a is").unwrap();
        assert_eq!(index.backend.collection_stats("docs").unwrap(), (0, 0));
        assert!(index.memtable.is_empty());
    }
}