1use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use nodedb_types::Surrogate;
10use tracing::debug;
11
12use crate::backend::FtsBackend;
13
14use crate::block::CompactPosting;
15use crate::codec::smallfloat;
16use crate::index::error::{FtsIndexError, MAX_INDEXABLE_SURROGATE};
17use crate::lsm::compaction;
18use crate::lsm::memtable::{Memtable, MemtableConfig};
19use crate::lsm::segment::writer as seg_writer;
20use crate::posting::Bm25Params;
21use nodedb_mem::MemoryGovernor;
22
23pub struct FtsIndex<B: FtsBackend> {
39 pub(crate) backend: B,
40 pub(crate) bm25_params: Bm25Params,
41 pub(crate) memtable: Memtable,
42 next_segment_id: AtomicU64,
44 pub(crate) governor: Option<Arc<MemoryGovernor>>,
46}
47
48impl<B: FtsBackend> FtsIndex<B> {
49 pub fn new(backend: B) -> Self {
51 Self {
52 backend,
53 bm25_params: Bm25Params::default(),
54 memtable: Memtable::new(MemtableConfig::default()),
55 next_segment_id: AtomicU64::new(1),
56 governor: None,
57 }
58 }
59
60 pub fn with_params(backend: B, params: Bm25Params) -> Self {
62 Self {
63 backend,
64 bm25_params: params,
65 memtable: Memtable::new(MemtableConfig::default()),
66 next_segment_id: AtomicU64::new(1),
67 governor: None,
68 }
69 }
70
71 pub fn set_governor(&mut self, governor: Arc<MemoryGovernor>) {
78 self.governor = Some(governor);
79 }
80
81 pub fn backend(&self) -> &B {
83 &self.backend
84 }
85
86 pub fn backend_mut(&mut self) -> &mut B {
88 &mut self.backend
89 }
90
91 pub fn memtable(&self) -> &Memtable {
93 &self.memtable
94 }
95
96 pub fn index_document(
106 &self,
107 tid: u64,
108 collection: &str,
109 doc_id: Surrogate,
110 text: &str,
111 ) -> Result<(), FtsIndexError<B::Error>> {
112 let raw = doc_id.as_u32();
113 if raw == 0 || raw > MAX_INDEXABLE_SURROGATE {
114 return Err(FtsIndexError::SurrogateOutOfRange { surrogate: doc_id });
115 }
116
117 let tokens = self
118 .analyze_for_collection(tid, collection, text)
119 .map_err(FtsIndexError::backend)?;
120 if tokens.is_empty() {
121 return Ok(());
122 }
123
124 let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
125 for (pos, token) in tokens.iter().enumerate() {
126 let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
127 entry.0 += 1;
128 entry.1.push(pos as u32);
129 }
130
131 let doc_len = tokens.len() as u32;
132
133 for (term, (freq, positions)) in &term_data {
134 let compact = CompactPosting {
135 doc_id,
136 term_freq: *freq,
137 fieldnorm: smallfloat::encode(doc_len),
138 positions: positions.clone(),
139 };
140 let scoped_term = memtable_key(tid, collection, term);
141 self.memtable.insert(&scoped_term, compact);
142 }
143 self.memtable.record_doc(doc_id, doc_len);
144
145 self.backend
147 .write_doc_length(tid, collection, doc_id, doc_len)
148 .map_err(FtsIndexError::backend)?;
149 self.write_fieldnorm(tid, collection, doc_id, doc_len)
150 .map_err(FtsIndexError::backend)?;
151 self.backend
152 .increment_stats(tid, collection, doc_len)
153 .map_err(FtsIndexError::backend)?;
154
155 if self.memtable.should_flush() {
156 self.flush_memtable(tid, collection)?;
157 }
158
159 debug!(tid, %collection, doc_id = doc_id.0, tokens = tokens.len(), terms = term_data.len(), "indexed document");
160 Ok(())
161 }
162
163 pub fn flush_memtable(
171 &self,
172 tid: u64,
173 collection: &str,
174 ) -> Result<(), FtsIndexError<B::Error>> {
175 let drained = self.memtable.drain();
176 if drained.is_empty() {
177 return Ok(());
178 }
179
180 let segment_bytes = seg_writer::flush_to_segment(drained)?;
181 let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
182 let id = compaction::segment_id(seg_id, 0);
183 self.backend
184 .write_segment(tid, collection, &id, &segment_bytes)
185 .map_err(FtsIndexError::backend)?;
186
187 debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
188 Ok(())
189 }
190
191 pub fn remove_document(
193 &self,
194 tid: u64,
195 collection: &str,
196 doc_id: Surrogate,
197 ) -> Result<(), B::Error> {
198 let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
199
200 self.memtable.remove_doc(doc_id);
201 self.backend.remove_doc_length(tid, collection, doc_id)?;
202
203 if let Some(len) = doc_len {
204 self.backend.decrement_stats(tid, collection, len)?;
205 }
206
207 Ok(())
208 }
209
210 pub fn purge_collection(&self, tid: u64, collection: &str) -> Result<usize, B::Error> {
212 self.memtable
213 .drain_collection(&memtable_collection_prefix(tid, collection));
214 self.backend.purge_collection(tid, collection)
215 }
216
217 pub fn purge_tenant(&self, tid: u64) -> Result<usize, B::Error> {
219 self.memtable.drain_collection(&memtable_tenant_prefix(tid));
220 self.backend.purge_tenant(tid)
221 }
222}
223
/// Builds the memtable key for `term` within `(tid, collection)`.
///
/// The key has the form `"{tid}:{collection}:{term}"`. The parts are joined
/// verbatim with no escaping, so a `':'` inside `collection` or `term` ends
/// up in the key unchanged.
pub(crate) fn memtable_key(tid: u64, collection: &str, term: &str) -> String {
    let mut key = tid.to_string();
    key.reserve(collection.len() + term.len() + 2);
    key.push(':');
    key.push_str(collection);
    key.push(':');
    key.push_str(term);
    key
}
230
/// Builds the key prefix `"{tid}:{collection}:"` shared by every memtable key
/// of one collection.
///
/// `collection` is inserted verbatim with no escaping.
pub(crate) fn memtable_collection_prefix(tid: u64, collection: &str) -> String {
    let mut prefix = tid.to_string();
    prefix.reserve(collection.len() + 2);
    prefix.push(':');
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}
236
/// Builds the key prefix `"{tid}:"` shared by every memtable key of one
/// tenant.
///
/// The trailing `':'` keeps tenant `1` from prefix-matching keys of tenant
/// `11`.
pub(crate) fn memtable_tenant_prefix(tid: u64) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix
}
241
#[cfg(test)]
mod tests {
    use nodedb_types::Surrogate;

    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Tenant id used by every test.
    const T: u64 = 1;

    /// Index over a fresh in-memory backend with default settings.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// Index over a fresh in-memory backend whose memtable uses `config`.
    fn make_index_with(config: MemtableConfig) -> FtsIndex<MemoryBackend> {
        FtsIndex {
            backend: MemoryBackend::new(),
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(config),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        }
    }

    #[test]
    fn flush_propagates_term_too_long_as_typed_error() {
        let max_term_len = crate::lsm::segment::format::MAX_TERM_LEN;
        let idx = make_index_with(MemtableConfig {
            max_postings: 1,
            max_terms: 1,
        });

        // Bypass the analyzer and plant a term one byte over the limit.
        let oversize_term = "x".repeat(max_term_len + 1);
        let posting = CompactPosting {
            doc_id: Surrogate(1),
            term_freq: 1,
            fieldnorm: 1,
            positions: vec![0],
        };
        idx.memtable
            .insert(&super::memtable_key(T, "docs", &oversize_term), posting);
        idx.memtable.record_doc(Surrogate(1), 1);

        let err = idx
            .flush_memtable(T, "docs")
            .expect_err("flush must reject oversize term");

        // The reported length covers the whole scoped key, not just the term.
        let expected_len = oversize_term.len() + super::memtable_key(T, "docs", "").len();
        if let FtsIndexError::TermTooLong { len, max } = err {
            assert_eq!(len, expected_len);
            assert_eq!(max, max_term_len);
        } else {
            panic!("expected TermTooLong, got {err:?}");
        }
    }

    #[test]
    fn index_writes_to_memtable() {
        let idx = make_index();
        let outcome = idx.index_document(T, "docs", Surrogate(1), "hello world greeting");
        outcome.unwrap();

        assert!(!idx.memtable.is_empty());
        assert!(idx.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        let idx = make_index_with(MemtableConfig {
            max_postings: 5,
            max_terms: 100,
        });

        // Six distinct terms against a five-posting limit.
        let text = "alpha bravo charlie delta echo foxtrot";
        idx.index_document(T, "docs", Surrogate(1), text).unwrap();

        assert!(idx.memtable.is_empty());
        let segments = idx.backend.list_segments(T, "docs").unwrap();
        assert!(!segments.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_surrogate_stored() {
        let idx = make_index();
        let docs = [
            (Surrogate(10), "hello world greeting"),
            (Surrogate(11), "hello rust language"),
        ];
        for (doc_id, text) in docs {
            idx.index_document(T, "docs", doc_id, text).unwrap();
        }

        let stats = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(stats.0, 2);
    }

    #[test]
    fn remove_decrements_stats() {
        let idx = make_index();
        let docs = [(Surrogate(10), "hello world"), (Surrogate(11), "hello rust")];
        for (doc_id, text) in docs {
            idx.index_document(T, "docs", doc_id, text).unwrap();
        }

        idx.remove_document(T, "docs", Surrogate(10)).unwrap();

        let stats = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(stats.0, 1);
    }

    #[test]
    fn index_updates_stats() {
        let idx = make_index();
        let docs = [
            (Surrogate(10), "hello world greeting"),
            (Surrogate(11), "hello rust language"),
        ];
        for (doc_id, text) in docs {
            idx.index_document(T, "docs", doc_id, text).unwrap();
        }

        let (doc_count, total_len) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(doc_count, 2);
        assert!(total_len > 0);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let idx = make_index();
        idx.index_document(T, "col_a", Surrogate(1), "alpha bravo")
            .unwrap();
        idx.index_document(T, "col_b", Surrogate(1), "delta echo")
            .unwrap();

        idx.purge_collection(T, "col_a").unwrap();

        let purged_stats = idx.backend.collection_stats(T, "col_a").unwrap();
        assert_eq!(purged_stats, (0, 0));
        let kept_stats = idx.backend.collection_stats(T, "col_b").unwrap();
        assert!(kept_stats.0 > 0);

        // The surviving collection still has its postings buffered...
        assert!(
            !idx.memtable
                .get_postings(&memtable_key(T, "col_b", "delta"))
                .is_empty()
        );
        // ...while the purged collection has none left.
        assert!(
            idx.memtable
                .get_postings(&memtable_key(T, "col_a", "alpha"))
                .is_empty()
        );
    }

    #[test]
    fn empty_text_is_noop() {
        let idx = make_index();
        // Expected to analyze to zero tokens (all stop words).
        idx.index_document(T, "docs", Surrogate(1), "the a is")
            .unwrap();

        let stats = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(stats, (0, 0));
        assert!(idx.memtable.is_empty());
    }

    #[test]
    fn index_document_rejects_zero_surrogate() {
        let idx = make_index();
        let outcome = idx.index_document(T, "docs", Surrogate(0), "hello world");
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { surrogate } if surrogate == Surrogate(0)),
            "expected SurrogateOutOfRange(sur:0), got {err}"
        );
    }

    #[test]
    fn index_document_rejects_u32_max_surrogate() {
        let idx = make_index();
        let outcome = idx.index_document(T, "docs", Surrogate(u32::MAX), "hello world");
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { .. }),
            "expected SurrogateOutOfRange, got {err}"
        );
    }

    #[test]
    fn index_document_accepts_max_indexable_surrogate() {
        let idx = make_index();
        // NOTE(review): despite the test name, this indexes `Surrogate(1)` and
        // only pins the constant's value below; it does not index
        // `MAX_INDEXABLE_SURROGATE` itself — confirm that is deliberate.
        idx.index_document(T, "docs", Surrogate(1), "boundary check")
            .unwrap();
        assert_eq!(
            crate::index::error::MAX_INDEXABLE_SURROGATE,
            u32::MAX - 1,
            "MAX_INDEXABLE_SURROGATE must be u32::MAX - 1"
        );
    }

    #[test]
    fn surrogate_out_of_range_error_is_informative() {
        type MemError = crate::backend::memory::MemoryError;

        let err: FtsIndexError<MemError> = FtsIndexError::SurrogateOutOfRange {
            surrogate: Surrogate(0),
        };
        let msg = err.to_string();
        assert!(
            msg.contains("out of the indexable range"),
            "error message must mention range: {msg}"
        );
        assert!(
            msg.contains("unassigned sentinel"),
            "error message must explain zero sentinel: {msg}"
        );
    }
}