Skip to main content

nodedb_fts/index/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Core FtsIndex: indexing and document management over any backend.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use nodedb_types::Surrogate;
10use tracing::debug;
11
12use crate::backend::FtsBackend;
13
14use crate::block::CompactPosting;
15use crate::codec::smallfloat;
16use crate::index::error::{FtsIndexError, MAX_INDEXABLE_SURROGATE};
17use crate::lsm::compaction;
18use crate::lsm::memtable::{Memtable, MemtableConfig};
19use crate::lsm::segment::writer as seg_writer;
20use crate::posting::Bm25Params;
21use nodedb_mem::MemoryGovernor;
22
23/// Full-text search index generic over storage backend.
24///
25/// Provides identical indexing, search, and highlighting logic
26/// for Origin (redb), Lite (in-memory), and WASM deployments.
27///
28/// Writes accumulate in an in-memory `Memtable`. When the memtable
29/// exceeds its threshold, it is flushed to an immutable segment
30/// stored via the backend. Queries merge the active memtable with
31/// all persisted segments.
32///
33/// An optional [`MemoryGovernor`] can be injected via [`FtsIndex::set_governor`]
34/// to enforce per-engine memory budgets on large allocations (compaction,
35/// segment merge, query term collection). When no governor is set, allocations
36/// proceed without budget enforcement — which is the correct behaviour for
37/// NodeDB-Lite and WASM deployments where `nodedb-mem` is not available.
38pub struct FtsIndex<B: FtsBackend> {
39    pub(crate) backend: B,
40    pub(crate) bm25_params: Bm25Params,
41    pub(crate) memtable: Memtable,
42    /// Monotonic segment ID counter.
43    next_segment_id: AtomicU64,
44    /// Optional memory governor for budget enforcement (Origin only).
45    pub(crate) governor: Option<Arc<MemoryGovernor>>,
46}
47
48impl<B: FtsBackend> FtsIndex<B> {
49    /// Create a new FTS index with the given backend and default BM25 params.
50    pub fn new(backend: B) -> Self {
51        Self {
52            backend,
53            bm25_params: Bm25Params::default(),
54            memtable: Memtable::new(MemtableConfig::default()),
55            next_segment_id: AtomicU64::new(1),
56            governor: None,
57        }
58    }
59
60    /// Create a new FTS index with custom BM25 parameters.
61    pub fn with_params(backend: B, params: Bm25Params) -> Self {
62        Self {
63            backend,
64            bm25_params: params,
65            memtable: Memtable::new(MemtableConfig::default()),
66            next_segment_id: AtomicU64::new(1),
67            governor: None,
68        }
69    }
70
71    /// Inject a [`MemoryGovernor`] to enforce per-engine memory budgets on
72    /// large allocations (compaction, merge, query). When not set, all
73    /// allocations proceed without budget enforcement.
74    ///
75    /// This is the correct pattern for Origin deployments. NodeDB-Lite and
76    /// WASM builds should leave the governor unset (no `nodedb-mem` dependency).
77    pub fn set_governor(&mut self, governor: Arc<MemoryGovernor>) {
78        self.governor = Some(governor);
79    }
80
81    /// Access the underlying backend.
82    pub fn backend(&self) -> &B {
83        &self.backend
84    }
85
86    /// Mutable access to the underlying backend.
87    pub fn backend_mut(&mut self) -> &mut B {
88        &mut self.backend
89    }
90
91    /// Access the active memtable (for LSM query merging).
92    pub fn memtable(&self) -> &Memtable {
93        &self.memtable
94    }
95
96    /// Index a document's text content.
97    ///
98    /// Returns `Err(FtsIndexError::SurrogateOutOfRange)` if `doc_id` is
99    /// `Surrogate::ZERO` (the unassigned sentinel) or exceeds
100    /// `MAX_INDEXABLE_SURROGATE`. The FTS memtable uses the surrogate's raw
101    /// `u32` value as a direct array index into per-doc fieldnorm storage;
102    /// values near `u32::MAX` would cause multi-GiB allocations. Rejecting
103    /// out-of-range surrogates at this boundary is the correct fix — not a
104    /// `debug_assert!`, which would be a silent-wrap equivalent.
105    pub fn index_document(
106        &self,
107        tid: u64,
108        collection: &str,
109        doc_id: Surrogate,
110        text: &str,
111    ) -> Result<(), FtsIndexError<B::Error>> {
112        let raw = doc_id.as_u32();
113        if raw == 0 || raw > MAX_INDEXABLE_SURROGATE {
114            return Err(FtsIndexError::SurrogateOutOfRange { surrogate: doc_id });
115        }
116
117        let tokens = self
118            .analyze_for_collection(tid, collection, text)
119            .map_err(FtsIndexError::backend)?;
120        if tokens.is_empty() {
121            return Ok(());
122        }
123
124        let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
125        for (pos, token) in tokens.iter().enumerate() {
126            let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
127            entry.0 += 1;
128            entry.1.push(pos as u32);
129        }
130
131        let doc_len = tokens.len() as u32;
132
133        for (term, (freq, positions)) in &term_data {
134            let compact = CompactPosting {
135                doc_id,
136                term_freq: *freq,
137                fieldnorm: smallfloat::encode(doc_len),
138                positions: positions.clone(),
139            };
140            let scoped_term = memtable_key(tid, collection, term);
141            self.memtable.insert(&scoped_term, compact);
142        }
143        self.memtable.record_doc(doc_id, doc_len);
144
145        // Write document length, fieldnorm, and update incremental stats.
146        self.backend
147            .write_doc_length(tid, collection, doc_id, doc_len)
148            .map_err(FtsIndexError::backend)?;
149        self.write_fieldnorm(tid, collection, doc_id, doc_len)
150            .map_err(FtsIndexError::backend)?;
151        self.backend
152            .increment_stats(tid, collection, doc_len)
153            .map_err(FtsIndexError::backend)?;
154
155        if self.memtable.should_flush() {
156            self.flush_memtable(tid, collection)?;
157        }
158
159        debug!(tid, %collection, doc_id = doc_id.0, tokens = tokens.len(), terms = term_data.len(), "indexed document");
160        Ok(())
161    }
162
163    /// Flush the active memtable to an immutable segment in the backend.
164    ///
165    /// Calling this before serializing the index guarantees that all posting
166    /// data written since the last spill threshold is captured in the backend's
167    /// segment storage rather than the in-memory memtable.  Callers that
168    /// checkpoint the index (e.g., NodeDB-Lite flush) must call this once per
169    /// active index before persisting.
170    pub fn flush_memtable(
171        &self,
172        tid: u64,
173        collection: &str,
174    ) -> Result<(), FtsIndexError<B::Error>> {
175        let drained = self.memtable.drain();
176        if drained.is_empty() {
177            return Ok(());
178        }
179
180        let segment_bytes = seg_writer::flush_to_segment(drained)?;
181        let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
182        let id = compaction::segment_id(seg_id, 0);
183        self.backend
184            .write_segment(tid, collection, &id, &segment_bytes)
185            .map_err(FtsIndexError::backend)?;
186
187        debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
188        Ok(())
189    }
190
191    /// Remove a document from the index.
192    pub fn remove_document(
193        &self,
194        tid: u64,
195        collection: &str,
196        doc_id: Surrogate,
197    ) -> Result<(), B::Error> {
198        let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
199
200        self.memtable.remove_doc(doc_id);
201        self.backend.remove_doc_length(tid, collection, doc_id)?;
202
203        if let Some(len) = doc_len {
204            self.backend.decrement_stats(tid, collection, len)?;
205        }
206
207        Ok(())
208    }
209
210    /// Purge all entries for a collection. Returns count of removed entries.
211    pub fn purge_collection(&self, tid: u64, collection: &str) -> Result<usize, B::Error> {
212        self.memtable
213            .drain_collection(&memtable_collection_prefix(tid, collection));
214        self.backend.purge_collection(tid, collection)
215    }
216
217    /// Purge all entries for a tenant across every collection.
218    pub fn purge_tenant(&self, tid: u64) -> Result<usize, B::Error> {
219        self.memtable.drain_collection(&memtable_tenant_prefix(tid));
220        self.backend.purge_tenant(tid)
221    }
222}
223
/// Build the memtable key `"{tid}:{collection}:{term}"`.
///
/// A single in-memory memtable is shared by every tenant and collection, so
/// each key carries the full tenant + collection scope in front of the term.
///
/// NOTE(review): the parts are joined with `:` without escaping. For example,
/// `(1, "a:b", "c")` and `(1, "a", "b:c")` both yield `"1:a:b:c"`. Confirm that
/// collection names cannot contain `:`.
pub(crate) fn memtable_key(tid: u64, collection: &str, term: &str) -> String {
    let tenant = tid.to_string();
    [tenant.as_str(), collection, term].join(":")
}
230
/// Build the `"{tid}:{collection}:"` prefix shared by every memtable key of
/// one `(tid, collection)` pair; passed to `drain_collection` to drop them.
pub(crate) fn memtable_collection_prefix(tid: u64, collection: &str) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}
236
/// Build the `"{tid}:"` prefix matching every memtable key of one tenant,
/// across all of its collections.
pub(crate) fn memtable_tenant_prefix(tid: u64) -> String {
    tid.to_string() + ":"
}
241
#[cfg(test)]
mod tests {
    use nodedb_types::Surrogate;

    use crate::backend::memory::MemoryBackend;

    use super::*;

    const T: u64 = 1;

    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    #[test]
    fn flush_propagates_term_too_long_as_typed_error() {
        let backend = MemoryBackend::new();
        let idx = FtsIndex {
            backend,
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 1,
                max_terms: 1,
            }),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        };

        // Insert a single posting under a term whose byte length exceeds the
        // u16 segment-format cap. Bypasses the analyzer (which would tokenize
        // away most pathological inputs); we want to exercise the flush-path
        // boundary check directly.
        let oversize_term = "x".repeat(crate::lsm::segment::format::MAX_TERM_LEN + 1);
        idx.memtable.insert(
            &super::memtable_key(T, "docs", &oversize_term),
            CompactPosting {
                doc_id: Surrogate(1),
                term_freq: 1,
                fieldnorm: 1,
                positions: vec![0],
            },
        );
        idx.memtable.record_doc(Surrogate(1), 1);

        let err = idx
            .flush_memtable(T, "docs")
            .expect_err("flush must reject oversize term");
        let key_overhead = super::memtable_key(T, "docs", "").len();
        match err {
            FtsIndexError::TermTooLong { len, max } => {
                assert_eq!(len, oversize_term.len() + key_overhead);
                assert_eq!(max, crate::lsm::segment::format::MAX_TERM_LEN);
            }
            other => panic!("expected TermTooLong, got {other:?}"),
        }
    }

    #[test]
    fn index_writes_to_memtable() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(1), "hello world greeting")
            .unwrap();

        assert!(!idx.memtable.is_empty());
        assert!(idx.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        let backend = MemoryBackend::new();
        let idx = FtsIndex {
            backend,
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 5,
                max_terms: 100,
            }),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        };

        idx.index_document(
            T,
            "docs",
            Surrogate(1),
            "alpha bravo charlie delta echo foxtrot",
        )
        .unwrap();

        assert!(idx.memtable.is_empty());
        let segments = idx.backend.list_segments(T, "docs").unwrap();
        assert!(!segments.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_surrogate_stored() {
        let idx = make_index();
        // Surrogates must be in 1..=MAX_INDEXABLE_SURROGATE. Surrogate::ZERO is
        // the unassigned sentinel and is now rejected at index time.
        idx.index_document(T, "docs", Surrogate(10), "hello world greeting")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust language")
            .unwrap();

        let (count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 2);
    }

    #[test]
    fn remove_decrements_stats() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(10), "hello world")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust")
            .unwrap();

        idx.remove_document(T, "docs", Surrogate(10)).unwrap();

        let (count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn index_updates_stats() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(10), "hello world greeting")
            .unwrap();
        idx.index_document(T, "docs", Surrogate(11), "hello rust language")
            .unwrap();

        let (count, total) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(count, 2);
        assert!(total > 0);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let idx = make_index();
        idx.index_document(T, "col_a", Surrogate(1), "alpha bravo")
            .unwrap();
        idx.index_document(T, "col_b", Surrogate(1), "delta echo")
            .unwrap();

        idx.purge_collection(T, "col_a").unwrap();
        assert_eq!(idx.backend.collection_stats(T, "col_a").unwrap(), (0, 0));
        assert!(idx.backend.collection_stats(T, "col_b").unwrap().0 > 0);

        assert!(
            !idx.memtable
                .get_postings(&memtable_key(T, "col_b", "delta"))
                .is_empty()
        );
        assert!(
            idx.memtable
                .get_postings(&memtable_key(T, "col_a", "alpha"))
                .is_empty()
        );
    }

    #[test]
    fn empty_text_is_noop() {
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(1), "the a is")
            .unwrap();
        assert_eq!(idx.backend.collection_stats(T, "docs").unwrap(), (0, 0));
        assert!(idx.memtable.is_empty());
    }

    // ── Surrogate boundary tests ──────────────────────────────────────────────

    /// Spec: Surrogate::ZERO (the unassigned sentinel) must be rejected at index
    /// time with FtsIndexError::SurrogateOutOfRange, not written into the index.
    #[test]
    fn index_document_rejects_zero_surrogate() {
        let idx = make_index();
        let err = idx
            .index_document(T, "docs", Surrogate(0), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { surrogate } if surrogate == Surrogate(0)),
            "expected SurrogateOutOfRange(sur:0), got {err}"
        );
    }

    /// Spec: Surrogate(u32::MAX) must be rejected — it is reserved as a sentinel
    /// and would also cause a 4 GiB fieldnorm array resize.
    #[test]
    fn index_document_rejects_u32_max_surrogate() {
        let idx = make_index();
        let err = idx
            .index_document(T, "docs", Surrogate(u32::MAX), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { .. }),
            "expected SurrogateOutOfRange, got {err}"
        );
    }

    /// Spec: MAX_INDEXABLE_SURROGATE (u32::MAX - 1) is the last valid surrogate.
    /// It must pass the range guard, and the next value up must be rejected.
    #[test]
    fn index_document_accepts_max_indexable_surrogate() {
        let max = crate::index::error::MAX_INDEXABLE_SURROGATE;
        // Confirm the constant first; this also keeps `max + 1` below from overflowing.
        assert_eq!(max, u32::MAX - 1, "MAX_INDEXABLE_SURROGATE must be u32::MAX - 1");

        // Stop-word-only text analyzes to zero tokens (see `empty_text_is_noop`).
        // Once the surrogate guard passes, `index_document` therefore returns
        // before any per-doc fieldnorm storage is touched. That lets us hit the
        // exact boundary without a ~4 GiB allocation.
        let stopwords_only = "the a is";
        let idx = make_index();
        idx.index_document(T, "docs", Surrogate(max), stopwords_only)
            .expect("MAX_INDEXABLE_SURROGATE must pass the range guard");

        let err = idx
            .index_document(T, "docs", Surrogate(max + 1), stopwords_only)
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { .. }),
            "expected SurrogateOutOfRange just above the boundary, got {err}"
        );

        // A normal, small surrogate with real tokens still indexes successfully.
        idx.index_document(T, "docs", Surrogate(1), "boundary check")
            .unwrap();
    }

    /// Spec: the SurrogateOutOfRange error message must be informative.
    #[test]
    fn surrogate_out_of_range_error_is_informative() {
        let err: FtsIndexError<crate::backend::memory::MemoryError> =
            FtsIndexError::SurrogateOutOfRange {
                surrogate: Surrogate(0),
            };
        let msg = err.to_string();
        assert!(
            msg.contains("out of the indexable range"),
            "error message must mention range: {msg}"
        );
        assert!(
            msg.contains("unassigned sentinel"),
            "error message must explain zero sentinel: {msg}"
        );
    }
}