Skip to main content

nodedb_fts/index/
writer.rs

1//! Core FtsIndex: indexing and document management over any backend.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use tracing::debug;
7
8use crate::backend::FtsBackend;
9use crate::block::CompactPosting;
10use crate::codec::DocIdMap;
11use crate::codec::smallfloat;
12use crate::lsm::compaction;
13use crate::lsm::memtable::{Memtable, MemtableConfig};
14use crate::lsm::segment::writer as seg_writer;
15use crate::posting::Bm25Params;
16
17/// Full-text search index generic over storage backend.
18///
19/// Provides identical indexing, search, and highlighting logic
20/// for Origin (redb), Lite (in-memory), and WASM deployments.
21///
22/// Writes accumulate in an in-memory `Memtable`. When the memtable
23/// exceeds its threshold, it is flushed to an immutable segment
24/// stored via the backend. Queries merge the active memtable with
25/// all persisted segments.
26pub struct FtsIndex<B: FtsBackend> {
27    pub(crate) backend: B,
28    pub(crate) bm25_params: Bm25Params,
29    pub(crate) memtable: Memtable,
30    /// Monotonic segment ID counter.
31    next_segment_id: AtomicU64,
32}
33
34impl<B: FtsBackend> FtsIndex<B> {
35    /// Create a new FTS index with the given backend and default BM25 params.
36    pub fn new(backend: B) -> Self {
37        Self {
38            backend,
39            bm25_params: Bm25Params::default(),
40            memtable: Memtable::new(MemtableConfig::default()),
41            next_segment_id: AtomicU64::new(1),
42        }
43    }
44
45    /// Create a new FTS index with custom BM25 parameters.
46    pub fn with_params(backend: B, params: Bm25Params) -> Self {
47        Self {
48            backend,
49            bm25_params: params,
50            memtable: Memtable::new(MemtableConfig::default()),
51            next_segment_id: AtomicU64::new(1),
52        }
53    }
54
55    /// Access the underlying backend.
56    pub fn backend(&self) -> &B {
57        &self.backend
58    }
59
60    /// Mutable access to the underlying backend.
61    pub fn backend_mut(&mut self) -> &mut B {
62        &mut self.backend
63    }
64
65    /// Access the active memtable (for LSM query merging).
66    pub fn memtable(&self) -> &Memtable {
67        &self.memtable
68    }
69
70    /// Load the DocIdMap for a collection from backend metadata.
71    pub fn load_doc_id_map(&self, tid: u32, collection: &str) -> Result<DocIdMap, B::Error> {
72        match self.backend.read_meta(tid, collection, "docmap")? {
73            Some(bytes) => Ok(DocIdMap::from_bytes(&bytes).unwrap_or_default()),
74            None => Ok(DocIdMap::new()),
75        }
76    }
77
78    /// Persist the DocIdMap for a collection to backend metadata.
79    fn save_doc_id_map(&self, tid: u32, collection: &str, map: &DocIdMap) -> Result<(), B::Error> {
80        self.backend
81            .write_meta(tid, collection, "docmap", &map.to_bytes())
82    }
83
84    /// Index a document's text content.
85    pub fn index_document(
86        &self,
87        tid: u32,
88        collection: &str,
89        doc_id: &str,
90        text: &str,
91    ) -> Result<(), B::Error> {
92        let tokens = self.analyze_for_collection(tid, collection, text)?;
93        if tokens.is_empty() {
94            return Ok(());
95        }
96
97        let mut doc_map = self.load_doc_id_map(tid, collection)?;
98        let int_id = doc_map.get_or_assign(doc_id);
99        self.save_doc_id_map(tid, collection, &doc_map)?;
100
101        let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
102        for (pos, token) in tokens.iter().enumerate() {
103            let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
104            entry.0 += 1;
105            entry.1.push(pos as u32);
106        }
107
108        let doc_len = tokens.len() as u32;
109
110        for (term, (freq, positions)) in &term_data {
111            let compact = CompactPosting {
112                doc_id: int_id,
113                term_freq: *freq,
114                fieldnorm: smallfloat::encode(doc_len),
115                positions: positions.clone(),
116            };
117            let scoped_term = memtable_key(tid, collection, term);
118            self.memtable.insert(&scoped_term, compact);
119        }
120        self.memtable.record_doc(int_id, doc_len);
121
122        // Write document length, fieldnorm, and update incremental stats.
123        self.backend
124            .write_doc_length(tid, collection, doc_id, doc_len)?;
125        self.write_fieldnorm(tid, collection, int_id, doc_len)?;
126        self.backend.increment_stats(tid, collection, doc_len)?;
127
128        if self.memtable.should_flush() {
129            self.flush_memtable(tid, collection)?;
130        }
131
132        debug!(tid, %collection, %doc_id, int_id, tokens = tokens.len(), terms = term_data.len(), "indexed document");
133        Ok(())
134    }
135
136    /// Flush the active memtable to an immutable segment.
137    fn flush_memtable(&self, tid: u32, collection: &str) -> Result<(), B::Error> {
138        let drained = self.memtable.drain();
139        if drained.is_empty() {
140            return Ok(());
141        }
142
143        let segment_bytes = seg_writer::flush_to_segment(drained);
144        let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
145        let id = compaction::segment_id(seg_id, 0);
146        self.backend
147            .write_segment(tid, collection, &id, &segment_bytes)?;
148
149        debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
150        Ok(())
151    }
152
153    /// Remove a document from the index.
154    pub fn remove_document(
155        &self,
156        tid: u32,
157        collection: &str,
158        doc_id: &str,
159    ) -> Result<(), B::Error> {
160        let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
161
162        let mut doc_map = self.load_doc_id_map(tid, collection)?;
163        if let Some(int_id) = doc_map.to_u32(doc_id) {
164            self.memtable.remove_doc(int_id);
165        }
166        doc_map.remove(doc_id);
167        self.save_doc_id_map(tid, collection, &doc_map)?;
168
169        self.backend.remove_doc_length(tid, collection, doc_id)?;
170
171        if let Some(len) = doc_len {
172            self.backend.decrement_stats(tid, collection, len)?;
173        }
174
175        Ok(())
176    }
177
178    /// Purge all entries for a collection. Returns count of removed entries.
179    pub fn purge_collection(&self, tid: u32, collection: &str) -> Result<usize, B::Error> {
180        self.memtable
181            .drain_collection(&memtable_collection_prefix(tid, collection));
182        self.backend.purge_collection(tid, collection)
183    }
184
185    /// Purge all entries for a tenant across every collection.
186    pub fn purge_tenant(&self, tid: u32) -> Result<usize, B::Error> {
187        self.memtable.drain_collection(&memtable_tenant_prefix(tid));
188        self.backend.purge_tenant(tid)
189    }
190}
191
/// Build the memtable key for `term` in `(tid, collection)`.
///
/// The layout is `"{tid}:{collection}:{term}"`. A single in-memory memtable
/// is shared by every tenant and collection, so each key must carry its
/// full tenant + collection scope.
///
/// NOTE(review): `collection` is not escaped. A collection name containing
/// `:` can collide with another collection's keys. For example, collection
/// `a` with term `b:c` and collection `a:b` with term `c` both produce `1:a:b:c`.
pub(crate) fn memtable_key(tid: u32, collection: &str, term: &str) -> String {
    let tenant = tid.to_string();
    [tenant.as_str(), collection, term].join(":")
}
198
/// Key prefix (`"{tid}:{collection}:"`) covering every memtable entry of a
/// single `(tid, collection)`. It is passed to `drain_collection` when
/// purging a collection.
///
/// NOTE(review): because `collection` is not escaped, the prefix for `docs`
/// (`"1:docs:"`) also matches keys of a collection named `docs:x`.
pub(crate) fn memtable_collection_prefix(tid: u32, collection: &str) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}
204
/// Key prefix (`"{tid}:"`) covering every memtable entry of one tenant.
///
/// The trailing `:` keeps tenant `1` from matching keys of tenant `11`.
pub(crate) fn memtable_tenant_prefix(tid: u32) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix
}
209
#[cfg(test)]
mod tests {
    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Tenant ID shared by every test.
    const T: u32 = 1;

    /// Build an index over a fresh in-memory backend with default settings.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// Index each `(doc_id, text)` pair, in order, into `collection` for tenant `T`.
    fn index_all(index: &FtsIndex<MemoryBackend>, collection: &str, docs: &[(&str, &str)]) {
        for &(doc_id, text) in docs {
            index
                .index_document(T, collection, doc_id, text)
                .expect("indexing should succeed");
        }
    }

    #[test]
    fn index_writes_to_memtable() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world greeting")]);

        assert!(!index.memtable.is_empty());
        assert!(index.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        // A tiny posting threshold forces a flush while indexing this document.
        let index = FtsIndex {
            backend: MemoryBackend::new(),
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 5,
                max_terms: 100,
            }),
            next_segment_id: AtomicU64::new(1),
        };

        index_all(
            &index,
            "docs",
            &[("d1", "alpha bravo charlie delta echo foxtrot")],
        );

        assert!(index.memtable.is_empty());
        let written = index
            .backend
            .list_segments(T, "docs")
            .expect("listing segments should succeed");
        assert!(!written.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_assigns_doc_ids() {
        let index = make_index();
        index_all(
            &index,
            "docs",
            &[("d1", "hello world greeting"), ("d2", "hello rust language")],
        );

        let docmap = index
            .load_doc_id_map(T, "docs")
            .expect("docmap should load");
        assert_eq!(docmap.to_u32("d1"), Some(0));
        assert_eq!(docmap.to_u32("d2"), Some(1));
    }

    #[test]
    fn remove_tombstones_docmap() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world"), ("d2", "hello rust")]);

        index
            .remove_document(T, "docs", "d1")
            .expect("removal should succeed");

        let docmap = index
            .load_doc_id_map(T, "docs")
            .expect("docmap should load");
        assert_eq!(docmap.to_u32("d1"), None);
        assert_eq!(docmap.to_u32("d2"), Some(1));
    }

    #[test]
    fn index_updates_stats() {
        let index = make_index();
        index_all(
            &index,
            "docs",
            &[("d1", "hello world greeting"), ("d2", "hello rust language")],
        );

        let (doc_count, total_len) = index
            .backend
            .collection_stats(T, "docs")
            .expect("stats should load");
        assert_eq!(doc_count, 2);
        assert!(total_len > 0);
    }

    #[test]
    fn remove_decrements_stats() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world"), ("d2", "hello rust")]);

        index
            .remove_document(T, "docs", "d1")
            .expect("removal should succeed");

        let (doc_count, _) = index
            .backend
            .collection_stats(T, "docs")
            .expect("stats should load");
        assert_eq!(doc_count, 1);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let index = make_index();
        index_all(&index, "col_a", &[("d1", "alpha bravo")]);
        index_all(&index, "col_b", &[("d1", "delta echo")]);

        index
            .purge_collection(T, "col_a")
            .expect("purge should succeed");

        let stats_a = index.backend.collection_stats(T, "col_a").unwrap();
        let stats_b = index.backend.collection_stats(T, "col_b").unwrap();
        assert_eq!(stats_a, (0, 0));
        assert!(stats_b.0 > 0);

        let surviving = index
            .memtable
            .get_postings(&memtable_key(T, "col_b", "delta"));
        assert!(!surviving.is_empty());

        let purged = index
            .memtable
            .get_postings(&memtable_key(T, "col_a", "alpha"));
        assert!(purged.is_empty());
    }

    #[test]
    fn empty_text_is_noop() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "the a is")]);

        assert_eq!(
            index.backend.collection_stats(T, "docs").unwrap(),
            (0, 0)
        );
        assert!(index.memtable.is_empty());
    }
}
333}