1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use tracing::debug;
7
8use crate::backend::FtsBackend;
9use crate::block::CompactPosting;
10use crate::codec::DocIdMap;
11use crate::codec::smallfloat;
12use crate::lsm::compaction;
13use crate::lsm::memtable::{Memtable, MemtableConfig};
14use crate::lsm::segment::writer as seg_writer;
15use crate::posting::Bm25Params;
16
/// Full-text-search index writer layered over a storage backend `B`.
///
/// Incoming postings are buffered in `memtable`; once the memtable reports
/// that it should flush, its contents are serialized into a segment and
/// handed to the backend.
pub struct FtsIndex<B: FtsBackend> {
    /// Storage backend used for doc-id maps, document lengths, collection
    /// statistics and flushed segments.
    pub(crate) backend: B,
    /// BM25 parameters supplied at construction time.
    // NOTE(review): not read anywhere in this part of the file — presumably
    // consumed by the scoring/search code; confirm.
    pub(crate) bm25_params: Bm25Params,
    /// In-memory buffer of postings that have not been flushed to a segment.
    pub(crate) memtable: Memtable,
    /// Counter (starting at 1) that numbers the segments produced by flushes.
    next_segment_id: AtomicU64,
}
33
34impl<B: FtsBackend> FtsIndex<B> {
35 pub fn new(backend: B) -> Self {
37 Self {
38 backend,
39 bm25_params: Bm25Params::default(),
40 memtable: Memtable::new(MemtableConfig::default()),
41 next_segment_id: AtomicU64::new(1),
42 }
43 }
44
45 pub fn with_params(backend: B, params: Bm25Params) -> Self {
47 Self {
48 backend,
49 bm25_params: params,
50 memtable: Memtable::new(MemtableConfig::default()),
51 next_segment_id: AtomicU64::new(1),
52 }
53 }
54
55 pub fn backend(&self) -> &B {
57 &self.backend
58 }
59
60 pub fn backend_mut(&mut self) -> &mut B {
62 &mut self.backend
63 }
64
65 pub fn memtable(&self) -> &Memtable {
67 &self.memtable
68 }
69
70 pub fn load_doc_id_map(&self, tid: u32, collection: &str) -> Result<DocIdMap, B::Error> {
72 match self.backend.read_meta(tid, collection, "docmap")? {
73 Some(bytes) => Ok(DocIdMap::from_bytes(&bytes).unwrap_or_default()),
74 None => Ok(DocIdMap::new()),
75 }
76 }
77
78 fn save_doc_id_map(&self, tid: u32, collection: &str, map: &DocIdMap) -> Result<(), B::Error> {
80 self.backend
81 .write_meta(tid, collection, "docmap", &map.to_bytes())
82 }
83
84 pub fn index_document(
86 &self,
87 tid: u32,
88 collection: &str,
89 doc_id: &str,
90 text: &str,
91 ) -> Result<(), B::Error> {
92 let tokens = self.analyze_for_collection(tid, collection, text)?;
93 if tokens.is_empty() {
94 return Ok(());
95 }
96
97 let mut doc_map = self.load_doc_id_map(tid, collection)?;
98 let int_id = doc_map.get_or_assign(doc_id);
99 self.save_doc_id_map(tid, collection, &doc_map)?;
100
101 let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
102 for (pos, token) in tokens.iter().enumerate() {
103 let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
104 entry.0 += 1;
105 entry.1.push(pos as u32);
106 }
107
108 let doc_len = tokens.len() as u32;
109
110 for (term, (freq, positions)) in &term_data {
111 let compact = CompactPosting {
112 doc_id: int_id,
113 term_freq: *freq,
114 fieldnorm: smallfloat::encode(doc_len),
115 positions: positions.clone(),
116 };
117 let scoped_term = memtable_key(tid, collection, term);
118 self.memtable.insert(&scoped_term, compact);
119 }
120 self.memtable.record_doc(int_id, doc_len);
121
122 self.backend
124 .write_doc_length(tid, collection, doc_id, doc_len)?;
125 self.write_fieldnorm(tid, collection, int_id, doc_len)?;
126 self.backend.increment_stats(tid, collection, doc_len)?;
127
128 if self.memtable.should_flush() {
129 self.flush_memtable(tid, collection)?;
130 }
131
132 debug!(tid, %collection, %doc_id, int_id, tokens = tokens.len(), terms = term_data.len(), "indexed document");
133 Ok(())
134 }
135
136 fn flush_memtable(&self, tid: u32, collection: &str) -> Result<(), B::Error> {
138 let drained = self.memtable.drain();
139 if drained.is_empty() {
140 return Ok(());
141 }
142
143 let segment_bytes = seg_writer::flush_to_segment(drained);
144 let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
145 let id = compaction::segment_id(seg_id, 0);
146 self.backend
147 .write_segment(tid, collection, &id, &segment_bytes)?;
148
149 debug!(tid, %collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
150 Ok(())
151 }
152
153 pub fn remove_document(
155 &self,
156 tid: u32,
157 collection: &str,
158 doc_id: &str,
159 ) -> Result<(), B::Error> {
160 let doc_len = self.backend.read_doc_length(tid, collection, doc_id)?;
161
162 let mut doc_map = self.load_doc_id_map(tid, collection)?;
163 if let Some(int_id) = doc_map.to_u32(doc_id) {
164 self.memtable.remove_doc(int_id);
165 }
166 doc_map.remove(doc_id);
167 self.save_doc_id_map(tid, collection, &doc_map)?;
168
169 self.backend.remove_doc_length(tid, collection, doc_id)?;
170
171 if let Some(len) = doc_len {
172 self.backend.decrement_stats(tid, collection, len)?;
173 }
174
175 Ok(())
176 }
177
178 pub fn purge_collection(&self, tid: u32, collection: &str) -> Result<usize, B::Error> {
180 self.memtable
181 .drain_collection(&memtable_collection_prefix(tid, collection));
182 self.backend.purge_collection(tid, collection)
183 }
184
185 pub fn purge_tenant(&self, tid: u32) -> Result<usize, B::Error> {
187 self.memtable.drain_collection(&memtable_tenant_prefix(tid));
188 self.backend.purge_tenant(tid)
189 }
190}
191
/// Builds the memtable key for `term`, scoped to a tenant and collection.
///
/// The result has the form `"{tid}:{collection}:{term}"`.
pub(crate) fn memtable_key(tid: u32, collection: &str, term: &str) -> String {
    let mut key = tid.to_string();
    // Two separators plus the two string parts.
    key.reserve(collection.len() + term.len() + 2);
    key.push(':');
    key.push_str(collection);
    key.push(':');
    key.push_str(term);
    key
}
198
/// Builds the key prefix `"{tid}:{collection}:"`.
///
/// Every key produced by `memtable_key` for the same tenant and collection
/// starts with this prefix.
pub(crate) fn memtable_collection_prefix(tid: u32, collection: &str) -> String {
    let mut prefix = tid.to_string();
    prefix.reserve(collection.len() + 2);
    prefix.push(':');
    prefix.push_str(collection);
    prefix.push(':');
    prefix
}
204
/// Builds the key prefix `"{tid}:"`.
///
/// Every key produced by `memtable_key` for this tenant starts with it.
pub(crate) fn memtable_tenant_prefix(tid: u32) -> String {
    let mut prefix = tid.to_string();
    prefix.push(':');
    prefix
}
209
#[cfg(test)]
mod tests {
    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// Tenant id shared by every test.
    const T: u32 = 1;

    /// Fresh index over an in-memory backend with default settings.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// Indexes each `(doc_id, text)` pair into `collection`, in order,
    /// panicking on the first failure.
    fn index_all(index: &FtsIndex<MemoryBackend>, collection: &str, docs: &[(&str, &str)]) {
        for &(doc_id, text) in docs {
            index.index_document(T, collection, doc_id, text).unwrap();
        }
    }

    #[test]
    fn index_writes_to_memtable() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world greeting")]);

        assert!(!index.memtable.is_empty());
        assert!(index.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        // Same as a default index, but with a memtable that holds at most
        // five postings so a six-term document forces a flush.
        let mut index = make_index();
        index.memtable = Memtable::new(MemtableConfig {
            max_postings: 5,
            max_terms: 100,
        });

        index_all(
            &index,
            "docs",
            &[("d1", "alpha bravo charlie delta echo foxtrot")],
        );

        assert!(index.memtable.is_empty());
        let written = index.backend.list_segments(T, "docs").unwrap();
        assert!(!written.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_assigns_doc_ids() {
        let index = make_index();
        index_all(
            &index,
            "docs",
            &[("d1", "hello world greeting"), ("d2", "hello rust language")],
        );

        // Ids are handed out in insertion order, starting at zero.
        let ids = index.load_doc_id_map(T, "docs").unwrap();
        assert_eq!(ids.to_u32("d1"), Some(0));
        assert_eq!(ids.to_u32("d2"), Some(1));
    }

    #[test]
    fn remove_tombstones_docmap() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world"), ("d2", "hello rust")]);

        index.remove_document(T, "docs", "d1").unwrap();

        // The removed document is gone; the survivor keeps its id.
        let ids = index.load_doc_id_map(T, "docs").unwrap();
        assert_eq!(ids.to_u32("d1"), None);
        assert_eq!(ids.to_u32("d2"), Some(1));
    }

    #[test]
    fn index_updates_stats() {
        let index = make_index();
        index_all(
            &index,
            "docs",
            &[("d1", "hello world greeting"), ("d2", "hello rust language")],
        );

        let stats = index.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(stats.0, 2);
        assert!(stats.1 > 0);
    }

    #[test]
    fn remove_decrements_stats() {
        let index = make_index();
        index_all(&index, "docs", &[("d1", "hello world"), ("d2", "hello rust")]);

        index.remove_document(T, "docs", "d1").unwrap();

        let stats = index.backend.collection_stats(T, "docs").unwrap();
        assert_eq!(stats.0, 1);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let index = make_index();
        index_all(&index, "col_a", &[("d1", "alpha bravo")]);
        index_all(&index, "col_b", &[("d1", "delta echo")]);

        index.purge_collection(T, "col_a").unwrap();
        assert_eq!(index.backend.collection_stats(T, "col_a").unwrap(), (0, 0));
        assert!(index.backend.collection_stats(T, "col_b").unwrap().0 > 0);

        let has_postings = |collection: &str, term: &str| {
            !index
                .memtable
                .get_postings(&memtable_key(T, collection, term))
                .is_empty()
        };
        assert!(has_postings("col_b", "delta"));
        assert!(!has_postings("col_a", "alpha"));
    }

    #[test]
    fn empty_text_is_noop() {
        // Assumes the analyzer drops every token of "the a is" (presumably
        // stop words), leaving nothing to index — TODO confirm.
        let index = make_index();
        index_all(&index, "docs", &[("d1", "the a is")]);

        assert_eq!(index.backend.collection_stats(T, "docs").unwrap(), (0, 0));
        assert!(index.memtable.is_empty());
    }
}