use crate::error::{FusekiError, FusekiResult};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tracing::{debug, info, warn};
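/// A single literal occurrence extracted from the store, carrying enough
/// context (subject, predicate, optional language tag, datatype, and named
/// graph) to turn a match back into a search hit without another store
/// lookup.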
#[derive(Debug, Clone)]
pub struct IndexedLiteral {
pub subject: String,
pub predicate: String,
pub literal_value: String,
pub lang: Option<String>,
pub datatype: Option<String>,
pub graph: Option<String>,
}
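/// A scored match returned by a text search, pointing back at the triple
/// whose literal value matched.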
#[derive(Debug, Clone)]
pub struct TextSearchHit {
pub subject: String,
pub predicate: String,
pub literal_value: String,
pub score: f32,
pub graph: Option<String>,
}
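/// Internal indexed document: the original literal plus its token stream,
/// kept so phrase matching and length normalization can run without
/// re-tokenizing.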
#[derive(Debug, Clone)]
struct Document {
id: u32,
literal: IndexedLiteral,
tokens: Vec<String>,
}
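/// A dependency-free, in-memory full-text index with BM25 ranking.
///
/// `inverted` maps each non-stop-word term to a postings list of
/// `(document id, term frequency)` pairs; `documents` holds the indexed
/// literals themselves.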
pub struct SimpleTextIndex {
inverted: HashMap<String, Vec<(u32, u32)>>,
documents: Vec<Document>,
next_id: u32,
stop_words: HashSet<&'static str>,
avg_doc_len: f32,
}
impl SimpleTextIndex {
/// BM25 term-frequency saturation parameter (standard default).
const K1: f32 = 1.2;
/// BM25 document-length normalization parameter (standard default).
const B: f32 = 0.75;
const STOP_WORDS: &'static [&'static str] = &[
"a", "an", "and", "are", "as", "at", "be", "been", "by", "for", "from", "has", "have",
"he", "in", "is", "it", "its", "of", "on", "or", "she", "that", "the", "their", "there",
"they", "this", "to", "was", "were", "will", "with",
];
pub fn new() -> Self {
let stop_words: HashSet<&'static str> = Self::STOP_WORDS.iter().copied().collect();
SimpleTextIndex {
inverted: HashMap::new(),
documents: Vec::new(),
next_id: 0,
stop_words,
avg_doc_len: 0.0,
}
}
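/// Indexes a literal and returns its document id. Stop words are excluded
/// from the inverted index but kept in the token stream, so they still
/// count toward document length and participate in phrase matching.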
pub fn index(&mut self, literal: IndexedLiteral) -> u32 {
let id = self.next_id;
self.next_id += 1;
let tokens = Self::tokenize(&literal.literal_value);
let mut tf_counts: HashMap<String, u32> = HashMap::new();
for token in &tokens {
*tf_counts.entry(token.clone()).or_insert(0) += 1;
}
for (token, tf) in &tf_counts {
if !self.stop_words.contains(token.as_str()) {
self.inverted
.entry(token.clone())
.or_default()
.push((id, *tf));
}
}
let n = self.documents.len() as f32;
self.avg_doc_len = (self.avg_doc_len * n + tokens.len() as f32) / (n + 1.0);
self.documents.push(Document {
id,
literal,
tokens,
});
id
}
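/// Removes every indexed literal whose subject matches, returning the
/// number of documents dropped. Postings lists and the average document
/// length are rebuilt to stay consistent.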
pub fn remove_subject(&mut self, subject: &str) -> usize {
let remove_ids: HashSet<u32> = self
.documents
.iter()
.filter(|d| d.literal.subject == subject)
.map(|d| d.id)
.collect();
if remove_ids.is_empty() {
return 0;
}
let count = remove_ids.len();
self.documents.retain(|d| !remove_ids.contains(&d.id));
for posting_list in self.inverted.values_mut() {
posting_list.retain(|(id, _)| !remove_ids.contains(id));
}
self.inverted.retain(|_, list| !list.is_empty());
if self.documents.is_empty() {
self.avg_doc_len = 0.0;
} else {
let total: f32 = self.documents.iter().map(|d| d.tokens.len() as f32).sum();
self.avg_doc_len = total / self.documents.len() as f32;
}
count
}
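/// Searches with AND semantics: only documents containing every query
/// term that exists in the index become candidates, ranked by their
/// summed BM25 scores.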
pub fn search(&self, query: &str, limit: usize) -> Vec<TextSearchHit> {
let terms = Self::tokenize(query);
if terms.is_empty() {
return Vec::new();
}
let candidate_ids: HashSet<u32> = terms
.iter()
.filter_map(|t| self.inverted.get(t))
.map(|list| list.iter().map(|(id, _)| *id).collect::<HashSet<u32>>())
.reduce(|acc, set| acc.intersection(&set).cloned().collect())
.unwrap_or_default();
let mut scored: Vec<(u32, f32)> = candidate_ids
.into_iter()
.map(|id| {
let score = self.bm25_score(id, &terms);
(id, score)
})
.collect();
scored.sort_by(|a, b| b.1.total_cmp(&a.1));
scored.truncate(limit);
scored
.into_iter()
.filter_map(|(id, score)| {
let doc = self.documents.iter().find(|d| d.id == id)?;
Some(TextSearchHit {
subject: doc.literal.subject.clone(),
predicate: doc.literal.predicate.clone(),
literal_value: doc.literal.literal_value.clone(),
score,
graph: doc.literal.graph.clone(),
})
})
.collect()
}
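/// Finds documents containing the exact token sequence of `phrase` via a
/// linear scan over all documents, ranking matches by BM25 over the
/// phrase terms.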
pub fn phrase_search(&self, phrase: &str, limit: usize) -> Vec<TextSearchHit> {
let terms = Self::tokenize(phrase);
if terms.is_empty() {
return Vec::new();
}
let mut scored: Vec<(u32, f32)> = self
.documents
.iter()
.filter(|doc| has_phrase(&doc.tokens, &terms))
.map(|doc| {
let score = self.bm25_score(doc.id, &terms);
(doc.id, score)
})
.collect();
scored.sort_by(|a, b| b.1.total_cmp(&a.1));
scored.truncate(limit);
scored
.into_iter()
.filter_map(|(id, score)| {
let doc = self.documents.iter().find(|d| d.id == id)?;
Some(TextSearchHit {
subject: doc.literal.subject.clone(),
predicate: doc.literal.predicate.clone(),
literal_value: doc.literal.literal_value.clone(),
score,
graph: doc.literal.graph.clone(),
})
})
.collect()
}
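/// Lowercases the text, splits on non-alphanumeric characters, and drops
/// single-character tokens.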
pub fn tokenize(text: &str) -> Vec<String> {
text.to_lowercase()
.split(|c: char| !c.is_alphanumeric())
.filter(|t| t.len() > 1)
.map(|t| t.to_string())
.collect()
}
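/// Scores a document against the query terms using the standard BM25
/// formula, summed over the query terms:
///
///   idf(t) * tf * (k1 + 1) / (tf + k1 * (1 - b + b * doc_len / avg_dl))
///
/// where idf(t) = ln((N - df + 0.5) / (df + 0.5) + 1), the Lucene-style
/// non-negative IDF.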
fn bm25_score(&self, doc_id: u32, query_terms: &[String]) -> f32 {
let doc = match self.documents.iter().find(|d| d.id == doc_id) {
Some(d) => d,
None => return 0.0,
};
let doc_len = doc.tokens.len() as f32;
let n = self.documents.len() as f32;
let avg_dl = if self.avg_doc_len > 0.0 {
self.avg_doc_len
} else {
1.0
};
query_terms
.iter()
.map(|term| {
let tf = doc.tokens.iter().filter(|t| *t == term).count() as f32;
if tf == 0.0 {
return 0.0;
}
let df = self
.inverted
.get(term)
.map(|list| list.len() as f32)
.unwrap_or(0.0);
if df == 0.0 {
return 0.0;
}
let idf = ((n - df + 0.5) / (df + 0.5) + 1.0).ln();
let tf_norm = (tf * (Self::K1 + 1.0))
/ (tf + Self::K1 * (1.0 - Self::B + Self::B * doc_len / avg_dl));
idf * tf_norm
})
.sum()
}
pub fn document_count(&self) -> usize {
self.documents.len()
}
pub fn term_count(&self) -> usize {
self.inverted.len()
}
}
impl Default for SimpleTextIndex {
fn default() -> Self {
Self::new()
}
}
fn has_phrase(tokens: &[String], phrase: &[String]) -> bool {
if phrase.is_empty() {
return true;
}
if tokens.len() < phrase.len() {
return false;
}
tokens.windows(phrase.len()).any(|window| window == phrase)
}
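/// Disk-backed full-text index built on Tantivy. Only the `literal` field
/// is tokenized (`TEXT`); subject, predicate, graph, and lang are stored
/// as raw strings (`STRING`) so hits can be reconstructed from the index
/// alone.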
pub struct TantivyTextIndex {
index: tantivy::Index,
writer: Arc<RwLock<tantivy::IndexWriter>>,
reader: tantivy::IndexReader,
schema: tantivy::schema::Schema,
field_subject: tantivy::schema::Field,
field_predicate: tantivy::schema::Field,
field_literal: tantivy::schema::Field,
field_graph: tantivy::schema::Field,
field_lang: tantivy::schema::Field,
}
impl TantivyTextIndex {
/// Heap budget for the Tantivy writer (50 MiB).
const WRITER_HEAP_BYTES: usize = 50 * 1024 * 1024;
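/// Opens a Tantivy index in `index_dir`, attempting creation first and
/// falling back to opening an existing index if creation fails (e.g. the
/// directory already contains one).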
pub fn open(index_dir: PathBuf) -> FusekiResult<Self> {
use tantivy::schema::{SchemaBuilder, STORED, STRING, TEXT};
let mut schema_builder = SchemaBuilder::new();
let field_subject = schema_builder.add_text_field("subject", STRING | STORED);
let field_predicate = schema_builder.add_text_field("predicate", STRING | STORED);
let field_literal = schema_builder.add_text_field("literal", TEXT | STORED);
let field_graph = schema_builder.add_text_field("graph", STRING | STORED);
let field_lang = schema_builder.add_text_field("lang", STRING | STORED);
let schema = schema_builder.build();
std::fs::create_dir_all(&index_dir).map_err(FusekiError::Io)?;
let index = tantivy::Index::create_in_dir(&index_dir, schema.clone())
.or_else(|_| tantivy::Index::open_in_dir(&index_dir))
.map_err(|e| FusekiError::Internal {
message: format!("Failed to open Tantivy index: {e}"),
})?;
let writer = index
.writer(Self::WRITER_HEAP_BYTES)
.map_err(|e| FusekiError::Internal {
message: format!("Failed to create Tantivy writer: {e}"),
})?;
let reader = index
.reader_builder()
.reload_policy(tantivy::ReloadPolicy::Manual)
.try_into()
.map_err(|e| FusekiError::Internal {
message: format!("Failed to create Tantivy reader: {e}"),
})?;
info!(path = %index_dir.display(), "Tantivy text index opened");
Ok(TantivyTextIndex {
index,
writer: Arc::new(RwLock::new(writer)),
reader,
schema,
field_subject,
field_predicate,
field_literal,
field_graph,
field_lang,
})
}
pub fn index(&self, literal: &IndexedLiteral) -> FusekiResult<()> {
let mut doc = tantivy::TantivyDocument::default();
doc.add_text(self.field_subject, &literal.subject);
doc.add_text(self.field_predicate, &literal.predicate);
doc.add_text(self.field_literal, &literal.literal_value);
doc.add_text(self.field_graph, literal.graph.as_deref().unwrap_or(""));
doc.add_text(self.field_lang, literal.lang.as_deref().unwrap_or(""));
let writer = self.writer.write().map_err(|e| FusekiError::Internal {
message: format!("Tantivy writer lock poisoned: {e}"),
})?;
writer
.add_document(doc)
.map_err(|e| FusekiError::Internal {
message: format!("Failed to add Tantivy document: {e}"),
})?;
Ok(())
}
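/// Commits pending writes and reloads the reader so new documents become
/// visible to subsequent searches (the reload policy is `Manual`).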
pub fn commit(&self) -> FusekiResult<()> {
{
let mut writer = self.writer.write().map_err(|e| FusekiError::Internal {
message: format!("Tantivy writer lock poisoned on commit: {e}"),
})?;
writer.commit().map_err(|e| FusekiError::Internal {
message: format!("Tantivy commit failed: {e}"),
})?;
}
self.reader.reload().map_err(|e| FusekiError::Internal {
message: format!("Tantivy reader reload failed: {e}"),
})?;
debug!("Tantivy index committed and reader reloaded");
Ok(())
}
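/// Queues deletion of every document whose subject matches; the deletion
/// only takes effect at the next commit.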
pub fn remove_subject(&self, subject: &str) -> FusekiResult<()> {
use tantivy::Term;
let term = Term::from_field_text(self.field_subject, subject);
let writer = self.writer.write().map_err(|e| FusekiError::Internal {
message: format!("Tantivy writer lock poisoned on remove: {e}"),
})?;
writer.delete_term(term);
Ok(())
}
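/// Parses `query_str` with Tantivy's query parser against the `literal`
/// field and returns up to `limit` hits with their Tantivy scores.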
pub fn search(&self, query_str: &str, limit: usize) -> FusekiResult<Vec<TextSearchHit>> {
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
let searcher = self.reader.searcher();
let query_parser = QueryParser::for_index(&self.index, vec![self.field_literal]);
let query = query_parser
.parse_query(query_str)
.map_err(|e| FusekiError::Internal {
message: format!("Tantivy query parse error: {e}"),
})?;
let top_docs = searcher
.search(&query, &TopDocs::with_limit(limit))
.map_err(|e| FusekiError::Internal {
message: format!("Tantivy search error: {e}"),
})?;
let mut hits = Vec::with_capacity(top_docs.len());
for (score, doc_addr) in top_docs {
match searcher.doc(doc_addr) {
Ok(doc) => {
let subject = get_field_str(&doc, self.field_subject);
let predicate = get_field_str(&doc, self.field_predicate);
let literal_value = get_field_str(&doc, self.field_literal);
let graph = {
let g = get_field_str(&doc, self.field_graph);
if g.is_empty() {
None
} else {
Some(g)
}
};
hits.push(TextSearchHit {
subject,
predicate,
literal_value,
score,
graph,
});
}
Err(e) => {
warn!("Failed to retrieve Tantivy document: {}", e);
}
}
}
Ok(hits)
}
pub fn document_count(&self) -> usize {
self.reader.searcher().num_docs() as usize
}
}
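/// Extracts the first string value of `field` from a retrieved document,
/// defaulting to an empty string when the field is missing.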
fn get_field_str(doc: &tantivy::TantivyDocument, field: tantivy::schema::Field) -> String {
use tantivy::schema::Value;
doc.get_first(field)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
}
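/// The available text-index backends: the in-memory BM25 index or the
/// disk-backed Tantivy index.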
pub enum TextIndexBackend {
Simple(SimpleTextIndex),
Tantivy(TantivyTextIndex),
}
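/// Thread-safe facade over a text-index backend, exposing a single API
/// for indexing, removal, search, phrase search, and commit regardless of
/// which backend is in use.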
pub struct TextIndex {
backend: Arc<RwLock<TextIndexBackend>>,
}
impl TextIndex {
pub fn new_simple() -> Self {
TextIndex {
backend: Arc::new(RwLock::new(
TextIndexBackend::Simple(SimpleTextIndex::new()),
)),
}
}
pub fn new_tantivy(index_dir: PathBuf) -> FusekiResult<Self> {
let tantivy_idx = TantivyTextIndex::open(index_dir)?;
Ok(TextIndex {
backend: Arc::new(RwLock::new(TextIndexBackend::Tantivy(tantivy_idx))),
})
}
pub fn index(&self, literal: IndexedLiteral) -> FusekiResult<()> {
let mut backend = self.backend.write().map_err(|e| FusekiError::Internal {
message: format!("TextIndex RwLock poisoned on index: {e}"),
})?;
match &mut *backend {
TextIndexBackend::Simple(idx) => {
idx.index(literal);
Ok(())
}
TextIndexBackend::Tantivy(idx) => idx.index(&literal),
}
}
pub fn remove_subject(&self, subject: &str) -> FusekiResult<usize> {
let mut backend = self.backend.write().map_err(|e| FusekiError::Internal {
message: format!("TextIndex RwLock poisoned on remove: {e}"),
})?;
match &mut *backend {
TextIndexBackend::Simple(idx) => Ok(idx.remove_subject(subject)),
TextIndexBackend::Tantivy(idx) => {
idx.remove_subject(subject)?;
// Tantivy applies deletions lazily at commit time, so the exact
// number of removed documents is unknown here; return 0 as a
// conservative placeholder.
Ok(0)
}
}
}
pub fn search(&self, query: &str, limit: usize) -> FusekiResult<Vec<TextSearchHit>> {
let backend = self.backend.read().map_err(|e| FusekiError::Internal {
message: format!("TextIndex RwLock poisoned on search: {e}"),
})?;
match &*backend {
TextIndexBackend::Simple(idx) => Ok(idx.search(query, limit)),
TextIndexBackend::Tantivy(idx) => idx.search(query, limit),
}
}
pub fn phrase_search(&self, phrase: &str, limit: usize) -> FusekiResult<Vec<TextSearchHit>> {
let backend = self.backend.read().map_err(|e| FusekiError::Internal {
message: format!("TextIndex RwLock poisoned on phrase_search: {e}"),
})?;
match &*backend {
TextIndexBackend::Simple(idx) => Ok(idx.phrase_search(phrase, limit)),
TextIndexBackend::Tantivy(idx) => {
// Quote the phrase so Tantivy's query parser builds a phrase query;
// embedded double quotes are stripped to keep the query well-formed.
let quoted = format!("\"{}\"", phrase.replace('"', ""));
idx.search(&quoted, limit)
}
}
}
pub fn commit(&self) -> FusekiResult<()> {
let backend = self.backend.read().map_err(|e| FusekiError::Internal {
message: format!("TextIndex RwLock poisoned on commit: {e}"),
})?;
match &*backend {
TextIndexBackend::Simple(_) => Ok(()),
TextIndexBackend::Tantivy(idx) => idx.commit(),
}
}
pub fn document_count(&self) -> usize {
let backend = self.backend.read().unwrap_or_else(|e| e.into_inner());
match &*backend {
TextIndexBackend::Simple(idx) => idx.document_count(),
TextIndexBackend::Tantivy(idx) => idx.document_count(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_literal(subject: &str, predicate: &str, text: &str) -> IndexedLiteral {
IndexedLiteral {
subject: subject.to_string(),
predicate: predicate.to_string(),
literal_value: text.to_string(),
lang: Some("en".to_string()),
datatype: None,
graph: None,
}
}
#[test]
fn test_simple_index_and_search() {
let mut idx = SimpleTextIndex::new();
idx.index(make_literal(
"http://ex.org/doc1",
"http://ex.org/title",
"Rust programming language systems",
));
idx.index(make_literal(
"http://ex.org/doc2",
"http://ex.org/title",
"Python scripting programming language",
));
let hits = idx.search("rust programming", 10);
assert!(!hits.is_empty(), "Should find 'rust programming'");
assert_eq!(hits[0].subject, "http://ex.org/doc1");
}
#[test]
fn test_simple_bm25_ordering() {
let mut idx = SimpleTextIndex::new();
idx.index(make_literal(
"http://ex.org/doc1",
"http://ex.org/desc",
"A database system",
));
idx.index(make_literal(
"http://ex.org/doc2",
"http://ex.org/desc",
"database database database management",
));
let hits = idx.search("database", 10);
assert_eq!(hits.len(), 2);
assert_eq!(hits[0].subject, "http://ex.org/doc2");
}
#[test]
fn test_simple_remove_subject() {
let mut idx = SimpleTextIndex::new();
idx.index(make_literal(
"http://ex.org/s1",
"http://ex.org/p",
"Hello world",
));
idx.index(make_literal(
"http://ex.org/s2",
"http://ex.org/p",
"Hello Rust",
));
let removed = idx.remove_subject("http://ex.org/s1");
assert_eq!(removed, 1);
let hits = idx.search("hello", 10);
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].subject, "http://ex.org/s2");
}
#[test]
fn test_simple_phrase_search() {
let mut idx = SimpleTextIndex::new();
idx.index(make_literal(
"http://ex.org/doc1",
"http://ex.org/p",
"semantic web technologies",
));
idx.index(make_literal(
"http://ex.org/doc2",
"http://ex.org/p",
"web semantic data technologies",
));
let hits = idx.phrase_search("semantic web", 10);
assert_eq!(hits.len(), 1, "Only doc1 has 'semantic web' in order");
assert_eq!(hits[0].subject, "http://ex.org/doc1");
}
#[test]
fn test_simple_and_semantics() {
let mut idx = SimpleTextIndex::new();
idx.index(make_literal(
"http://ex.org/doc1",
"http://ex.org/p",
"apple orange banana",
));
idx.index(make_literal(
"http://ex.org/doc2",
"http://ex.org/p",
"apple mango kiwi",
));
let hits = idx.search("apple orange", 10);
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].subject, "http://ex.org/doc1");
}
#[test]
fn test_tokenization() {
let tokens = SimpleTextIndex::tokenize("Hello, World! This is a TEST.");
assert!(tokens.contains(&"hello".to_string()));
assert!(tokens.contains(&"world".to_string()));
assert!(tokens.contains(&"test".to_string()));
assert!(tokens.contains(&"this".to_string()));
}
#[test]
fn test_empty_search() {
let idx = SimpleTextIndex::new();
let hits = idx.search("nonexistent", 10);
assert!(hits.is_empty());
}
#[test]
fn test_document_and_term_count() {
let mut idx = SimpleTextIndex::new();
assert_eq!(idx.document_count(), 0);
idx.index(make_literal("s1", "p", "hello world rust"));
assert_eq!(idx.document_count(), 1);
assert!(idx.term_count() > 0);
}
#[test]
fn test_unified_index_simple_backend() {
let idx = TextIndex::new_simple();
idx.index(make_literal(
"http://ex.org/s1",
"http://ex.org/p",
"knowledge graph reasoning",
))
.unwrap();
let hits = idx.search("knowledge graph", 10).unwrap();
assert!(!hits.is_empty());
assert_eq!(hits[0].subject, "http://ex.org/s1");
}
#[test]
fn test_unified_remove_subject() {
let idx = TextIndex::new_simple();
idx.index(make_literal(
"http://ex.org/s1",
"http://ex.org/p",
"sparql query language",
))
.unwrap();
idx.index(make_literal(
"http://ex.org/s2",
"http://ex.org/p",
"sparql endpoint server",
))
.unwrap();
idx.remove_subject("http://ex.org/s1").unwrap();
let hits = idx.search("sparql", 10).unwrap();
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].subject, "http://ex.org/s2");
}
#[test]
fn test_tantivy_index() {
let dir = std::env::temp_dir().join(format!("oxirs_tantivy_test_{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let idx = TextIndex::new_tantivy(dir.clone()).unwrap();
idx.index(make_literal(
"http://ex.org/s1",
"http://ex.org/p",
"tantivy full text search engine",
))
.unwrap();
idx.index(make_literal(
"http://ex.org/s2",
"http://ex.org/p",
"sparql semantic web query",
))
.unwrap();
idx.commit().unwrap();
let hits = idx.search("tantivy", 10).unwrap();
assert!(!hits.is_empty(), "Should find 'tantivy' in Tantivy index");
assert_eq!(hits[0].subject, "http://ex.org/s1");
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn test_tantivy_phrase_search() {
let dir = std::env::temp_dir().join(format!("oxirs_tantivy_phrase_{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let idx = TextIndex::new_tantivy(dir.clone()).unwrap();
idx.index(make_literal(
"http://ex.org/s1",
"http://ex.org/p",
"semantic web technologies",
))
.unwrap();
idx.index(make_literal(
"http://ex.org/s2",
"http://ex.org/p",
"web semantic computing",
))
.unwrap();
idx.commit().unwrap();
let hits = idx.phrase_search("semantic web", 10).unwrap();
assert!(!hits.is_empty(), "Should find phrase 'semantic web'");
assert_eq!(hits[0].subject, "http://ex.org/s1");
let _ = std::fs::remove_dir_all(&dir);
}
}