use std::collections::HashSet;
use std::fmt;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::path::PathBuf;
use std::thread::available_parallelism;
use super::segment::Segment;
use super::segment_reader::merge_field_meta_data;
use super::{FieldMetadata, IndexSettings};
use crate::core::{Executor, META_FILEPATH};
use crate::directory::error::OpenReadError;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, LucivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::document::Document;
use crate::schema::{Field, FieldType, Schema};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::SegmentReader;
/// Decodes the raw bytes of the meta file into an [`IndexMeta`].
///
/// The bytes must be valid UTF-8 and must be accepted by
/// `IndexMeta::deserialize`. Either failure is reported as a `DataCorruption`
/// attached to `META_FILEPATH` and converted into the crate error type.
fn parse_metas(meta_data: Vec<u8>, inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
    let meta_string = match String::from_utf8(meta_data) {
        Ok(decoded) => decoded,
        Err(_utf8_err) => {
            error!("Meta data is not valid utf8.");
            let corruption = DataCorruption::new(
                META_FILEPATH.to_path_buf(),
                "Meta file does not contain valid utf8 file.".to_string(),
            );
            return Err(corruption.into());
        }
    };
    match IndexMeta::deserialize(&meta_string, inventory) {
        Ok(index_meta) => Ok(index_meta),
        Err(e) => {
            // The offending content is embedded in the message to ease debugging.
            let corruption = DataCorruption::new(
                META_FILEPATH.to_path_buf(),
                format!("Meta file cannot be deserialized. {e:?}. Content: {meta_string:?}"),
            );
            Err(corruption.into())
        }
    }
}
/// Reads the meta file (`META_FILEPATH`) from `directory` with an atomic read
/// and parses it into an [`IndexMeta`] using `inventory`.
fn load_metas(
    directory: &dyn Directory,
    inventory: &SegmentMetaInventory,
) -> crate::Result<IndexMeta> {
    match directory.atomic_read(&META_FILEPATH) {
        Ok(raw_meta) => parse_metas(raw_meta, inventory),
        Err(read_err) => Err(read_err.into()),
    }
}
/// Asynchronous counterpart of `load_metas`: awaits the atomic read of the
/// meta file (`META_FILEPATH`) and parses the bytes into an [`IndexMeta`].
#[cfg(feature = "quickwit")]
async fn load_metas_async(
    directory: &dyn Directory,
    inventory: &SegmentMetaInventory,
) -> crate::Result<IndexMeta> {
    match directory.atomic_read_async(&META_FILEPATH).await {
        Ok(raw_meta) => parse_metas(raw_meta, inventory),
        Err(read_err) => Err(read_err.into()),
    }
}
/// Writes the initial meta information of a brand new index into `directory`.
///
/// The saved [`IndexMeta`] has no segments, an opstamp of `0` and no payload.
/// The directory is synced after the meta has been saved.
fn save_new_metas(
    schema: Schema,
    index_settings: IndexSettings,
    directory: &dyn Directory,
) -> crate::Result<()> {
    let initial_meta = IndexMeta {
        index_settings,
        segments: Vec::new(),
        schema,
        opstamp: 0u64,
        payload: None,
    };
    save_metas(&initial_meta, directory)?;
    directory.sync_directory()?;
    Ok(())
}
/// Builder collecting everything needed to create (or open) an `Index`:
/// a schema, index settings and the two tokenizer managers.
pub struct IndexBuilder {
/// Schema of the index. `None` until `schema()` is called; required to create an index.
schema: Option<Schema>,
/// Settings written into the meta information of a newly created index.
index_settings: IndexSettings,
/// Tokenizer manager installed on the resulting index via `Index::set_tokenizers`.
tokenizer_manager: TokenizerManager,
/// Tokenizer manager installed on the resulting index via `Index::set_fast_field_tokenizers`.
fast_field_tokenizer_manager: TokenizerManager,
}
impl Default for IndexBuilder {
fn default() -> Self {
IndexBuilder::new()
}
}
impl IndexBuilder {
    /// Creates a builder with no schema, default index settings and default
    /// tokenizer managers.
    pub fn new() -> Self {
        Self {
            schema: None,
            index_settings: IndexSettings::default(),
            tokenizer_manager: TokenizerManager::default(),
            fast_field_tokenizer_manager: TokenizerManager::default(),
        }
    }

    /// Replaces the index settings.
    #[must_use]
    pub fn settings(mut self, settings: IndexSettings) -> Self {
        self.index_settings = settings;
        self
    }

    /// Sets the schema. A schema is required by every `create*` method.
    #[must_use]
    pub fn schema(mut self, schema: Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Replaces the tokenizer manager installed on the resulting index.
    #[must_use]
    pub fn tokenizers(mut self, tokenizers: TokenizerManager) -> Self {
        self.tokenizer_manager = tokenizers;
        self
    }

    /// Replaces the fast-field tokenizer manager installed on the resulting index.
    #[must_use]
    pub fn fast_field_tokenizers(mut self, tokenizers: TokenizerManager) -> Self {
        self.fast_field_tokenizer_manager = tokenizers;
        self
    }

    /// Creates a new index in a freshly created `RamDirectory`.
    ///
    /// # Errors
    /// Fails if no schema was set or if writing the initial meta fails.
    pub fn create_in_ram(self) -> Result<Index, LucivyError> {
        self.create(RamDirectory::create())
    }

    /// Creates a new index in the directory at `directory_path`, opened as an
    /// `MmapDirectory`.
    ///
    /// # Errors
    /// Returns `LucivyError::IndexAlreadyExists` if the directory already
    /// contains a meta file. Also fails if the directory cannot be opened, if
    /// no schema was set, or if writing the initial meta fails.
    #[cfg(feature = "mmap")]
    pub fn create_in_dir<P: AsRef<Path>>(self, directory_path: P) -> crate::Result<Index> {
        let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::open(directory_path)?);
        if Index::exists(&*mmap_directory)? {
            return Err(LucivyError::IndexAlreadyExists);
        }
        self.create(mmap_directory)
    }

    /// Creates a new index in `dir` and wraps it in a
    /// `SingleSegmentIndexWriter` constructed with `mem_budget`.
    #[doc(hidden)]
    pub fn single_segment_index_writer<D: Document>(
        self,
        dir: impl Into<Box<dyn Directory>>,
        mem_budget: usize,
    ) -> crate::Result<SingleSegmentIndexWriter<D>> {
        let index = self.create(dir)?;
        SingleSegmentIndexWriter::new(index, mem_budget)
    }

    /// Creates a new index in the directory returned by
    /// `MmapDirectory::create_from_tempdir`.
    #[cfg(feature = "mmap")]
    pub fn create_from_tempdir(self) -> crate::Result<Index> {
        let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::create_from_tempdir()?);
        self.create(mmap_directory)
    }

    /// Returns a clone of the configured schema, or
    /// `LucivyError::IndexBuilderMissingArgument("schema")` if none was set.
    fn get_expect_schema(&self) -> crate::Result<Schema> {
        self.schema
            .clone()
            .ok_or(LucivyError::IndexBuilderMissingArgument("schema"))
    }

    /// Opens the index stored in `dir`, or creates it if `dir` does not
    /// contain a meta file yet.
    ///
    /// When an index already exists, both the tokenizer manager and the
    /// fast-field tokenizer manager of this builder are installed on it,
    /// mirroring what `create` does for a new index.
    ///
    /// # Errors
    /// Returns `LucivyError::SchemaError` if an index exists but its schema
    /// differs from the builder's schema, and propagates any error from
    /// opening or creating the index.
    pub fn open_or_create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
        let dir: Box<dyn Directory> = dir.into();
        if !Index::exists(&*dir)? {
            return self.create(dir);
        }
        let mut index = Index::open(dir)?;
        if index.schema() != self.get_expect_schema()? {
            return Err(LucivyError::SchemaError(
                "An index exists but the schema does not match.".to_string(),
            ));
        }
        // The builder is consumed here, so the managers can be moved rather than cloned.
        index.set_tokenizers(self.tokenizer_manager);
        index.set_fast_field_tokenizers(self.fast_field_tokenizer_manager);
        Ok(index)
    }

    /// Checks that the builder holds everything required to create an index;
    /// currently this only means a schema.
    fn validate(&self) -> crate::Result<()> {
        if self.schema.is_some() {
            Ok(())
        } else {
            Err(LucivyError::InvalidArgument(
                "no schema passed".to_string(),
            ))
        }
    }

    /// Creates a new index in `dir`.
    ///
    /// The directory is wrapped in a `ManagedDirectory`, the initial meta is
    /// saved into it, and the resulting `Index` receives both tokenizer
    /// managers of the builder. This method does not check whether `dir`
    /// already holds an index.
    fn create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
        self.validate()?;
        // Fetch the schema once; it is needed both on disk and in memory.
        let schema = self.get_expect_schema()?;
        let directory = ManagedDirectory::wrap(dir.into())?;
        save_new_metas(schema.clone(), self.index_settings.clone(), &directory)?;
        let mut metas = IndexMeta::with_schema(schema);
        metas.index_settings = self.index_settings;
        let mut index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
        index.set_tokenizers(self.tokenizer_manager);
        index.set_fast_field_tokenizers(self.fast_field_tokenizer_manager);
        Ok(index)
    }
}
/// Handle on an index: a managed directory plus the schema, settings,
/// search executor, tokenizer managers and segment-meta inventory attached to it.
#[derive(Clone)]
pub struct Index {
/// Managed directory the index reads from and writes to.
directory: ManagedDirectory,
/// Schema of the index, taken from the index meta.
schema: Schema,
/// Settings of the index, taken from the index meta.
settings: IndexSettings,
/// Executor returned by `search_executor`; single-threaded unless replaced.
executor: Executor,
/// Tokenizer manager used by `tokenizer_for_field`.
tokenizers: TokenizerManager,
/// Tokenizer manager returned by `fast_field_tokenizer`.
fast_field_tokenizers: TokenizerManager,
/// Inventory used to create and list `SegmentMeta` values.
inventory: SegmentMetaInventory,
}
impl Index {
    /// Returns a fresh [`IndexBuilder`].
    pub fn builder() -> IndexBuilder {
        IndexBuilder::new()
    }

    /// Reports whether `dir` contains the meta file (`META_FILEPATH`).
    pub fn exists(dir: &dyn Directory) -> Result<bool, OpenReadError> {
        dir.exists(&META_FILEPATH)
    }

    /// Returns the executor currently attached to this index.
    pub fn search_executor(&self) -> &Executor {
        &self.executor
    }

    /// Replaces the executor with a multi-threaded one using `num_threads` threads.
    pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
        // NOTE(review): the second argument is presumably the thread-name prefix.
        self.executor = Executor::multi_thread(num_threads, "lucivy-search-")?;
        Ok(())
    }

    /// Replaces the executor with a custom one.
    pub fn set_executor(&mut self, executor: Executor) {
        self.executor = executor;
    }

    /// Replaces the executor with a multi-threaded one sized by
    /// `std::thread::available_parallelism`.
    pub fn set_default_multithread_executor(&mut self) -> crate::Result<()> {
        let default_num_threads = available_parallelism()?.get();
        self.set_multithread_executor(default_num_threads)
    }

    /// Creates a new index backed by a `RamDirectory`.
    ///
    /// # Panics
    /// Panics if index creation fails.
    pub fn create_in_ram(schema: Schema) -> Index {
        IndexBuilder::new()
            .schema(schema)
            .create_in_ram()
            .expect("Creating an index in a RamDirectory should not fail")
    }

    /// Creates a new index in the directory at `directory_path`.
    ///
    /// See `IndexBuilder::create_in_dir` for the error conditions.
    #[cfg(feature = "mmap")]
    pub fn create_in_dir<P: AsRef<Path>>(
        directory_path: P,
        schema: Schema,
    ) -> crate::Result<Index> {
        IndexBuilder::new()
            .schema(schema)
            .create_in_dir(directory_path)
    }

    /// Opens the index stored in `dir`, or creates it with `schema` if `dir`
    /// does not contain one yet.
    pub fn open_or_create<T: Into<Box<dyn Directory>>>(
        dir: T,
        schema: Schema,
    ) -> crate::Result<Index> {
        IndexBuilder::new().schema(schema).open_or_create(dir)
    }

    /// Creates a new index in the directory returned by
    /// `MmapDirectory::create_from_tempdir`.
    #[cfg(feature = "mmap")]
    pub fn create_from_tempdir(schema: Schema) -> crate::Result<Index> {
        IndexBuilder::new().schema(schema).create_from_tempdir()
    }

    /// Creates a new index in `dir` with the given schema and settings.
    pub fn create<T: Into<Box<dyn Directory>>>(
        dir: T,
        schema: Schema,
        settings: IndexSettings,
    ) -> crate::Result<Index> {
        IndexBuilder::new()
            .schema(schema)
            .settings(settings)
            .create(dir)
    }

    /// Builds an `Index` from an already loaded `IndexMeta`.
    ///
    /// The index starts with default tokenizer managers and a
    /// single-threaded executor.
    fn open_from_metas(
        directory: ManagedDirectory,
        metas: &IndexMeta,
        inventory: SegmentMetaInventory,
    ) -> Index {
        Index {
            settings: metas.index_settings.clone(),
            directory,
            schema: metas.schema.clone(),
            tokenizers: TokenizerManager::default(),
            fast_field_tokenizers: TokenizerManager::default(),
            executor: Executor::single_thread(),
            inventory,
        }
    }

    /// Replaces the tokenizer manager.
    pub fn set_tokenizers(&mut self, tokenizers: TokenizerManager) {
        self.tokenizers = tokenizers;
    }

    /// Returns the tokenizer manager.
    pub fn tokenizers(&self) -> &TokenizerManager {
        &self.tokenizers
    }

    /// Replaces the fast-field tokenizer manager.
    pub fn set_fast_field_tokenizers(&mut self, tokenizers: TokenizerManager) {
        self.fast_field_tokenizers = tokenizers;
    }

    /// Returns the fast-field tokenizer manager.
    pub fn fast_field_tokenizer(&self) -> &TokenizerManager {
        &self.fast_field_tokenizers
    }

    /// Looks up the tokenizer configured for `field`.
    ///
    /// # Errors
    /// - `LucivyError::SchemaError` if the field is neither a `Str` nor a
    ///   `JsonObject` field.
    /// - `LucivyError::InvalidArgument` if the field has no indexing options,
    ///   or if the tokenizer manager has no tokenizer registered under the
    ///   configured name.
    pub fn tokenizer_for_field(&self, field: Field) -> crate::Result<TextAnalyzer> {
        let field_entry = self.schema.get_field_entry(field);
        let indexing_options_opt = match field_entry.field_type() {
            FieldType::JsonObject(options) => options.get_text_indexing_options(),
            FieldType::Str(options) => options.get_indexing_options(),
            _ => {
                return Err(LucivyError::SchemaError(format!(
                    "{:?} is not a text field.",
                    field_entry.name()
                )))
            }
        };
        let indexing_options = indexing_options_opt.ok_or_else(|| {
            LucivyError::InvalidArgument(format!(
                "No indexing options set for field {field_entry:?}"
            ))
        })?;
        self.tokenizers()
            .get(indexing_options.tokenizer())
            .ok_or_else(|| {
                LucivyError::InvalidArgument(format!(
                    "No Tokenizer found for field {field_entry:?}"
                ))
            })
    }

    /// Builds an `IndexReader` from the default `reader_builder()`.
    pub fn reader(&self) -> crate::Result<IndexReader> {
        self.reader_builder().try_into()
    }

    /// Returns an `IndexReaderBuilder` holding a clone of this index.
    pub fn reader_builder(&self) -> IndexReaderBuilder {
        IndexReaderBuilder::new(self.clone())
    }

    /// Opens the index stored in the directory at `directory_path`, using an
    /// `MmapDirectory`.
    #[cfg(feature = "mmap")]
    pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> crate::Result<Index> {
        let mmap_directory = MmapDirectory::open(directory_path)?;
        Index::open(mmap_directory)
    }

    /// Returns every `SegmentMeta` reported by the inventory.
    pub(crate) fn list_all_segment_metas(&self) -> Vec<SegmentMeta> {
        self.inventory.all()
    }

    /// Opens a `SegmentReader` on every searchable segment and merges the
    /// field metadata they report.
    pub fn fields_metadata(&self) -> crate::Result<Vec<FieldMetadata>> {
        let segments = self.searchable_segments()?;
        let fields_metadata: Vec<Vec<FieldMetadata>> = segments
            .into_iter()
            .map(|segment| SegmentReader::open(&segment)?.fields_metadata())
            .collect::<Result<_, _>>()?;
        Ok(merge_field_meta_data(fields_metadata))
    }

    /// Creates a new `SegmentMeta` through the inventory of this index.
    pub fn new_segment_meta(&self, segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
        self.inventory.new_segment_meta(segment_id, max_doc)
    }

    /// Opens the index stored in `directory`.
    ///
    /// The directory is wrapped in a `ManagedDirectory` and the meta file is
    /// loaded to obtain the schema and settings.
    pub fn open<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
        let directory = ManagedDirectory::wrap(directory.into())?;
        let inventory = SegmentMetaInventory::default();
        let metas = load_metas(&directory, &inventory)?;
        Ok(Index::open_from_metas(directory, &metas, inventory))
    }

    /// Asynchronous counterpart of `open`; the meta file is read asynchronously.
    #[cfg(feature = "quickwit")]
    pub async fn open_async<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
        let directory = ManagedDirectory::wrap(directory.into())?;
        let inventory = SegmentMetaInventory::default();
        let metas = load_metas_async(&directory, &inventory).await?;
        Ok(Index::open_from_metas(directory, &metas, inventory))
    }

    /// Reads and parses the meta file from the directory of this index.
    pub fn load_metas(&self) -> crate::Result<IndexMeta> {
        load_metas(self.directory(), &self.inventory)
    }

    /// Asynchronous counterpart of `load_metas`.
    #[cfg(feature = "quickwit")]
    pub async fn load_metas_async(&self) -> crate::Result<IndexMeta> {
        load_metas_async(self.directory(), &self.inventory).await
    }

    /// Creates an `IndexWriter` configured by `options`.
    ///
    /// # Errors
    /// Returns `LucivyError::LockFailure` if `INDEX_WRITER_LOCK` cannot be
    /// acquired on the directory, and propagates errors from
    /// `IndexWriter::new`.
    pub fn writer_with_options<D: Document>(
        &self,
        options: IndexWriterOptions,
    ) -> crate::Result<IndexWriter<D>> {
        let directory_lock = self
            .directory
            .acquire_lock(&INDEX_WRITER_LOCK)
            .map_err(|err| {
                LucivyError::LockFailure(
                    err,
                    Some(
                        "Failed to acquire index lock. If you are using a regular directory, this \
                         means there is already an `IndexWriter` working on this `Directory`, in \
                         this process or in a different process."
                            .to_string(),
                    ),
                )
            })?;
        IndexWriter::new(self, options, directory_lock)
    }

    /// Creates an `IndexWriter` with `num_threads` worker threads, splitting
    /// `overall_memory_budget_in_bytes` evenly between them.
    ///
    /// # Errors
    /// Returns `LucivyError::InvalidArgument` if `num_threads` is `0`, and
    /// propagates errors from `writer_with_options`.
    pub fn writer_with_num_threads<D: Document>(
        &self,
        num_threads: usize,
        overall_memory_budget_in_bytes: usize,
    ) -> crate::Result<IndexWriter<D>> {
        // Guard the division below: dividing by zero would panic.
        if num_threads == 0 {
            return Err(LucivyError::InvalidArgument(
                "At least one worker thread is required, got 0".to_string(),
            ));
        }
        let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
        let options = IndexWriterOptions::builder()
            .num_worker_threads(num_threads)
            .memory_budget_per_thread(memory_arena_in_bytes_per_thread)
            .build();
        self.writer_with_options(options)
    }

    /// Creates a single-threaded writer with the minimum memory budget, for tests.
    #[cfg(test)]
    pub fn writer_for_tests<D: Document>(&self) -> crate::Result<IndexWriter<D>> {
        self.writer_with_num_threads(1, MEMORY_BUDGET_NUM_BYTES_MIN)
    }

    /// Creates an `IndexWriter` with an automatically chosen number of threads.
    ///
    /// The thread count starts at
    /// `min(available_parallelism(), MAX_NUM_THREAD)`. If that would leave
    /// each thread with less than `MEMORY_BUDGET_NUM_BYTES_MIN` bytes, it is
    /// lowered to `memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN`, but
    /// never below one.
    pub fn writer<D: Document>(
        &self,
        memory_budget_in_bytes: usize,
    ) -> crate::Result<IndexWriter<D>> {
        let mut num_threads = std::cmp::min(available_parallelism()?.get(), MAX_NUM_THREAD);
        let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
        if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
            num_threads = (memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN).max(1);
        }
        self.writer_with_num_threads(num_threads, memory_budget_in_bytes)
    }

    /// Returns the index settings.
    pub fn settings(&self) -> &IndexSettings {
        &self.settings
    }

    /// Returns a mutable reference to the index settings.
    pub fn settings_mut(&mut self) -> &mut IndexSettings {
        &mut self.settings
    }

    /// Returns a clone of the index schema.
    pub fn schema(&self) -> Schema {
        self.schema.clone()
    }

    /// Returns one `Segment` per segment meta listed in the meta file.
    pub fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
        Ok(self
            .searchable_segment_metas()?
            .into_iter()
            .map(|segment_meta| self.segment(segment_meta))
            .collect())
    }

    /// Asynchronous counterpart of `searchable_segments`.
    #[cfg(feature = "quickwit")]
    pub async fn searchable_segments_async(&self) -> crate::Result<Vec<Segment>> {
        Ok(self
            .searchable_segment_metas_async()
            .await?
            .into_iter()
            .map(|segment_meta| self.segment(segment_meta))
            .collect())
    }

    /// Builds the `Segment` of this index described by `segment_meta`.
    #[doc(hidden)]
    pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
        Segment::for_index(self.clone(), segment_meta)
    }

    /// Creates a new segment with a randomly generated id and a `max_doc` of `0`.
    pub fn new_segment(&self) -> Segment {
        let segment_meta = self
            .inventory
            .new_segment_meta(SegmentId::generate_random(), 0);
        self.segment(segment_meta)
    }

    /// Returns the managed directory of this index.
    pub fn directory(&self) -> &ManagedDirectory {
        &self.directory
    }

    /// Returns a mutable reference to the managed directory of this index.
    pub fn directory_mut(&mut self) -> &mut ManagedDirectory {
        &mut self.directory
    }

    /// Loads the meta file and returns the segment metas it lists.
    pub fn searchable_segment_metas(&self) -> crate::Result<Vec<SegmentMeta>> {
        Ok(self.load_metas()?.segments)
    }

    /// Asynchronous counterpart of `searchable_segment_metas`.
    #[cfg(feature = "quickwit")]
    pub async fn searchable_segment_metas_async(&self) -> crate::Result<Vec<SegmentMeta>> {
        Ok(self.load_metas_async().await?.segments)
    }

    /// Loads the meta file and returns the ids of the segments it lists.
    pub fn searchable_segment_ids(&self) -> crate::Result<Vec<SegmentId>> {
        Ok(self
            .searchable_segment_metas()?
            .iter()
            .map(SegmentMeta::id)
            .collect())
    }

    /// Validates the checksum of every file that belongs to a searchable
    /// segment and is also listed as managed by the directory.
    ///
    /// Returns the paths whose checksum validation reported `false`. Segment
    /// files that are not in the managed set are skipped.
    pub fn validate_checksum(&self) -> crate::Result<HashSet<PathBuf>> {
        let managed_files = self.directory.list_managed_files();
        let active_segments_files: HashSet<PathBuf> = self
            .searchable_segment_metas()?
            .iter()
            .flat_map(|segment_meta| segment_meta.list_files())
            .collect();
        let mut damaged_files = HashSet::new();
        for path in active_segments_files.intersection(&managed_files) {
            if !self.directory.validate_checksum(path)? {
                damaged_files.insert(path.clone());
            }
        }
        Ok(damaged_files)
    }
}
/// Debug output shows only the directory, formatted as `Index(<directory>)`.
impl fmt::Debug for Index {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Index({:?})", self.directory))
    }
}