use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::debug;
use crate::backend::FtsBackend;
use crate::block::CompactPosting;
use crate::codec::DocIdMap;
use crate::codec::smallfloat;
use crate::lsm::compaction;
use crate::lsm::memtable::{Memtable, MemtableConfig};
use crate::lsm::segment::writer as seg_writer;
use crate::posting::Bm25Params;
/// Write-side handle of the full-text index.
///
/// Newly indexed postings are buffered in `memtable` and written to the
/// backend as segments when the memtable reports that it should be flushed.
pub struct FtsIndex<B: FtsBackend> {
/// Storage backend used for metadata, document lengths, statistics and
/// segments.
pub(crate) backend: B,
/// BM25 parameters; set by the constructors and not read in this section
/// of the file.
pub(crate) bm25_params: Bm25Params,
/// In-memory buffer that receives postings from `index_document`.
pub(crate) memtable: Memtable,
/// Counter handing out the id for the next flushed segment; the
/// constructors start it at 1.
next_segment_id: AtomicU64,
}
impl<B: FtsBackend> FtsIndex<B> {
    /// Creates an index over `backend` using `Bm25Params::default()`.
    ///
    /// Equivalent to [`FtsIndex::with_params`] with the default parameters.
    pub fn new(backend: B) -> Self {
        Self::with_params(backend, Bm25Params::default())
    }

    /// Creates an index over `backend` with explicit BM25 parameters.
    ///
    /// The memtable is built from `MemtableConfig::default()` and segment ids
    /// are allocated starting from 1.
    pub fn with_params(backend: B, params: Bm25Params) -> Self {
        Self {
            backend,
            bm25_params: params,
            memtable: Memtable::new(MemtableConfig::default()),
            next_segment_id: AtomicU64::new(1),
        }
    }

    /// Returns a shared reference to the storage backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }

    /// Returns an exclusive reference to the storage backend.
    pub fn backend_mut(&mut self) -> &mut B {
        &mut self.backend
    }

    /// Returns a shared reference to the in-memory posting buffer.
    pub fn memtable(&self) -> &Memtable {
        &self.memtable
    }

    /// Loads the document-id map stored under the `"{collection}:docmap"`
    /// metadata key.
    ///
    /// Returns a new, empty map when the key is absent.
    ///
    /// # Errors
    ///
    /// Propagates the backend error from reading the metadata entry.
    pub fn load_doc_id_map(&self, collection: &str) -> Result<DocIdMap, B::Error> {
        let key = format!("{collection}:docmap");
        match self.backend.read_meta(&key)? {
            // NOTE(review): bytes that fail to decode fall back to
            // `DocIdMap::default()` without any error or log. A later
            // `save_doc_id_map` would then overwrite the stored bytes.
            // Confirm this best-effort behavior is intended.
            Some(bytes) => Ok(DocIdMap::from_bytes(&bytes).unwrap_or_default()),
            None => Ok(DocIdMap::new()),
        }
    }

    /// Persists `map` under the `"{collection}:docmap"` metadata key.
    fn save_doc_id_map(&self, collection: &str, map: &DocIdMap) -> Result<(), B::Error> {
        let key = format!("{collection}:docmap");
        self.backend.write_meta(&key, &map.to_bytes())
    }

    /// Analyzes `text` and indexes it as document `doc_id` of `collection`.
    ///
    /// When analysis yields no tokens the call returns `Ok(())` without
    /// touching the doc-id map, the memtable or the backend. Otherwise it:
    ///
    /// 1. maps `doc_id` to an integer id and persists the doc-id map;
    /// 2. inserts one posting per distinct term into the memtable under the
    ///    key `"{collection}:{term}"`, carrying the term frequency and the
    ///    token positions;
    /// 3. records the document length, fieldnorm and collection statistics;
    /// 4. flushes the memtable to a segment if it reports `should_flush()`.
    ///
    /// # Errors
    ///
    /// Returns the first error from analysis or from the backend. Steps that
    /// completed before the failure are not rolled back.
    pub fn index_document(
        &self,
        collection: &str,
        doc_id: &str,
        text: &str,
    ) -> Result<(), B::Error> {
        let tokens = self.analyze_for_collection(collection, text)?;
        if tokens.is_empty() {
            return Ok(());
        }

        // NOTE(review): nothing in this method removes earlier postings or
        // decrements statistics for a `doc_id` that is already indexed.
        // Presumably callers invoke `remove_document` before re-indexing;
        // verify against call sites.
        let mut doc_map = self.load_doc_id_map(collection)?;
        let int_id = doc_map.get_or_assign(doc_id);
        self.save_doc_id_map(collection, &doc_map)?;

        // Group the token stream by term: term -> (frequency, positions).
        // Positions and the document length are stored as `u32`; the `as`
        // casts would wrap for inputs with more than `u32::MAX` tokens.
        let mut term_data: HashMap<&str, (u32, Vec<u32>)> = HashMap::new();
        for (pos, token) in tokens.iter().enumerate() {
            let (freq, positions) = term_data.entry(token.as_str()).or_default();
            *freq += 1;
            positions.push(pos as u32);
        }

        let doc_len = tokens.len() as u32;
        // Captured up front because the map is consumed below, which lets
        // each positions vector move into its posting instead of being cloned.
        let term_count = term_data.len();
        for (term, (term_freq, positions)) in term_data {
            let compact = CompactPosting {
                doc_id: int_id,
                term_freq,
                fieldnorm: smallfloat::encode(doc_len),
                positions,
            };
            let scoped_term = format!("{collection}:{term}");
            self.memtable.insert(&scoped_term, compact);
        }

        self.memtable.record_doc(int_id, doc_len);
        self.backend.write_doc_length(collection, doc_id, doc_len)?;
        self.write_fieldnorm(collection, int_id, doc_len)?;
        self.backend.increment_stats(collection, doc_len)?;

        if self.memtable.should_flush() {
            self.flush_memtable(collection)?;
        }

        debug!(%collection, %doc_id, int_id, tokens = tokens.len(), terms = term_count, "indexed document");
        Ok(())
    }

    /// Drains the memtable and writes its contents to the backend as one
    /// new segment.
    ///
    /// Returns without allocating a segment id when the drain is empty.
    ///
    /// # Errors
    ///
    /// Propagates the backend error from writing the segment. The drained
    /// postings are not put back into the memtable in that case.
    fn flush_memtable(&self, collection: &str) -> Result<(), B::Error> {
        // NOTE(review): `drain()` takes no collection argument, while the
        // segment key below is built from `collection`. Memtable terms are
        // keyed `"{collection}:{term}"`, so postings from other collections
        // may end up in this segment. Confirm this is intended.
        let drained = self.memtable.drain();
        if drained.is_empty() {
            return Ok(());
        }

        let segment_bytes = seg_writer::flush_to_segment(drained);
        // The atomic increment alone guarantees a unique id; no other memory
        // is published through this counter, so `Relaxed` is used.
        let seg_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
        // The trailing `0` is presumably the compaction level of a fresh
        // segment. TODO confirm against `compaction::segment_key`.
        let key = compaction::segment_key(collection, seg_id, 0);
        self.backend.write_segment(&key, &segment_bytes)?;

        debug!(%collection, seg_id, bytes = segment_bytes.len(), "flushed memtable to segment");
        Ok(())
    }

    /// Removes document `doc_id` from `collection`.
    ///
    /// Drops the document from the memtable when it has an integer id,
    /// removes it from the doc-id map (which is then persisted), deletes its
    /// stored length, and decrements the collection statistics by that length
    /// if one was stored.
    ///
    /// # Errors
    ///
    /// Returns the first backend error encountered. Steps that completed
    /// before the failure are not rolled back.
    pub fn remove_document(&self, collection: &str, doc_id: &str) -> Result<(), B::Error> {
        // Read the length first: it is needed for the stats decrement after
        // the stored length has been deleted.
        let doc_len = self.backend.read_doc_length(collection, doc_id)?;

        let mut doc_map = self.load_doc_id_map(collection)?;
        if let Some(int_id) = doc_map.to_u32(doc_id) {
            self.memtable.remove_doc(int_id);
        }
        doc_map.remove(doc_id);
        self.save_doc_id_map(collection, &doc_map)?;

        self.backend.remove_doc_length(collection, doc_id)?;
        if let Some(len) = doc_len {
            self.backend.decrement_stats(collection, len)?;
        }
        Ok(())
    }

    /// Discards everything held for `collection`.
    ///
    /// Drains the collection's entries from the memtable, then asks the
    /// backend to purge the collection and returns the count the backend
    /// reports.
    ///
    /// # Errors
    ///
    /// Propagates the backend error from the purge.
    pub fn purge_collection(&self, collection: &str) -> Result<usize, B::Error> {
        self.memtable.drain_collection(collection);
        self.backend.purge_collection(collection)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::memory::MemoryBackend;

    /// Builds an index over a fresh in-memory backend with default settings.
    fn make_index() -> FtsIndex<MemoryBackend> {
        FtsIndex::new(MemoryBackend::new())
    }

    /// Indexing a document is expected to leave postings in the memtable.
    #[test]
    fn index_writes_to_memtable() {
        let index = make_index();
        index
            .index_document("docs", "d1", "hello world greeting")
            .unwrap();

        let memtable = &index.memtable;
        assert!(!memtable.is_empty());
        assert!(memtable.posting_count() > 0);
    }

    /// With a five-posting limit, a six-term document is expected to trigger
    /// a flush that empties the memtable and writes a segment.
    #[test]
    fn memtable_flush_on_threshold() {
        let tiny_config = MemtableConfig {
            max_postings: 5,
            max_terms: 100,
        };
        let index = FtsIndex {
            backend: MemoryBackend::new(),
            bm25_params: Bm25Params::default(),
            memtable: Memtable::new(tiny_config),
            next_segment_id: AtomicU64::new(1),
        };

        index
            .index_document("docs", "d1", "alpha bravo charlie delta echo foxtrot")
            .unwrap();

        assert!(index.memtable.is_empty());
        let written = index.backend.list_segments("docs").unwrap();
        assert!(!written.is_empty(), "segment should have been written");
    }

    /// Documents are expected to receive integer ids 0, 1, ... in indexing order.
    #[test]
    fn index_assigns_doc_ids() {
        let index = make_index();
        for (doc_id, text) in [("d1", "hello world greeting"), ("d2", "hello rust language")] {
            index.index_document("docs", doc_id, text).unwrap();
        }

        let doc_ids = index.load_doc_id_map("docs").unwrap();
        assert_eq!(doc_ids.to_u32("d1"), Some(0));
        assert_eq!(doc_ids.to_u32("d2"), Some(1));
    }

    /// Removing a document is expected to drop it from the doc-id map
    /// without changing the id of the remaining document.
    #[test]
    fn remove_tombstones_docmap() {
        let index = make_index();
        for (doc_id, text) in [("d1", "hello world"), ("d2", "hello rust")] {
            index.index_document("docs", doc_id, text).unwrap();
        }

        index.remove_document("docs", "d1").unwrap();

        let doc_ids = index.load_doc_id_map("docs").unwrap();
        assert_eq!(doc_ids.to_u32("d1"), None);
        assert_eq!(doc_ids.to_u32("d2"), Some(1));
    }

    /// Indexing two documents is expected to yield a count of two and a
    /// positive total in the collection statistics.
    #[test]
    fn index_updates_stats() {
        let index = make_index();
        for (doc_id, text) in [("d1", "hello world greeting"), ("d2", "hello rust language")] {
            index.index_document("docs", doc_id, text).unwrap();
        }

        let stats = index.backend.collection_stats("docs").unwrap();
        assert_eq!(stats.0, 2);
        assert!(stats.1 > 0);
    }

    /// Removing one of two documents is expected to leave a count of one.
    #[test]
    fn remove_decrements_stats() {
        let index = make_index();
        for (doc_id, text) in [("d1", "hello world"), ("d2", "hello rust")] {
            index.index_document("docs", doc_id, text).unwrap();
        }

        index.remove_document("docs", "d1").unwrap();

        let stats = index.backend.collection_stats("docs").unwrap();
        assert_eq!(stats.0, 1);
    }

    /// Purging one collection is expected to leave another collection's
    /// statistics and memtable postings in place.
    #[test]
    fn purge_collection_preserves_others() {
        let index = make_index();
        index.index_document("col_a", "d1", "alpha bravo").unwrap();
        index.index_document("col_b", "d1", "delta echo").unwrap();

        index.purge_collection("col_a").unwrap();

        let backend = &index.backend;
        assert_eq!(backend.collection_stats("col_a").unwrap(), (0, 0));
        assert!(backend.collection_stats("col_b").unwrap().0 > 0);

        let memtable = &index.memtable;
        assert!(!memtable.get_postings("col_b:delta").is_empty());
        assert!(memtable.get_postings("col_a:alpha").is_empty());
    }

    /// Text that is expected to analyze to no tokens (presumably all stop
    /// words) should leave both the statistics and the memtable untouched.
    #[test]
    fn empty_text_is_noop() {
        let index = make_index();
        index.index_document("docs", "d1", "the a is").unwrap();

        let stats = index.backend.collection_stats("docs").unwrap();
        assert_eq!(stats, (0, 0));
        assert!(index.memtable.is_empty());
    }
}