1use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use nodedb_types::Surrogate;
10use tracing::debug;
11
12use crate::backend::FtsBackend;
13
14use crate::block::CompactPosting;
15use crate::codec::smallfloat;
16use crate::index::error::{FtsIndexError, MAX_INDEXABLE_SURROGATE};
17use crate::lsm::compaction;
18use crate::lsm::memtable::{Memtable, MemtableConfig};
19use crate::lsm::segment::writer as seg_writer;
20use crate::posting::Bm25Params;
21use nodedb_mem::MemoryGovernor;
22
23pub struct FtsIndex<B: FtsBackend> {
39 pub(crate) backend: B,
40 pub(crate) bm25_params: Bm25Params,
41 pub(crate) memtable: Memtable,
42 next_segment_id: AtomicU64,
44 pub(crate) governor: Option<Arc<MemoryGovernor>>,
46}
47
48impl<B: FtsBackend> FtsIndex<B> {
49 pub fn new(backend: B) -> Self {
51 Self {
52 backend,
53 bm25_params: Bm25Params::default(),
54 memtable: Memtable::new(MemtableConfig::default()),
55 next_segment_id: AtomicU64::new(1),
56 governor: None,
57 }
58 }
59
60 pub fn with_params(backend: B, params: Bm25Params) -> Self {
62 Self {
63 backend,
64 bm25_params: params,
65 memtable: Memtable::new(MemtableConfig::default()),
66 next_segment_id: AtomicU64::new(1),
67 governor: None,
68 }
69 }
70
71 pub fn set_governor(&mut self, governor: Arc<MemoryGovernor>) {
78 self.governor = Some(governor);
79 }
80
81 pub fn backend(&self) -> &B {
83 &self.backend
84 }
85
86 pub fn backend_mut(&mut self) -> &mut B {
88 &mut self.backend
89 }
90
91 pub fn memtable(&self) -> &Memtable {
93 &self.memtable
94 }
95
96 pub fn index_document(
106 &self,
107 tid: u64,
108 collection: &str,
109 doc_id: Surrogate,
110 text: &str,
111 ) -> Result<(), FtsIndexError<B::Error>> {
112 let raw = doc_id.as_u32();
113 if raw == 0 || raw > MAX_INDEXABLE_SURROGATE {
114 return Err(FtsIndexError::SurrogateOutOfRange { surrogate: doc_id });
115 }
116
117 let tokens = self
118 .analyze_for_collection(tid, collection, text)
119 .map_err(FtsIndexError::backend)?;
120 if tokens.is_empty() {
121 return Ok(());
122 }
123
124 let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
125 for (pos, token) in tokens.iter().enumerate() {
126 let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
127 entry.0 += 1;
128 entry.1.push(pos as u32);
129 }
130
131 let doc_len = tokens.len() as u32;
132
133 for (term, (freq, positions)) in &term_data {
134 let compact = CompactPosting {
135 doc_id,
136 term_freq: *freq,
137 fieldnorm: smallfloat::encode(doc_len),
138 positions: positions.clone(),
139 };
140 let scoped_term = memtable_key(tid, collection, term);
141 self.memtable.insert(&scoped_term, compact);
142 }
143 self.memtable.record_doc(doc_id, doc_len);
144
145 self.backend
147 .write_doc_length(tid, collection, doc_id, doc_len)
148 .map_err(FtsIndexError::backend)?;
149 self.write_fieldnorm(tid, collection, doc_id, doc_len)
150 .map_err(FtsIndexError::backend)?;
151 self.backend
152 .increment_stats(tid, collection, doc_len)
153 .map_err(FtsIndexError::backend)?;
154
155 if self.memtable.should_flush() {
156 self.flush_memtable(tid, collection)?;
157 }
158
159 debug!(tid, %collection, doc_id = doc_id.0, tokens = tokens.len(), terms = term_data.len(), "indexed document");
160 Ok(())
161 }
162
163 fn flush_memtable(&self, tid: u64, collection: &str) -> Result<(), FtsIndexError<B::Error>> {
165 let drained = self.memtable.drain();
166 if drained.is_empty() {
167 return Ok(());
168 }
169
170 let segment_bytes = seg_writer::flush_to_segment(drained)?;
171 let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
172 let id = compaction::segment_id(seg_id, 0);
173 self.backend
174 .write_segment(tid, collection, &id, &segment_bytes)
175 .map_err(FtsIndexError::backend)?;
176
177 debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
178 Ok(())
179 }
180
181 pub fn remove_document(
183 &self,
184 tid: u64,
185 collection: &str,
186 doc_id: Surrogate,
187 ) -> Result<(), B::Error> {
188 let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
189
190 self.memtable.remove_doc(doc_id);
191 self.backend.remove_doc_length(tid, collection, doc_id)?;
192
193 if let Some(len) = doc_len {
194 self.backend.decrement_stats(tid, collection, len)?;
195 }
196
197 Ok(())
198 }
199
200 pub fn purge_collection(&self, tid: u64, collection: &str) -> Result<usize, B::Error> {
202 self.memtable
203 .drain_collection(&memtable_collection_prefix(tid, collection));
204 self.backend.purge_collection(tid, collection)
205 }
206
207 pub fn purge_tenant(&self, tid: u64) -> Result<usize, B::Error> {
209 self.memtable.drain_collection(&memtable_tenant_prefix(tid));
210 self.backend.purge_tenant(tid)
211 }
212}
213
/// Builds the memtable key for `term` in `collection` of tenant `tid`:
/// `"{tid}:{collection}:{term}"`.
///
/// Built on [`memtable_collection_prefix`], so every key of a collection
/// starts with that collection's prefix.
pub(crate) fn memtable_key(tid: u64, collection: &str, term: &str) -> String {
    let mut key = memtable_collection_prefix(tid, collection);
    key.push_str(term);
    key
}

/// Prefix shared by every memtable key of `collection` in tenant `tid`:
/// `"{tid}:{collection}:"`.
///
/// NOTE(review): ':' is not escaped, so if a collection name may contain ':'
/// (e.g. `"a"` vs `"a:b"`), one collection's prefix can also match another's
/// keys. Confirm collection names are restricted.
pub(crate) fn memtable_collection_prefix(tid: u64, collection: &str) -> String {
    let mut prefix = memtable_tenant_prefix(tid);
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}

/// Prefix shared by every memtable key of tenant `tid`: `"{tid}:"`.
pub(crate) fn memtable_tenant_prefix(tid: u64) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix
}
231
#[cfg(test)]
mod tests {
    use nodedb_types::Surrogate;

    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Tenant id used by every test in this module.
    const T: u64 = 1;

    /// Index over a fresh in-memory backend with the default configuration.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// Index over a fresh in-memory backend whose memtable uses `config`,
    /// letting a test force early flushes.
    fn index_with_memtable(config: MemtableConfig) -> FtsIndex<MemoryBackend> {
        FtsIndex {
            backend: MemoryBackend::new(),
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(config),
            next_segment_id: AtomicU64::new(1),
            governor: None,
        }
    }

    /// Indexes each `(surrogate, text)` pair into `collection`, in order,
    /// panicking on the first error.
    fn index_all(idx: &FtsIndex<MemoryBackend>, collection: &str, docs: &[(u32, &str)]) {
        for &(raw, text) in docs {
            idx.index_document(T, collection, Surrogate(raw), text)
                .unwrap();
        }
    }

    #[test]
    fn flush_propagates_term_too_long_as_typed_error() {
        let idx = index_with_memtable(MemtableConfig {
            max_postings: 1,
            max_terms: 1,
        });

        let max_len = crate::lsm::segment::format::MAX_TERM_LEN;
        let huge_term = "x".repeat(max_len + 1);
        // Insert directly: the posting must reach the segment writer unchanged.
        idx.memtable.insert(
            &super::memtable_key(T, "docs", &huge_term),
            CompactPosting {
                doc_id: Surrogate(1),
                term_freq: 1,
                fieldnorm: 1,
                positions: vec![0],
            },
        );
        idx.memtable.record_doc(Surrogate(1), 1);

        let outcome = idx
            .flush_memtable(T, "docs")
            .expect_err("flush must reject oversize term");
        // The writer measures the full scoped key, not just the bare term.
        let prefix_len = super::memtable_key(T, "docs", "").len();
        match outcome {
            FtsIndexError::TermTooLong { len, max } => {
                assert_eq!(len, huge_term.len() + prefix_len);
                assert_eq!(max, max_len);
            }
            other => panic!("expected TermTooLong, got {other:?}"),
        }
    }

    #[test]
    fn index_writes_to_memtable() {
        let idx = make_index();
        index_all(&idx, "docs", &[(1, "hello world greeting")]);

        assert!(!idx.memtable.is_empty());
        assert!(idx.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        let idx = index_with_memtable(MemtableConfig {
            max_postings: 5,
            max_terms: 100,
        });

        index_all(&idx, "docs", &[(1, "alpha bravo charlie delta echo foxtrot")]);

        assert!(idx.memtable.is_empty());
        let written = idx.backend.list_segments(T, "docs").unwrap();
        assert!(!written.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_surrogate_stored() {
        let idx = make_index();
        index_all(
            &idx,
            "docs",
            &[(10, "hello world greeting"), (11, "hello rust language")],
        );

        let (doc_count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(doc_count, 2);
    }

    #[test]
    fn remove_decrements_stats() {
        let idx = make_index();
        index_all(&idx, "docs", &[(10, "hello world"), (11, "hello rust")]);

        idx.remove_document(T, "docs", Surrogate(10)).unwrap();

        let (doc_count, _) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(doc_count, 1);
    }

    #[test]
    fn index_updates_stats() {
        let idx = make_index();
        index_all(
            &idx,
            "docs",
            &[(10, "hello world greeting"), (11, "hello rust language")],
        );

        let (doc_count, total_len) = idx.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(doc_count, 2);
        assert!(total_len > 0);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let idx = make_index();
        // The same surrogate is reused across the two collections on purpose.
        index_all(&idx, "col_a", &[(1, "alpha bravo")]);
        index_all(&idx, "col_b", &[(1, "delta echo")]);

        idx.purge_collection(T, "col_a").unwrap();
        assert_eq!(idx.backend.collection_stats(T, "col_a").unwrap(), (0, 0));
        assert!(idx.backend.collection_stats(T, "col_b").unwrap().0 > 0);

        assert!(!idx
            .memtable
            .get_postings(&memtable_key(T, "col_b", "delta"))
            .is_empty());
        assert!(idx
            .memtable
            .get_postings(&memtable_key(T, "col_a", "alpha"))
            .is_empty());
    }

    #[test]
    fn empty_text_is_noop() {
        let idx = make_index();
        // Expected to analyze to zero tokens (stop words only).
        index_all(&idx, "docs", &[(1, "the a is")]);

        assert_eq!(idx.backend.collection_stats(T, "docs").unwrap(), (0, 0));
        assert!(idx.memtable.is_empty());
    }

    #[test]
    fn index_document_rejects_zero_surrogate() {
        let err = make_index()
            .index_document(T, "docs", Surrogate(0), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { surrogate } if surrogate == Surrogate(0)),
            "expected SurrogateOutOfRange(sur:0), got {err}"
        );
    }

    #[test]
    fn index_document_rejects_u32_max_surrogate() {
        let err = make_index()
            .index_document(T, "docs", Surrogate(u32::MAX), "hello world")
            .unwrap_err();
        assert!(
            matches!(err, FtsIndexError::SurrogateOutOfRange { .. }),
            "expected SurrogateOutOfRange, got {err}"
        );
    }

    #[test]
    fn index_document_accepts_max_indexable_surrogate() {
        let idx = make_index();
        index_all(&idx, "docs", &[(1, "boundary check")]);
        assert_eq!(
            crate::index::error::MAX_INDEXABLE_SURROGATE,
            u32::MAX - 1,
            "MAX_INDEXABLE_SURROGATE must be u32::MAX - 1"
        );
    }

    #[test]
    fn surrogate_out_of_range_error_is_informative() {
        let rendered = {
            let err: FtsIndexError<crate::backend::memory::MemoryError> =
                FtsIndexError::SurrogateOutOfRange {
                    surrogate: Surrogate(0),
                };
            err.to_string()
        };
        assert!(
            rendered.contains("out of the indexable range"),
            "error message must mention range: {rendered}"
        );
        assert!(
            rendered.contains("unassigned sentinel"),
            "error message must explain zero sentinel: {rendered}"
        );
    }
}