#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;
mod searcher;
pub use searcher::Searcher;
#[cfg(feature = "native")]
mod primary_key;
#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(all(feature = "wasm", not(feature = "native")))]
mod wasm_writer;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use primary_key::PrimaryKeyIndex;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(all(feature = "wasm", not(feature = "native")))]
pub use wasm_writer::IndexWriter as WasmIndexWriter;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};
mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
index_documents_from_reader, index_json_document, parse_schema,
};
/// File name (`index.slicecache`) exported for use by other parts of the crate.
///
/// NOTE(review): going by the constant's name this is the file a slice cache is stored
/// under; nothing in the visible part of this file reads or writes it, so confirm
/// against the callers.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
/// Tunable settings for an index.
///
/// The values produced by `IndexConfig::default()` are listed per field below. Where a
/// field's exact meaning is not visible from this file it is marked as an assumption.
#[derive(Debug, Clone)]
pub struct IndexConfig {
/// General thread count. Defaults to `crate::default_indexing_threads()` with the
/// `native` feature and to `1` otherwise.
/// NOTE(review): presumably the worker-thread budget for CPU-heavy work — confirm.
pub num_threads: usize,
/// Defaults to `1`.
/// NOTE(review): presumably the number of parallel indexing workers — confirm.
pub num_indexing_threads: usize,
/// Defaults to `crate::default_compression_threads()` with the `native` feature and
/// to `1` otherwise.
/// NOTE(review): presumably threads used for compression while building — confirm.
pub num_compression_threads: usize,
/// Term cache size in blocks; handed to both the segment manager and the index
/// reader when they are constructed. Defaults to `256`.
pub term_cache_blocks: usize,
/// Defaults to `32`.
/// NOTE(review): presumably the document-store cache size in blocks — confirm.
pub store_cache_blocks: usize,
/// Defaults to `256 * 1024 * 1024` (256 MiB).
/// NOTE(review): presumably the memory ceiling for in-flight indexing — confirm.
pub max_indexing_memory_bytes: usize,
/// Merge policy; a copy made with `clone_box()` is given to the segment manager.
/// Defaults to `TieredMergePolicy::default()`.
pub merge_policy: Box<dyn crate::merge::MergePolicy>,
/// Defaults to `IndexOptimization::default()`.
pub optimization: crate::structures::IndexOptimization,
/// Reload interval in milliseconds, passed to the index reader on construction.
/// Defaults to `1000`.
pub reload_interval_ms: u64,
/// Merge concurrency limit passed to the segment manager. Defaults to `4`.
pub max_concurrent_merges: usize,
}
impl Default for IndexConfig {
    /// Builds the baseline configuration.
    ///
    /// With the `native` feature the two thread counts come from
    /// `crate::default_indexing_threads()` and `crate::default_compression_threads()`;
    /// without it both are `1`. Every other value is a fixed constant.
    fn default() -> Self {
        /// Default indexing memory budget: 256 MiB.
        const DEFAULT_INDEXING_MEMORY_BYTES: usize = 256 * 1024 * 1024;

        // The crate-level helpers are only referenced on native builds, so the
        // selection has to be done with `#[cfg]` rather than a runtime branch.
        #[cfg(feature = "native")]
        let (threads, compression_threads) = (
            crate::default_indexing_threads(),
            crate::default_compression_threads(),
        );
        #[cfg(not(feature = "native"))]
        let (threads, compression_threads) = (1, 1);

        Self {
            // `num_threads` receives the helper-derived count, while
            // `num_indexing_threads` is pinned to a single worker.
            num_threads: threads,
            num_indexing_threads: 1,
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: DEFAULT_INDEXING_MEMORY_BYTES,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000,
            max_concurrent_merges: 4,
        }
    }
}
/// Handle to an index stored in a directory of type `D` (native builds only).
///
/// Bundles the directory, the schema, the configuration and the segment manager, and
/// lazily creates a single [`IndexReader`] that the query helpers share.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
/// Directory the index lives in; shared with the segment manager.
directory: Arc<D>,
/// Schema of the index; shared with the segment manager, reader and query parsers.
schema: Arc<Schema>,
/// Configuration supplied when the index was created or opened.
config: IndexConfig,
/// Segment manager constructed in `create` / `open`.
segment_manager: Arc<crate::merge::SegmentManager<D>>,
/// Reader built on the first successful call to `reader()` and reused afterwards.
cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
/// Creates a new index for `schema` inside `directory`.
///
/// Fresh [`IndexMetadata`] is derived from a copy of the schema and handed to a newly
/// built segment manager together with the merge policy, term cache size and merge
/// concurrency taken from `config`.
///
/// # Errors
///
/// Propagates any error returned by the segment manager's `update_metadata` call.
pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
    let schema = Arc::new(schema);
    let directory = Arc::new(directory);
    let initial_metadata = IndexMetadata::new(schema.as_ref().clone());

    let manager = crate::merge::SegmentManager::new(
        Arc::clone(&directory),
        Arc::clone(&schema),
        initial_metadata,
        config.merge_policy.clone_box(),
        config.term_cache_blocks,
        config.max_concurrent_merges,
    );
    let segment_manager = Arc::new(manager);

    // An update with an empty closure changes nothing in the metadata itself.
    // NOTE(review): presumably this is what writes the initial metadata out — confirm.
    segment_manager.update_metadata(|_| {}).await?;

    let cached_reader = tokio::sync::OnceCell::new();
    Ok(Self {
        directory,
        schema,
        config,
        segment_manager,
        cached_reader,
    })
}
/// Opens an existing index stored in `directory`.
///
/// The metadata is loaded from the directory and its schema becomes the schema of the
/// returned index. After the segment manager is built, `load_and_publish_trained` is
/// awaited; its outcome is not inspected here.
///
/// # Errors
///
/// Returns the error from [`IndexMetadata::load`] if the metadata cannot be loaded.
pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
    let directory = Arc::new(directory);
    let stored_metadata = IndexMetadata::load(directory.as_ref()).await?;
    let schema = Arc::new(stored_metadata.schema.clone());

    let manager = crate::merge::SegmentManager::new(
        Arc::clone(&directory),
        Arc::clone(&schema),
        stored_metadata,
        config.merge_policy.clone_box(),
        config.term_cache_blocks,
        config.max_concurrent_merges,
    );
    let segment_manager = Arc::new(manager);
    segment_manager.load_and_publish_trained().await;

    let cached_reader = tokio::sync::OnceCell::new();
    Ok(Self {
        directory,
        schema,
        config,
        segment_manager,
        cached_reader,
    })
}
/// Borrows the index schema.
pub fn schema(&self) -> &Schema {
    self.schema.as_ref()
}
/// Borrows the shared (`Arc`) handle to the schema, for callers that want to clone the
/// handle rather than the schema itself.
pub fn schema_arc(&self) -> &Arc<Schema> {
&self.schema
}
/// Borrows the directory backing this index.
pub fn directory(&self) -> &D {
    self.directory.as_ref()
}
/// Borrows the shared (`Arc`) handle to the segment manager built in `create` / `open`.
pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
&self.segment_manager
}
/// Returns the index reader shared by all query helpers.
///
/// The reader is built through `IndexReader::from_segment_manager` on the first call
/// and stored in a `tokio::sync::OnceCell`; later calls return the stored instance.
///
/// # Errors
///
/// Returns the error produced while constructing the reader.
pub async fn reader(&self) -> Result<&IndexReader<D>> {
    let init = || async {
        let schema = Arc::clone(&self.schema);
        let segment_manager = Arc::clone(&self.segment_manager);
        let config = &self.config;
        IndexReader::from_segment_manager(
            schema,
            segment_manager,
            config.term_cache_blocks,
            config.reload_interval_ms,
        )
        .await
    };
    self.cached_reader.get_or_try_init(init).await
}
/// Borrows the configuration this index was created or opened with.
pub fn config(&self) -> &IndexConfig {
&self.config
}
/// Returns the segment readers held by the reader's current searcher, copied into a
/// new `Vec` of `Arc` handles.
///
/// # Errors
///
/// Fails if the reader or its searcher cannot be obtained.
pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
    let searcher = self.reader().await?.searcher().await?;
    let segments = searcher.segment_readers().to_vec();
    Ok(segments)
}
/// Returns the document count reported by the reader's current searcher.
///
/// # Errors
///
/// Fails if the reader or its searcher cannot be obtained.
pub async fn num_docs(&self) -> Result<u32> {
    let searcher = self.reader().await?.searcher().await?;
    let count = searcher.num_docs();
    Ok(count)
}
/// Returns the fields a query targets when it does not name one.
///
/// If the schema declares default fields, those are returned. Otherwise the result is
/// every schema field that is both indexed and of type `FieldType::Text`.
pub fn default_fields(&self) -> Vec<crate::Field> {
    let declared = self.schema.default_fields();
    if !declared.is_empty() {
        return declared.to_vec();
    }

    // Nothing declared: fall back to all indexed text fields.
    self.schema
        .fields()
        .filter_map(|(field, entry)| {
            let is_indexed_text =
                entry.indexed && matches!(entry.field_type, crate::dsl::FieldType::Text);
            is_indexed_text.then_some(field)
        })
        .collect()
}
/// Returns a tokenizer registry in its default configuration.
///
/// A new registry is constructed on every call; nothing is cached on the index.
pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
    let registry = crate::tokenizer::TokenizerRegistry::default();
    Arc::new(registry)
}
pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
let default_fields = self.default_fields();
let tokenizers = self.tokenizers();
let query_routers = self.schema.query_routers();
if !query_routers.is_empty()
&& let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
{
return crate::dsl::QueryLanguageParser::with_router(
Arc::clone(&self.schema),
default_fields,
tokenizers,
router,
);
}
crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
}
/// Parses `query_str` and runs it with no offset.
///
/// Shorthand for [`Self::query_offset`] called with an offset of `0`; `query_str` and
/// `limit` are forwarded unchanged.
///
/// # Errors
///
/// Returns whatever [`Self::query_offset`] returns.
pub async fn query(
    &self,
    query_str: &str,
    limit: usize,
) -> Result<crate::query::SearchResponse> {
    const NO_OFFSET: usize = 0;
    self.query_offset(query_str, limit, NO_OFFSET).await
}
/// Parses `query_str` with [`Self::query_parser`] and executes the result through
/// [`Self::search_offset`], forwarding `limit` and `offset` unchanged.
///
/// # Errors
///
/// A parse failure is wrapped in `crate::error::Error::Query`; errors from the search
/// itself are propagated as they are.
pub async fn query_offset(
    &self,
    query_str: &str,
    limit: usize,
    offset: usize,
) -> Result<crate::query::SearchResponse> {
    let parser = self.query_parser();
    let parsed = parser
        .parse(query_str)
        .map_err(crate::error::Error::Query)?;
    let response = self.search_offset(parsed.as_ref(), limit, offset).await;
    response
}
/// Executes an already-built `query` with no offset.
///
/// Shorthand for [`Self::search_offset`] called with an offset of `0`; `query` and
/// `limit` are forwarded unchanged.
///
/// # Errors
///
/// Returns whatever [`Self::search_offset`] returns.
pub async fn search(
    &self,
    query: &dyn crate::query::Query,
    limit: usize,
) -> Result<crate::query::SearchResponse> {
    const NO_OFFSET: usize = 0;
    self.search_offset(query, limit, NO_OFFSET).await
}
/// Executes `query` against the reader's current searcher and packages the outcome as
/// a `SearchResponse`. `limit` and `offset` are forwarded to the searcher unchanged.
///
/// How the search runs depends on the `sync` feature:
///
/// * with `sync`, the searcher's synchronous entry point is used. On a multi-thread
///   Tokio runtime it is wrapped in `tokio::task::block_in_place`; on any other
///   runtime flavor it is called directly on the current task.
/// * without `sync`, the searcher's async entry point is awaited.
///
/// Each result becomes a `SearchHit` carrying its document address, its score and the
/// value of `extract_ordinals()`. `total_hits` is the count reported by the searcher
/// next to the results.
///
/// # Errors
///
/// Fails if the reader or searcher cannot be obtained, or if the search itself fails.
pub async fn search_offset(
    &self,
    query: &dyn crate::query::Query,
    limit: usize,
    offset: usize,
) -> Result<crate::query::SearchResponse> {
    let searcher = self.reader().await?.searcher().await?;

    #[cfg(feature = "sync")]
    let (results, total_hits) = {
        let on_multi_thread = matches!(
            tokio::runtime::Handle::current().runtime_flavor(),
            tokio::runtime::RuntimeFlavor::MultiThread
        );
        let run_sync = || searcher.search_with_offset_and_count_sync(query, limit, offset);
        if on_multi_thread {
            // `block_in_place` is only used on the multi-thread flavor.
            tokio::task::block_in_place(run_sync)?
        } else {
            run_sync()?
        }
    };
    #[cfg(not(feature = "sync"))]
    let (results, total_hits) = searcher
        .search_with_offset_and_count(query, limit, offset)
        .await?;

    let hits: Vec<crate::query::SearchHit> = results
        .into_iter()
        .map(|scored| {
            let address = crate::query::DocAddress::new(scored.segment_id, scored.doc_id);
            let score = scored.score;
            let matched_fields = scored.extract_ordinals();
            crate::query::SearchHit {
                address,
                score,
                matched_fields,
            }
        })
        .collect();

    Ok(crate::query::SearchResponse { hits, total_hits })
}
/// Fetches the document at `address` through the reader's current searcher.
///
/// The searcher's answer is returned as is, including `Ok(None)`.
///
/// # Errors
///
/// Fails if the reader or searcher cannot be obtained, or if the lookup fails.
pub async fn get_document(
    &self,
    address: &crate::query::DocAddress,
) -> Result<Option<crate::dsl::Document>> {
    let searcher = self.reader().await?.searcher().await?;
    let document = searcher.get_document(address).await;
    document
}
/// Looks up `term` in `field` on every segment reader of the index.
///
/// Segments are queried one after another. A segment that yields a posting list is
/// returned paired with that list; a segment that yields `None` is skipped.
///
/// # Errors
///
/// Fails if the segment readers cannot be obtained; the first failing per-segment
/// lookup aborts the loop and is returned.
pub async fn get_postings(
    &self,
    field: crate::Field,
    term: &[u8],
) -> Result<
    Vec<(
        Arc<crate::segment::SegmentReader>,
        crate::structures::BlockPostingList,
    )>,
> {
    let mut per_segment = Vec::new();
    for segment in self.segment_readers().await? {
        let Some(postings) = segment.get_postings(field, term).await? else {
            continue;
        };
        per_segment.push((segment, postings));
    }
    Ok(per_segment)
}
}
#[cfg(feature = "native")]
impl<D> Index<D>
where
    D: crate::directories::DirectoryWriter + 'static,
{
    /// Creates an [`IndexWriter`](writer::IndexWriter) for this index by calling
    /// `IndexWriter::from_index` with a borrow of the index.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        let index = self;
        writer::IndexWriter::from_index(index)
    }
}
#[cfg(test)]
mod tests;