1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use tracing::debug;
7
8use crate::backend::FtsBackend;
9use crate::block::CompactPosting;
10use crate::codec::DocIdMap;
11use crate::codec::smallfloat;
12use crate::lsm::compaction;
13use crate::lsm::memtable::{Memtable, MemtableConfig};
14use crate::lsm::segment::writer as seg_writer;
15use crate::posting::Bm25Params;
16
/// A full-text-search index layered over a pluggable storage backend.
///
/// Writes are buffered in an in-memory [`Memtable`] and flushed to immutable
/// segments on the backend once the memtable's thresholds are exceeded.
pub struct FtsIndex<B: FtsBackend> {
    // Storage layer for segments, metadata, and per-collection stats.
    pub(crate) backend: B,
    // Scoring parameters used at query time.
    pub(crate) bm25_params: Bm25Params,
    // In-memory write buffer of postings not yet flushed to a segment.
    pub(crate) memtable: Memtable,
    // Monotonically increasing id for the next flushed segment; atomic so
    // flushes from `&self` methods always obtain a unique id.
    next_segment_id: AtomicU64,
}
33
34impl<B: FtsBackend> FtsIndex<B> {
35 pub fn new(backend: B) -> Self {
37 Self {
38 backend,
39 bm25_params: Bm25Params::default(),
40 memtable: Memtable::new(MemtableConfig::default()),
41 next_segment_id: AtomicU64::new(1),
42 }
43 }
44
45 pub fn with_params(backend: B, params: Bm25Params) -> Self {
47 Self {
48 backend,
49 bm25_params: params,
50 memtable: Memtable::new(MemtableConfig::default()),
51 next_segment_id: AtomicU64::new(1),
52 }
53 }
54
55 pub fn backend(&self) -> &B {
57 &self.backend
58 }
59
60 pub fn backend_mut(&mut self) -> &mut B {
62 &mut self.backend
63 }
64
65 pub fn memtable(&self) -> &Memtable {
67 &self.memtable
68 }
69
70 pub fn load_doc_id_map(&self, collection: &str) -> Result<DocIdMap, B::Error> {
72 let key = format!("{collection}:docmap");
73 match self.backend.read_meta(&key)? {
74 Some(bytes) => Ok(DocIdMap::from_bytes(&bytes).unwrap_or_default()),
75 None => Ok(DocIdMap::new()),
76 }
77 }
78
79 fn save_doc_id_map(&self, collection: &str, map: &DocIdMap) -> Result<(), B::Error> {
81 let key = format!("{collection}:docmap");
82 self.backend.write_meta(&key, &map.to_bytes())
83 }
84
85 pub fn index_document(
90 &self,
91 collection: &str,
92 doc_id: &str,
93 text: &str,
94 ) -> Result<(), B::Error> {
95 let tokens = self.analyze_for_collection(collection, text)?;
96 if tokens.is_empty() {
97 return Ok(());
98 }
99
100 let mut doc_map = self.load_doc_id_map(collection)?;
102 let int_id = doc_map.get_or_assign(doc_id);
103 self.save_doc_id_map(collection, &doc_map)?;
104
105 let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
107 for (pos, token) in tokens.iter().enumerate() {
108 let entry = term_data.entry(token.as_str()).or_insert((0, Vec::new()));
109 entry.0 += 1;
110 entry.1.push(pos as u32);
111 }
112
113 let doc_len = tokens.len() as u32;
114
115 for (term, (freq, positions)) in &term_data {
117 let compact = CompactPosting {
118 doc_id: int_id,
119 term_freq: *freq,
120 fieldnorm: smallfloat::encode(doc_len),
121 positions: positions.clone(),
122 };
123 let scoped_term = format!("{collection}:{term}");
124 self.memtable.insert(&scoped_term, compact);
125 }
126 self.memtable.record_doc(int_id, doc_len);
127
128 self.backend.write_doc_length(collection, doc_id, doc_len)?;
134 self.write_fieldnorm(collection, int_id, doc_len)?;
135 self.backend.increment_stats(collection, doc_len)?;
136
137 if self.memtable.should_flush() {
139 self.flush_memtable(collection)?;
140 }
141
142 debug!(%collection, %doc_id, int_id, tokens = tokens.len(), terms = term_data.len(), "indexed document");
143 Ok(())
144 }
145
146 fn flush_memtable(&self, collection: &str) -> Result<(), B::Error> {
148 let drained = self.memtable.drain();
149 if drained.is_empty() {
150 return Ok(());
151 }
152
153 let segment_bytes = seg_writer::flush_to_segment(drained);
154 let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
155 let key = compaction::segment_key(collection, seg_id, 0);
156 self.backend.write_segment(&key, &segment_bytes)?;
157
158 debug!(%collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
159 Ok(())
160 }
161
162 pub fn remove_document(&self, collection: &str, doc_id: &str) -> Result<(), B::Error> {
168 let doc_len = self.backend.read_doc_length(collection, doc_id)?;
170
171 let mut doc_map = self.load_doc_id_map(collection)?;
173 if let Some(int_id) = doc_map.to_u32(doc_id) {
174 self.memtable.remove_doc(int_id);
175 }
176 doc_map.remove(doc_id);
177 self.save_doc_id_map(collection, &doc_map)?;
178
179 self.backend.remove_doc_length(collection, doc_id)?;
180
181 if let Some(len) = doc_len {
182 self.backend.decrement_stats(collection, len)?;
183 }
184
185 Ok(())
186 }
187
188 pub fn purge_collection(&self, collection: &str) -> Result<usize, B::Error> {
190 self.memtable.drain_collection(collection);
192 self.backend.purge_collection(collection)
193 }
194}
195
#[cfg(test)]
mod tests {
    use crate::backend::memory::MemoryBackend;

    use super::*;

    /// A fresh index backed by an in-memory backend.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    #[test]
    fn index_writes_to_memtable() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();

        // Postings should be buffered, not flushed, for a single small doc.
        assert!(!index.memtable.is_empty());
        assert!(index.memtable.posting_count() > 0);
    }

    #[test]
    fn memtable_flush_on_threshold() {
        // Tiny posting budget so a single document forces a flush.
        let index = FtsIndex {
            backend: MemoryBackend::new(),
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(MemtableConfig {
                max_postings: 5,
                max_terms: 100,
            }),
            next_segment_id: AtomicU64::new(1),
        };

        index
            .index_document("docs", "d1", "alpha bravo charlie delta echo foxtrot")
            .unwrap();

        assert!(index.memtable.is_empty());
        let segments = index.backend.list_segments("docs").unwrap();
        assert!(!segments.is_empty(), "segment should have been written");
    }

    #[test]
    fn index_assigns_doc_ids() {
        let index = make_index();
        for (external, text) in [("d1", "hello world greeting"), ("d2", "hello rust language")] {
            index.index_document("docs", external, text).unwrap();
        }

        let map = index.load_doc_id_map("docs").unwrap();
        assert_eq!(map.to_u32("d1"), Some(0));
        assert_eq!(map.to_u32("d2"), Some(1));
    }

    #[test]
    fn remove_tombstones_docmap() {
        let index = make_index();
        index.index_document("docs", "d1", "hello world").unwrap();
        index.index_document("docs", "d2", "hello rust").unwrap();

        index.remove_document("docs", "d1").unwrap();

        // d1 is gone; d2 keeps its previously assigned internal id.
        let map = index.load_doc_id_map("docs").unwrap();
        assert_eq!(map.to_u32("d1"), None);
        assert_eq!(map.to_u32("d2"), Some(1));
    }

    #[test]
    fn index_updates_stats() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();
        index
            .index_document("docs", "d2", "hello rust language")
            .unwrap();

        let (doc_count, total_len) = index.backend.collection_stats("docs").unwrap();
        assert_eq!(doc_count, 2);
        assert!(total_len > 0);
    }

    #[test]
    fn remove_decrements_stats() {
        let index = make_index();
        index.index_document("docs", "d1", "hello world").unwrap();
        index.index_document("docs", "d2", "hello rust").unwrap();

        index.remove_document("docs", "d1").unwrap();

        let (doc_count, _) = index.backend.collection_stats("docs").unwrap();
        assert_eq!(doc_count, 1);
    }

    #[test]
    fn purge_collection_preserves_others() {
        let index = make_index();
        index.index_document("col_a", "d1", "alpha bravo").unwrap();
        index.index_document("col_b", "d1", "delta echo").unwrap();

        index.purge_collection("col_a").unwrap();
        assert_eq!(index.backend.collection_stats("col_a").unwrap(), (0, 0));
        assert!(index.backend.collection_stats("col_b").unwrap().0 > 0);

        // The memtable is scrubbed per collection, not wholesale.
        assert!(!index.memtable.get_postings("col_b:delta").is_empty());
        assert!(index.memtable.get_postings("col_a:alpha").is_empty());
    }

    #[test]
    fn empty_text_is_noop() {
        let index = make_index();
        // Stop-word-only input analyzes to zero tokens.
        index.index_document("docs", "d1", "the a is").unwrap();
        assert_eq!(index.backend.collection_stats("docs").unwrap(), (0, 0));
        assert!(index.memtable.is_empty());
    }
}
316}