Skip to main content

nodedb_fts/index/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Core FtsIndex: indexing and document management over any backend.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use nodedb_types::Surrogate;
10use tracing::debug;
11
12use crate::backend::FtsBackend;
13
14use crate::block::CompactPosting;
15use crate::codec::smallfloat;
16use crate::index::error::{FtsIndexError, MAX_INDEXABLE_SURROGATE};
17use crate::lsm::compaction;
18use crate::lsm::memtable::{Memtable, MemtableConfig};
19use crate::lsm::segment::writer as seg_writer;
20use crate::posting::Bm25Params;
21use nodedb_mem::MemoryGovernor;
22
23/// Full-text search index generic over storage backend.
24///
25/// Provides identical indexing, search, and highlighting logic
26/// for Origin (redb), Lite (in-memory), and WASM deployments.
27///
28/// Writes accumulate in an in-memory `Memtable`. When the memtable
29/// exceeds its threshold, it is flushed to an immutable segment
30/// stored via the backend. Queries merge the active memtable with
31/// all persisted segments.
32///
33/// An optional [`MemoryGovernor`] can be injected via [`FtsIndex::set_governor`]
34/// to enforce per-engine memory budgets on large allocations (compaction,
35/// segment merge, query term collection). When no governor is set, allocations
36/// proceed without budget enforcement — which is the correct behaviour for
37/// NodeDB-Lite and WASM deployments where `nodedb-mem` is not available.
38pub struct FtsIndex<B: FtsBackend> {
39    pub(crate) backend: B,
40    pub(crate) bm25_params: Bm25Params,
41    pub(crate) memtable: Memtable,
42    /// Monotonic segment ID counter.
43    next_segment_id: AtomicU64,
44    /// Optional memory governor for budget enforcement (Origin only).
45    pub(crate) governor: Option<Arc<MemoryGovernor>>,
46}
47
48impl<B: FtsBackend> FtsIndex<B> {
49    /// Create a new FTS index with the given backend and default BM25 params.
50    pub fn new(backend: B) -> Self {
51        Self {
52            backend,
53            bm25_params: Bm25Params::default(),
54            memtable: Memtable::new(MemtableConfig::default()),
55            next_segment_id: AtomicU64::new(1),
56            governor: None,
57        }
58    }
59
60    /// Create a new FTS index with custom BM25 parameters.
61    pub fn with_params(backend: B, params: Bm25Params) -> Self {
62        Self {
63            backend,
64            bm25_params: params,
65            memtable: Memtable::new(MemtableConfig::default()),
66            next_segment_id: AtomicU64::new(1),
67            governor: None,
68        }
69    }
70
71    /// Inject a [`MemoryGovernor`] to enforce per-engine memory budgets on
72    /// large allocations (compaction, merge, query). When not set, all
73    /// allocations proceed without budget enforcement.
74    ///
75    /// This is the correct pattern for Origin deployments. NodeDB-Lite and
76    /// WASM builds should leave the governor unset (no `nodedb-mem` dependency).
77    pub fn set_governor(&mut self, governor: Arc<MemoryGovernor>) {
78        self.governor = Some(governor);
79    }
80
81    /// Access the underlying backend.
82    pub fn backend(&self) -> &B {
83        &self.backend
84    }
85
86    /// Mutable access to the underlying backend.
87    pub fn backend_mut(&mut self) -> &mut B {
88        &mut self.backend
89    }
90
91    /// Access the active memtable (for LSM query merging).
92    pub fn memtable(&self) -> &Memtable {
93        &self.memtable
94    }
95
96    /// Index a document's text content.
97    ///
98    /// Returns `Err(FtsIndexError::SurrogateOutOfRange)` if `doc_id` is
99    /// `Surrogate::ZERO` (the unassigned sentinel) or exceeds
100    /// `MAX_INDEXABLE_SURROGATE`. The FTS memtable uses the surrogate's raw
101    /// `u32` value as a direct array index into per-doc fieldnorm storage;
102    /// values near `u32::MAX` would cause multi-GiB allocations. Rejecting
103    /// out-of-range surrogates at this boundary is the correct fix — not a
104    /// `debug_assert!`, which would be a silent-wrap equivalent.
105    pub fn index_document(
106        &self,
107        tid: u64,
108        collection: &str,
109        doc_id: Surrogate,
110        text: &str,
111    ) -> Result<(), FtsIndexError<B::Error>> {
112        let raw = doc_id.as_u32();
113        if raw == 0 || raw > MAX_INDEXABLE_SURROGATE {
114            return Err(FtsIndexError::SurrogateOutOfRange { surrogate: doc_id });
115        }
116
117        let tokens = self
118            .analyze_for_collection(tid, collection, text)
119            .map_err(FtsIndexError::backend)?;
120        if tokens.is_empty() {
121            return Ok(());
122        }
123
124        let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
125        for (pos, token) in tokens.iter().enumerate() {
126            let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
127            entry.0 += 1;
128            entry.1.push(pos as u32);
129        }
130
131        let doc_len = tokens.len() as u32;
132
133        for (term, (freq, positions)) in &term_data {
134            let compact = CompactPosting {
135                doc_id,
136                term_freq: *freq,
137                fieldnorm: smallfloat::encode(doc_len),
138                positions: positions.clone(),
139            };
140            let scoped_term = memtable_key(tid, collection, term);
141            self.memtable.insert(&scoped_term, compact);
142        }
143        self.memtable.record_doc(doc_id, doc_len);
144
145        // Write document length, fieldnorm, and update incremental stats.
146        self.backend
147            .write_doc_length(tid, collection, doc_id, doc_len)
148            .map_err(FtsIndexError::backend)?;
149        self.write_fieldnorm(tid, collection, doc_id, doc_len)
150            .map_err(FtsIndexError::backend)?;
151        self.backend
152            .increment_stats(tid, collection, doc_len)
153            .map_err(FtsIndexError::backend)?;
154
155        if self.memtable.should_flush() {
156            self.flush_memtable(tid, collection)?;
157        }
158
159        debug!(tid, %collection, doc_id = doc_id.0, tokens = tokens.len(), terms = term_data.len(), "indexed document");
160        Ok(())
161    }
162
163    /// Flush the active memtable to an immutable segment.
164    fn flush_memtable(&self, tid: u64, collection: &str) -> Result<(), FtsIndexError<B::Error>> {
165        let drained = self.memtable.drain();
166        if drained.is_empty() {
167            return Ok(());
168        }
169
170        let segment_bytes = seg_writer::flush_to_segment(drained)?;
171        let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
172        let id = compaction::segment_id(seg_id, 0);
173        self.backend
174            .write_segment(tid, collection, &id, &segment_bytes)
175            .map_err(FtsIndexError::backend)?;
176
177        debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
178        Ok(())
179    }
180
181    /// Remove a document from the index.
182    pub fn remove_document(
183        &self,
184        tid: u64,
185        collection: &str,
186        doc_id: Surrogate,
187    ) -> Result<(), B::Error> {
188        let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
189
190        self.memtable.remove_doc(doc_id);
191        self.backend.remove_doc_length(tid, collection, doc_id)?;
192
193        if let Some(len) = doc_len {
194            self.backend.decrement_stats(tid, collection, len)?;
195        }
196
197        Ok(())
198    }
199
200    /// Purge all entries for a collection. Returns count of removed entries.
201    pub fn purge_collection(&self, tid: u64, collection: &str) -> Result<usize, B::Error> {
202        self.memtable
203            .drain_collection(&memtable_collection_prefix(tid, collection));
204        self.backend.purge_collection(tid, collection)
205    }
206
207    /// Purge all entries for a tenant across every collection.
208    pub fn purge_tenant(&self, tid: u64) -> Result<usize, B::Error> {
209        self.memtable.drain_collection(&memtable_tenant_prefix(tid));
210        self.backend.purge_tenant(tid)
211    }
212}
213
/// Builds a memtable key of the form `"{tid}:{collection}:{term}"`.
///
/// Every tenant and collection writes into the same in-memory map, so each
/// key must carry its full tenant + collection scope ahead of the term.
pub(crate) fn memtable_key(tid: u64, collection: &str, term: &str) -> String {
    let tenant = tid.to_string();
    [tenant.as_str(), collection, term].join(":")
}
220
/// Returns `"{tid}:{collection}:"`. `drain_collection` uses this prefix to
/// strip every memtable entry belonging to one `(tid, collection)` pair.
///
/// NOTE(review): `:` is not escaped. If collection names may contain `:`,
/// the prefix for collection `a` also matches keys of collection `a:b`.
/// Confirm that collection names are validated upstream.
pub(crate) fn memtable_collection_prefix(tid: u64, collection: &str) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}
226
/// Returns `"{tid}:"`, the prefix shared by every memtable key of one tenant.
pub(crate) fn memtable_tenant_prefix(tid: u64) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix
}
231
#[cfg(test)]
mod tests {
    //! Unit tests for `FtsIndex` write paths over the in-memory backend.

    use nodedb_types::Surrogate;

    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Tenant ID shared by every test in this module.
    const T: u64 = 1;

    /// Builds an index over a fresh `MemoryBackend` with default settings.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// A memtable entry whose key exceeds the segment format's `MAX_TERM_LEN`
    /// must surface from `flush_memtable` as a typed `TermTooLong` error.
    #[test]
    fn flush_propagates_term_too_long_as_typed_error() {
        let backend = MemoryBackend::new();
        // Built by hand so the memtable thresholds can be set to 1.
        let idx = FtsIndex {
            backend,
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 1,
                max_terms: 1,
            }),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        };

        // Insert a single posting under a term whose byte length exceeds the
        // u16 segment-format cap. Bypasses the analyzer (which would tokenize
        // away most pathological inputs); we want to exercise the flush-path
        // boundary check directly.
        let oversize_term = "x".repeat(crate::lsm::segment::format::MAX_TERM_LEN + 1);
        idx.memtable.insert(
            &super::memtable_key(T, "docs", &oversize_term),
            CompactPosting {
                doc_id: Surrogate(1),
                term_freq: 1,
                fieldnorm: 1,
                positions: vec![0],
            },
        );
        idx.memtable.record_doc(Surrogate(1), 1);

        let err = idx
            .flush_memtable(T, "docs")
            .expect_err("flush must reject oversize term");
        // The reported length is expected to cover the whole scoped memtable
        // key ("{tid}:{collection}:" + term), not just the bare term.
        let key_overhead = super::memtable_key(T, "docs", "").len();
        match err {
            FtsIndexError::TermTooLong { len, max } => {
                assert_eq!(len, oversize_term.len() + key_overhead);
                assert_eq!(max, crate::lsm::segment::format::MAX_TERM_LEN);
            }
            other => panic!("expected TermTooLong, got {other:?}"),
        }
    }

    /// With default thresholds, indexing one small document leaves its
    /// postings in the memtable (no flush yet).
    #[test]
    fn index_writes_to_memtable() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(1), "hello world greeting")
            .unwrap();

        assert!(!idx.memtable.is_empty());
        assert!(idx.memtable.posting_count() > 0);
    }

    /// With `max_postings: 5`, a six-word document (presumably six postings
    /// after analysis) triggers a flush. The memtable ends up empty and a
    /// segment exists for the collection.
    #[test]
    fn memtable_flush_on_threshold() {
        let backend = MemoryBackend::new();
        let idx = FtsIndex {
            backend,
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 5,
                max_terms: 100,
            }),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        };

        idx.index_document(
            T,
            "docs",
            Surrogate(1),
            "alpha bravo charlie delta echo foxtrot",
        )
        .unwrap();

        assert!(idx.memtable.is_empty());
        let segments = idx.backend.list_segments(T, "docs").unwrap();
        assert!(!segments.is_empty(), "segment should have been written");
    }

    /// Two documents with distinct, valid surrogates both count toward the
    /// collection's document stats.
    #[test]
    fn index_surrogate_stored() {
        let idx = make_index();
        // Surrogates must be in 1..=MAX_INDEXABLE_SURROGATE. Surrogate::ZERO is
        // the unassigned sentinel and is now rejected at index time.
        idx.index_document(T, "docs", Surrogate(10), "hello world greeting")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust language")
            .unwrap();

        let (count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 2);
    }

    /// Removing one of two indexed documents drops the document count to 1.
    #[test]
    fn remove_decrements_stats() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(10), "hello world")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust")
            .unwrap();

        idx.remove_document(T, "docs", Surrogate(10)).unwrap();

        let (count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 1);
    }

    /// Indexing updates both the document count and the second stats value
    /// (presumably total token length — TODO confirm against the backend).
    #[test]
    fn index_updates_stats() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(10), "hello world greeting")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust language")
            .unwrap();

        let (count, total) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 2);
        assert!(total > 0);
    }

    /// Purging one collection clears its stats and memtable entries while
    /// leaving a sibling collection intact. Both collections deliberately use
    /// the same surrogate (1).
    #[test]
    fn purge_collection_preserves_others() {
        let idx = make_index();
        idx.index_document(T, "col_a", Surrogate(1), "alpha bravo")
            .unwrap();
        idx.index_document(T, "col_b", Surrogate(1), "delta echo")
            .unwrap();

        idx.purge_collection(T, "col_a").unwrap();
        assert_eq!(idx.backend.collection_stats(T, "col_a").unwrap(), (0, 0));
        assert!(idx.backend.collection_stats(T, "col_b").unwrap().0 > 0);

        assert!(
            !idx.memtable
                .get_postings(&memtable_key(T, "col_b", "delta"))
                .is_empty()
        );
        assert!(
            idx.memtable
                .get_postings(&memtable_key(T, "col_a", "alpha"))
                .is_empty()
        );
    }

    /// Text that the analyzer reduces to zero tokens (presumably all stop
    /// words) is a no-op: the stats stay at (0, 0) and the memtable stays empty.
    #[test]
    fn empty_text_is_noop() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(1), "the a is")
            .unwrap();
        assert_eq!(idx.backend.collection_stats(T, "docs").unwrap(), (0, 0));
        assert!(idx.memtable.is_empty());
    }

    // ── Surrogate boundary tests ──────────────────────────────────────────────

    /// Spec: Surrogate::ZERO (the unassigned sentinel) must be rejected at index
    /// time with FtsIndexError::SurrogateOutOfRange, not written into the index.
    #[test]
    fn index_document_rejects_zero_surrogate() {
        let idx = make_index();
        let err = idx
            .index_document(T, "docs", Surrogate(0), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { surrogate } if surrogate == Surrogate(0)),
            "expected SurrogateOutOfRange(sur:0), got {err}"
        );
    }

    /// Spec: Surrogate(u32::MAX) must be rejected — it is reserved as a sentinel
    /// and would also cause a 4 GiB fieldnorm array resize.
    #[test]
    fn index_document_rejects_u32_max_surrogate() {
        let idx = make_index();
        let err = idx
            .index_document(T, "docs", Surrogate(u32::MAX), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { .. }),
            "expected SurrogateOutOfRange, got {err}"
        );
    }

    /// Spec: MAX_INDEXABLE_SURROGATE (u32::MAX - 1) is the last valid surrogate.
    /// Indexing with it must succeed.
    ///
    /// NOTE(review): despite its name, this test does not index
    /// `MAX_INDEXABLE_SURROGATE` itself (see the body comment for why).
    #[test]
    fn index_document_accepts_max_indexable_surrogate() {
        // Indexing with MAX_INDEXABLE_SURROGATE would make the MemoryBackend
        // fieldnorm array grow to roughly u32::MAX - 1 bytes (~4 GiB), which is
        // too expensive for a unit test. The boundary is therefore covered
        // indirectly instead:
        //   * a small valid surrogate passes the guard (below);
        //   * Surrogate(u32::MAX) is rejected
        //     (index_document_rejects_u32_max_surrogate);
        //   * the constant itself is pinned to u32::MAX - 1 (below), so the
        //     guard's `raw > MAX_INDEXABLE_SURROGATE` check admits exactly
        //     1..=u32::MAX - 1.
        let idx = make_index();
        // Verify a normal valid surrogate works (guards pass).
        idx.index_document(T, "docs", Surrogate(1), "boundary check")
            .unwrap();
        // Confirm the constant is correct.
        assert_eq!(
            crate::index::error::MAX_INDEXABLE_SURROGATE,
            u32::MAX - 1,
            "MAX_INDEXABLE_SURROGATE must be u32::MAX - 1"
        );
    }

    /// Spec: the SurrogateOutOfRange error message must be informative.
    #[test]
    fn surrogate_out_of_range_error_is_informative() {
        let err: FtsIndexError<crate::backend::memory::MemoryError> =
            FtsIndexError::SurrogateOutOfRange {
                surrogate: Surrogate(0),
            };
        let msg = err.to_string();
        assert!(
            msg.contains("out of the indexable range"),
            "error message must mention range: {msg}"
        );
        assert!(
            msg.contains("unassigned sentinel"),
            "error message must explain zero sentinel: {msg}"
        );
    }
}