use std::collections::HashMap;
use std::io::Read;
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::error::{LaurusError, Result};
use crate::lexical::core::field::FieldOption;
use crate::lexical::index::LexicalIndex;
use crate::lexical::index::config::InvertedIndexConfig;
use crate::lexical::reader::LexicalIndexReader;
use crate::lexical::search::searcher::LexicalSearcher;
use crate::lexical::writer::LexicalIndexWriter;
use crate::storage::Storage;
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::file::{FileStorage, FileStorageConfig};
pub mod core;
pub mod maintenance;
pub mod reader;
pub mod searcher;
pub mod segment;
pub mod writer;
use self::reader::{InvertedIndexReader, InvertedIndexReaderConfig};
use self::searcher::InvertedIndexSearcher;
use self::segment::SegmentInfo;
use self::writer::{InvertedIndexWriter, InvertedIndexWriterConfig};
/// Index-level metadata persisted as JSON in `metadata.json`.
///
/// The field order is part of the on-disk representation (serde serializes
/// struct fields in declaration order), so keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// Metadata format version; `Default` sets this to `1`.
    pub version: u32,

    /// Creation timestamp taken from `crate::util::time::now_secs()`
    /// (presumably seconds since the Unix epoch — confirm against that helper).
    pub created: u64,

    /// Timestamp of the last metadata update, same clock as `created`.
    pub modified: u64,

    /// Document counter; incremented through `InvertedIndex::update_doc_count`.
    pub doc_count: u64,

    /// Generation counter; starts at `0` and is not modified in this file.
    pub generation: u64,

    /// Deleted-document counter. Falls back to `0` when the key is missing
    /// from the stored JSON.
    #[serde(default)]
    pub deleted_count: u64,

    /// Last WAL sequence number recorded via `InvertedIndex::set_last_wal_seq`.
    /// Falls back to `0` when the key is missing from the stored JSON.
    #[serde(default)]
    pub last_wal_seq: u64,
}
/// Snapshot of summary statistics reported for an inverted index.
#[derive(Debug, Clone)]
pub struct InvertedIndexStats {
    /// Document count, as recorded in the index metadata.
    pub doc_count: u64,

    /// Term count.
    pub term_count: u64,

    /// Segment count.
    pub segment_count: u32,

    /// Total size (presumably in bytes — confirm with the producer).
    pub total_size: u64,

    /// Deleted-document count, as recorded in the index metadata.
    pub deleted_count: u64,

    /// Timestamp of the last metadata modification.
    pub last_modified: u64,
}
impl Default for IndexMetadata {
    /// Metadata for a freshly created, empty index.
    ///
    /// `created` and `modified` share a single clock reading so they are
    /// guaranteed equal; every counter starts at zero and `version` is `1`.
    fn default() -> Self {
        let timestamp = crate::util::time::now_secs();
        Self {
            version: 1,
            created: timestamp,
            modified: timestamp,
            doc_count: 0,
            generation: 0,
            deleted_count: 0,
            last_wal_seq: 0,
        }
    }
}
/// Inverted-index implementation of the lexical index, backed by a
/// [`Storage`] instance.
pub struct InvertedIndex {
    /// Backing storage holding `metadata.json` and the segment files.
    storage: Arc<dyn Storage>,

    /// Configuration supplied when the index was created or opened.
    config: InvertedIndexConfig,

    /// Fields registered at runtime through `add_field`, in addition to the
    /// ones declared in `config.fields`. Held in memory only in this file.
    extra_fields: RwLock<HashMap<String, FieldOption>>,

    /// Set once `close` has been called; guarded operations then fail.
    closed: AtomicBool,

    /// In-memory copy of the metadata persisted in `metadata.json`.
    metadata: RwLock<IndexMetadata>,
}
impl std::fmt::Debug for InvertedIndex {
    /// Formats the storage, configuration, closed flag and a snapshot of the
    /// metadata. `extra_fields` is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_closed = self.closed.load(Ordering::SeqCst);

        let mut builder = f.debug_struct("InvertedIndex");
        builder.field("storage", &self.storage);
        builder.field("config", &self.config);
        builder.field("closed", &is_closed);
        // The read guard is a temporary that lives only for this statement.
        builder.field("metadata", &*self.metadata.read());
        builder.finish()
    }
}
impl InvertedIndex {
    /// Creates a new index on `storage`.
    ///
    /// Default [`IndexMetadata`] is built and written to `metadata.json`
    /// before the index is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata cannot be serialized or written.
    pub fn create(storage: Arc<dyn Storage>, config: InvertedIndexConfig) -> Result<Self> {
        let index = InvertedIndex {
            storage,
            config,
            extra_fields: RwLock::new(HashMap::new()),
            closed: AtomicBool::new(false),
            metadata: RwLock::new(IndexMetadata::default()),
        };
        index.write_metadata()?;
        Ok(index)
    }

    /// Opens an existing index stored on `storage`.
    ///
    /// # Errors
    ///
    /// Returns an index error when `metadata.json` is absent, or when it
    /// cannot be read or parsed.
    pub fn open(storage: Arc<dyn Storage>, config: InvertedIndexConfig) -> Result<Self> {
        if !storage.file_exists("metadata.json") {
            return Err(LaurusError::index("Index does not exist"));
        }

        let metadata = Self::read_metadata(storage.as_ref())?;
        Ok(InvertedIndex {
            storage,
            config,
            extra_fields: RwLock::new(HashMap::new()),
            closed: AtomicBool::new(false),
            metadata: RwLock::new(metadata),
        })
    }

    /// Creates a new index backed by a [`FileStorage`] rooted at `dir`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file storage cannot be set up or if
    /// [`InvertedIndex::create`] fails.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn create_in_dir<P: AsRef<Path>>(dir: P, config: InvertedIndexConfig) -> Result<Self> {
        let storage_config = FileStorageConfig::new(&dir);
        let storage = Arc::new(FileStorage::new(&dir, storage_config)?);
        Self::create(storage, config)
    }

    /// Opens an existing index backed by a [`FileStorage`] rooted at `dir`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file storage cannot be set up or if
    /// [`InvertedIndex::open`] fails.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open_dir<P: AsRef<Path>>(dir: P, config: InvertedIndexConfig) -> Result<Self> {
        let storage_config = FileStorageConfig::new(&dir);
        let storage = Arc::new(FileStorage::new(&dir, storage_config)?);
        Self::open(storage, config)
    }

    /// Opens the index when `metadata.json` exists on `storage`, otherwise
    /// creates a new one.
    ///
    /// # Errors
    ///
    /// Propagates the error of whichever of [`InvertedIndex::open`] or
    /// [`InvertedIndex::create`] is chosen.
    pub fn open_or_create(storage: Arc<dyn Storage>, config: InvertedIndexConfig) -> Result<Self> {
        if storage.file_exists("metadata.json") {
            Self::open(storage, config)
        } else {
            Self::create(storage, config)
        }
    }

    /// Serializes the in-memory metadata as pretty-printed JSON and writes it
    /// to `metadata.json`.
    fn write_metadata(&self) -> Result<()> {
        // Serialize inside a narrow scope so the read lock is released before
        // any storage I/O takes place.
        let metadata_json = {
            let metadata = self.metadata.read();
            serde_json::to_string_pretty(&*metadata)
                .map_err(|e| LaurusError::index(format!("Failed to serialize metadata: {e}")))?
        };

        let mut output = self.storage.create_output("metadata.json")?;
        std::io::Write::write_all(&mut output, metadata_json.as_bytes())?;
        output.close()?;
        Ok(())
    }

    /// Reads and parses `metadata.json` from `storage`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, or an index
    /// error if its contents are not valid metadata JSON.
    pub(crate) fn read_metadata(storage: &dyn Storage) -> Result<IndexMetadata> {
        let mut input = storage.open_input("metadata.json")?;
        let mut metadata_json = String::new();
        Read::read_to_string(&mut input, &mut metadata_json)?;

        serde_json::from_str(&metadata_json)
            .map_err(|e| LaurusError::index(format!("Failed to deserialize metadata: {e}")))
    }

    /// Stamps the metadata's `modified` time with the current clock reading
    /// and persists the metadata.
    fn update_metadata(&self) -> Result<()> {
        // The write guard is a temporary dropped at the end of this statement,
        // i.e. before `write_metadata` takes its read lock.
        self.metadata.write().modified = crate::util::time::now_secs();
        self.write_metadata()
    }

    /// Adds `additional_docs` to the stored document count and persists the
    /// metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed, if the new count would not
    /// fit in a `u64` (the count is left unchanged in that case), or if the
    /// metadata cannot be persisted. On a persistence failure the in-memory
    /// count keeps the new value.
    pub fn update_doc_count(&self, additional_docs: u64) -> Result<()> {
        self.check_closed()?;
        {
            let mut metadata = self.metadata.write();
            // A plain `+=` would panic in debug builds and silently wrap in
            // release builds; report the overflow instead of persisting a
            // corrupted counter.
            let new_count = metadata
                .doc_count
                .checked_add(additional_docs)
                .ok_or_else(|| LaurusError::index("Document count overflow"))?;
            metadata.doc_count = new_count;
        }
        self.update_metadata()
    }

    /// Fails with an index error once the index has been closed.
    fn check_closed(&self) -> Result<()> {
        if self.closed.load(Ordering::SeqCst) {
            Err(LaurusError::index("Index is closed"))
        } else {
            Ok(())
        }
    }

    /// Loads the metadata of every segment found in storage.
    ///
    /// A file is treated as segment metadata when its name starts with
    /// `segment_` and ends with `.meta`. The result is sorted by generation
    /// in ascending order.
    fn load_segments(&self) -> Result<Vec<SegmentInfo>> {
        let files = self.storage.list_files()?;
        let segment_meta_files = files
            .iter()
            .filter(|name| name.starts_with("segment_") && name.ends_with(".meta"));

        let mut segments = Vec::new();
        for file in segment_meta_files {
            let mut input = self.storage.open_input(file)?;
            let mut data = Vec::new();
            Read::read_to_end(&mut input, &mut data)?;

            let segment_info: SegmentInfo = serde_json::from_slice(&data).map_err(|e| {
                LaurusError::index(format!("Failed to parse segment metadata: {e}"))
            })?;
            segments.push(segment_info);
        }

        segments.sort_by_key(|s| s.generation);
        Ok(segments)
    }

    /// Returns `true` when `dir` contains a `metadata.json` file.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn exists_in_dir<P: AsRef<Path>>(dir: P) -> bool {
        dir.as_ref().join("metadata.json").exists()
    }

    /// Deletes every file that the [`FileStorage`] rooted at `dir` lists.
    ///
    /// Note that this removes all listed files, not only those written by
    /// the index.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage cannot be set up, listed, or if any
    /// deletion fails; files deleted before the failure stay deleted.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn delete_in_dir<P: AsRef<Path>>(dir: P) -> Result<()> {
        let storage_config = FileStorageConfig::new(&dir);
        let storage = FileStorage::new(&dir, storage_config)?;
        for file in storage.list_files()? {
            storage.delete_file(&file)?;
        }
        Ok(())
    }

    /// Lists the files currently present in the backing storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the storage cannot be
    /// listed.
    pub fn list_files(&self) -> Result<Vec<String>> {
        self.check_closed()?;
        self.storage.list_files()
    }

    /// Returns the last WAL sequence number held in the in-memory metadata.
    ///
    /// This accessor does not check whether the index is closed.
    pub fn last_wal_seq(&self) -> u64 {
        self.metadata.read().last_wal_seq
    }

    /// Records `seq` as the last WAL sequence number and persists the
    /// metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the metadata cannot be
    /// persisted.
    pub fn set_last_wal_seq(&self, seq: u64) -> Result<()> {
        self.check_closed()?;
        // Temporary write guard: released before `update_metadata` re-locks.
        self.metadata.write().last_wal_seq = seq;
        self.update_metadata()
    }
}
impl LexicalIndex for InvertedIndex {
    /// Builds a reader over the segments currently found in storage.
    ///
    /// The reader uses this index's analyzer; every other reader option
    /// keeps its default value.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed, if segment metadata cannot
    /// be loaded, or if the reader cannot be constructed.
    fn reader(&self) -> Result<Arc<dyn LexicalIndexReader>> {
        self.check_closed()?;

        let segments = self.load_segments()?;
        let reader_config = InvertedIndexReaderConfig {
            analyzer: self.config.analyzer.clone(),
            ..InvertedIndexReaderConfig::default()
        };
        let reader = InvertedIndexReader::new(segments, self.storage.clone(), reader_config)?;
        Ok(Arc::new(reader))
    }

    /// Builds a writer that targets this index's storage.
    ///
    /// The writer receives the configured fields extended with the fields
    /// registered through `add_field`, plus the index's analyzer and shard
    /// id; every other writer option keeps its default value.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the writer cannot be
    /// constructed.
    fn writer(&self) -> Result<Box<dyn LexicalIndexWriter>> {
        self.check_closed()?;

        let mut fields = self.config.fields.clone();
        // The read guard is a temporary that lives only for this statement.
        fields.extend(
            self.extra_fields
                .read()
                .iter()
                .map(|(name, option)| (name.clone(), option.clone())),
        );

        let writer_config = InvertedIndexWriterConfig {
            analyzer: self.config.analyzer.clone(),
            shard_id: self.config.shard_id,
            fields,
            ..Default::default()
        };
        let writer = InvertedIndexWriter::new(self.storage.clone(), writer_config)?;
        Ok(Box::new(writer))
    }

    /// Returns the storage backing this index.
    fn storage(&self) -> &Arc<dyn Storage> {
        &self.storage
    }

    /// Marks the index as closed. Calling it again has no further effect.
    fn close(&self) -> Result<()> {
        self.closed.store(true, Ordering::SeqCst);
        Ok(())
    }

    /// Reports whether `close` has been called.
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    /// Returns statistics derived from the in-memory metadata.
    ///
    /// NOTE(review): `term_count`, `segment_count` and `total_size` are
    /// always reported as `0` here; only the document counters and the
    /// modification time come from the metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed.
    fn stats(&self) -> Result<InvertedIndexStats> {
        self.check_closed()?;

        let metadata = self.metadata.read();
        Ok(InvertedIndexStats {
            doc_count: metadata.doc_count,
            term_count: 0,
            segment_count: 0,
            total_size: 0,
            deleted_count: metadata.deleted_count,
            last_modified: metadata.modified,
        })
    }

    /// Refreshes the `modified` timestamp and persists the metadata.
    ///
    /// No segment-level work is performed by this implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the metadata cannot be
    /// persisted.
    fn optimize(&self) -> Result<()> {
        self.check_closed()?;
        self.update_metadata()
    }

    /// Replaces the in-memory metadata with the contents of `metadata.json`.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the metadata cannot be
    /// read or parsed; the in-memory metadata is left untouched in that case.
    fn refresh(&self) -> Result<()> {
        self.check_closed()?;

        // Read from storage first so the write lock is not held during I/O.
        let metadata = Self::read_metadata(self.storage.as_ref())?;
        *self.metadata.write() = metadata;
        Ok(())
    }

    /// Builds a searcher over a fresh reader, preconfigured with the index's
    /// default fields.
    ///
    /// # Errors
    ///
    /// Returns an error if the index is closed or the reader cannot be built.
    fn searcher(&self) -> Result<Box<dyn LexicalSearcher>> {
        self.check_closed()?;

        let reader = self.reader()?;
        let searcher = InvertedIndexSearcher::from_arc(reader)
            .with_default_fields(self.config.default_fields.clone());
        Ok(Box::new(searcher))
    }

    /// Returns a copy of the configured default search fields.
    fn default_fields(&self) -> Result<Vec<String>> {
        Ok(self.config.default_fields.clone())
    }

    /// Registers an additional field named `name`.
    ///
    /// The field is stored in the in-memory `extra_fields` map and is picked
    /// up by writers created afterwards.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error if `name` is already a configured
    /// field or has already been registered.
    fn add_field(&self, name: &str, option: FieldOption) -> Result<()> {
        // Hold the write lock across both the duplicate check and the insert.
        // Checking under a read lock and re-locking for the insert would let
        // two concurrent callers both pass the check, with the second one
        // silently overwriting the first registration.
        let mut extra_fields = self.extra_fields.write();
        if self.config.fields.contains_key(name) || extra_fields.contains_key(name) {
            return Err(LaurusError::invalid_argument(format!(
                "Field '{name}' already exists in the lexical index"
            )));
        }
        extra_fields.insert(name.to_string(), option);
        Ok(())
    }

    /// Removes a field previously registered through `add_field`.
    ///
    /// Fields declared in the configuration are not affected, and removing
    /// an unknown name is not an error.
    fn delete_field(&self, name: &str) -> Result<()> {
        self.extra_fields.write().remove(name);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lexical::core::document::Document;
    use crate::storage::memory::{MemoryStorage, MemoryStorageConfig};
    use std::sync::Arc;

    /// Builds a document with a text `title` field and a text `body` field.
    fn create_test_document(title: &str, body: &str) -> Document {
        Document::builder()
            .add_text("title", title)
            .add_text("body", body)
            .build()
    }

    /// Fresh in-memory storage using the default configuration.
    fn memory_storage() -> Arc<MemoryStorage> {
        Arc::new(MemoryStorage::new(MemoryStorageConfig::default()))
    }

    /// A newly created writer has nothing pending and no recorded additions.
    #[test]
    fn test_inverted_index_writer_creation() {
        let writer =
            InvertedIndexWriter::new(memory_storage(), InvertedIndexWriterConfig::default())
                .unwrap();

        assert_eq!(writer.pending_docs(), 0);
        assert_eq!(writer.stats().docs_added, 0);
    }

    /// Adding one document is reflected in the pending count and the stats.
    #[test]
    fn test_add_document() {
        let mut writer =
            InvertedIndexWriter::new(memory_storage(), InvertedIndexWriterConfig::default())
                .unwrap();

        writer
            .add_document(create_test_document("Test Title", "This is test content"))
            .unwrap();

        assert_eq!(writer.pending_docs(), 1);
        assert_eq!(writer.stats().docs_added, 1);
        assert!(writer.stats().unique_terms > 0);
    }

    /// Reaching `max_buffered_docs` flushes the buffer into a segment.
    #[test]
    fn test_auto_flush() {
        let storage = memory_storage();
        let config = InvertedIndexWriterConfig {
            max_buffered_docs: 2,
            ..Default::default()
        };
        let mut writer = InvertedIndexWriter::new(storage.clone(), config).unwrap();

        let first = create_test_document("Doc 1", "Content 1");
        writer.add_document(first).unwrap();
        assert_eq!(writer.pending_docs(), 1);

        let second = create_test_document("Doc 2", "Content 2");
        writer.add_document(second).unwrap();

        // The second document hits the buffer limit, so nothing stays pending
        // and exactly one segment has been created.
        assert_eq!(writer.pending_docs(), 0);
        assert_eq!(writer.stats().segments_created, 1);

        let files = storage.list_files().unwrap();
        assert!(files.iter().any(|f| f.contains("segment_000000")));
    }

    /// Committing drains the buffer and leaves index and segment files behind.
    #[test]
    fn test_commit() {
        let storage = memory_storage();
        let mut writer =
            InvertedIndexWriter::new(storage.clone(), InvertedIndexWriterConfig::default())
                .unwrap();

        writer
            .add_document(create_test_document("Test", "Content"))
            .unwrap();
        writer.commit().unwrap();
        assert_eq!(writer.pending_docs(), 0);

        let files = storage.list_files().unwrap();
        assert!(files.contains(&"index.meta".to_string()));
        assert!(files.iter().any(|f| f.starts_with("segment_")));
    }

    /// Rolling back discards pending documents.
    #[test]
    fn test_rollback() {
        let mut writer =
            InvertedIndexWriter::new(memory_storage(), InvertedIndexWriterConfig::default())
                .unwrap();

        writer
            .add_document(create_test_document("Test", "Content"))
            .unwrap();
        assert_eq!(writer.pending_docs(), 1);

        writer.rollback().unwrap();
        assert_eq!(writer.pending_docs(), 0);
        // The added-documents counter is expected to survive the rollback.
        assert_eq!(writer.stats().docs_added, 1);
    }

    /// Documents mixing text and float fields can be added and committed.
    #[test]
    fn test_multiple_field_types() {
        let mut writer =
            InvertedIndexWriter::new(memory_storage(), InvertedIndexWriterConfig::default())
                .unwrap();

        let doc = Document::builder()
            .add_text("title", "Test Document")
            .add_text("id", "doc1")
            .add_float("count", 42.0)
            .build();
        writer.add_document(doc).unwrap();
        writer.commit().unwrap();

        assert_eq!(writer.stats().docs_added, 1);
        assert!(writer.stats().unique_terms >= 3);
    }
}