use add_result::decode_positions_commit;
use ahash::{AHashMap, AHashSet};
use futures::future;
use indexmap::IndexMap;
use itertools::Itertools;
use memmap2::{Mmap, MmapMut, MmapOptions};
use model2vec_rs::model::StaticModel;
use num::FromPrimitive;
use num_derive::FromPrimitive;
use search::{QueryType, Search};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use smallvec::SmallVec;
use snowball_stemmers_rs::{Algorithm, Stemmer};
use std::{
cmp,
collections::HashMap,
fmt::{self},
fs::{self, File},
io::{BufRead, BufReader, Read, Seek, Write},
path::Path,
sync::{Arc, LazyLock},
thread::available_parallelism,
time::Instant,
};
use symspell_complete_rs::{PruningRadixTrie, SymSpell};
use tokio::sync::{RwLock, Semaphore};
use utils::{read_u32, write_u16};
use utoipa::ToSchema;
#[cfg(feature = "zh")]
use crate::word_segmentation::WordSegmentationTM;
use crate::{
INDEX_RUNTIME,
add_result::{self, B, K, SIGMA},
clustering::{ClusterHeader, ParentMedoid},
commit::Commit,
geo_search::encode_morton_2_d,
highlighter::Highlight,
search::{
self, FacetFilter, Point, QueryFacet, QueryRewriting, Ranges, ResultObject, ResultSort,
ResultType, SearchLexicalShard, SearchMode,
},
tokenizer::tokenizer,
utils::{
self, read_u8_ref, read_u16, read_u16_ref, read_u32_ref, read_u64, read_u64_ref, write_f32,
write_f64, write_i8, write_i16, write_i32, write_i64, write_u32, write_u64,
},
vector::{Inference, Model, Precision, Quantization, VectorHeader, read_min_max},
vector_similarity::{TurboQuant, VectorSimilarity},
};
#[cfg(any(
all(
feature = "gxhash",
target_arch = "x86_64",
target_feature = "aes",
target_feature = "sse2"
),
all(
feature = "gxhash",
target_arch = "aarch64",
target_feature = "aes",
target_feature = "neon"
)
))]
use gxhash::{gxhash32, gxhash64};
#[cfg(not(any(
all(
feature = "gxhash",
target_arch = "x86_64",
target_feature = "aes",
target_feature = "sse2"
),
all(
feature = "gxhash",
target_arch = "aarch64",
target_feature = "aes",
target_feature = "neon"
)
)))]
use ahash::RandomState;
// File and directory name constants.
// NOTE(review): presumably the on-disk layout of an index; the code that joins them into paths is outside this section.
pub(crate) const FILE_PATH: &str = "files";
pub(crate) const INDEX_FILENAME: &str = "index.bin";
pub(crate) const DOCSTORE_FILENAME: &str = "docstore.bin";
pub(crate) const DELETE_FILENAME: &str = "delete.bin";
pub(crate) const SCHEMA_FILENAME: &str = "schema.json";
pub(crate) const SYNONYMS_FILENAME: &str = "synonyms.json";
pub(crate) const META_FILENAME: &str = "index.json";
pub(crate) const FACET_FILENAME: &str = "facet.bin";
pub(crate) const FACET_VALUES_FILENAME: &str = "facet.json";
pub(crate) const DICTIONARY_FILENAME: &str = "dictionary.csv";
pub(crate) const COMPLETIONS_FILENAME: &str = "completions.csv";
/// Crate version string, captured from Cargo's `CARGO_PKG_VERSION` at compile time.
pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
pub(crate) const VECTOR_FILENAME: &str = "vector.bin";
/// Size of the index header (presumably in bytes — TODO confirm against the index reader/writer).
const INDEX_HEADER_SIZE: u64 = 4;
/// Major component of the index format version.
pub const INDEX_FORMAT_VERSION_MAJOR: u16 = 6;
/// Minor component of the index format version.
pub const INDEX_FORMAT_VERSION_MINOR: u16 = 1;
/// Limit on positions per term (2^16); the code enforcing it is outside this section.
pub const MAX_POSITIONS_PER_TERM: usize = 65_536;
/// Bit mask with only bit 7 (the most significant bit of a byte) set.
pub(crate) const STOP_BIT: u8 = 0b10000000;
/// Bit mask with only bit 5 set.
pub(crate) const FIELD_STOP_BIT_1: u8 = 0b0010_0000;
/// Bit mask with only bit 6 set.
pub(crate) const FIELD_STOP_BIT_2: u8 = 0b0100_0000;
/// Block size (2^16); in this file it is the length of the `[u8; ROARING_BLOCK_SIZE]` document-length arrays.
pub const ROARING_BLOCK_SIZE: usize = 65_536;
// Compile-time switches and sizing limits; the code consuming them is outside this section.
pub(crate) const SPEEDUP_FLAG: bool = true;
pub(crate) const SORT_FLAG: bool = true;
pub(crate) const POSTING_BUFFER_SIZE: usize = 400_000_000;
pub(crate) const MAX_QUERY_TERM_NUMBER: usize = 100;
pub(crate) const SEGMENT_KEY_CAPACITY: usize = 1000;
use tabled::Tabled;
/// A label/value pair; derives `Tabled`, so a collection of these can be rendered as a table.
#[derive(Tabled, Clone)]
pub struct Info {
/// Static label of the row.
pub entry: &'static str,
/// Value shown for the label, already formatted as text.
pub value: String,
}
/// Search request body: (de)serializable with serde and described for the API schema via `ToSchema`.
///
/// Every field except `query_string` carries a serde default and may be omitted from the input.
#[derive(Deserialize, Serialize, Clone, ToSchema, Debug)]
pub struct SearchRequestObject {
/// Query text; (de)serialized under the key `query`.
#[serde(rename = "query")]
pub query_string: String,
/// Optional query vector as raw JSON; `None` when omitted.
#[serde(default)]
pub query_vector: Option<Value>,
/// Defaults to `false` when omitted.
#[serde(default)]
#[schema(required = false, default = false, example = false)]
pub enable_empty_query: bool,
/// Defaults to `0` when omitted.
#[serde(default)]
#[schema(required = false, minimum = 0, default = 0, example = 0)]
pub offset: usize,
/// Defaults to `10` when omitted (see `length_api`).
#[serde(default = "length_api")]
#[schema(required = false, minimum = 1, default = 10, example = 10)]
pub length: usize,
/// Defaults to `ResultType::default()` when omitted.
#[serde(default)]
pub result_type: ResultType,
/// Defaults to `false` when omitted.
#[serde(default)]
pub realtime: bool,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub highlights: Vec<Highlight>,
/// Defaults to an empty list when omitted.
#[schema(required = false, example = json!(["title"]))]
#[serde(default)]
pub field_filter: Vec<String>,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub fields: Vec<String>,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub distance_fields: Vec<DistanceField>,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub query_facets: Vec<QueryFacet>,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub facet_filter: Vec<FacetFilter>,
/// Defaults to an empty list when omitted.
#[schema(required = false, example = json!([{"field": "date", "order": "Ascending", "base": "None" }]))]
#[serde(default)]
pub result_sort: Vec<ResultSort>,
/// Defaults to `QueryType::Intersection` when omitted (see `query_type_api`).
#[schema(required = false, example = QueryType::Intersection)]
#[serde(default = "query_type_api")]
pub query_type_default: QueryType,
/// Defaults to `QueryRewriting::SearchOnly` when omitted (see `query_rewriting_api`).
#[schema(required = false, example = QueryRewriting::SearchOnly)]
#[serde(default = "query_rewriting_api")]
pub query_rewriting: QueryRewriting,
/// Defaults to `SearchMode::Lexical` when omitted (see `search_mode_api`).
#[schema(required = false, example = SearchMode::Lexical)]
#[serde(default = "search_mode_api")]
pub search_mode: SearchMode,
}
/// Serde default for `SearchRequestObject::search_mode`.
fn search_mode_api() -> SearchMode {
SearchMode::Lexical
}
/// Serde default for `SearchRequestObject::query_type_default`.
fn query_type_api() -> QueryType {
QueryType::Intersection
}
/// Serde default for `SearchRequestObject::query_rewriting`.
fn query_rewriting_api() -> QueryRewriting {
QueryRewriting::SearchOnly
}
/// Serde default for `SearchRequestObject::length`.
fn length_api() -> usize {
10
}
/// Search response body: (de)serializable with serde and described for the API schema via `ToSchema`.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct SearchResultObject {
/// Elapsed time as a `u128`; NOTE(review): the unit is set by the producer, which is outside this section.
pub time: u128,
pub original_query: String,
pub query: String,
pub offset: usize,
pub length: usize,
pub count: usize,
pub count_total: usize,
pub query_terms: Vec<String>,
/// Result documents; described in the API schema as a list of string-keyed JSON maps.
#[schema(value_type=Vec<HashMap<String, serde_json::Value>>)]
pub results: Vec<Document>,
/// Facets keyed by name; described in the API schema as a map to `(String, usize)` pairs.
#[schema(value_type=HashMap<String, Vec<(String, usize)>>)]
pub facets: AHashMap<String, Facet>,
pub suggestions: Vec<String>,
}
/// Quota settings attached to an API key.
///
/// `timestamp_nanos` and `violation_count` are skipped by serde and hidden from the API schema,
/// so they exist only in memory and start at `0` after deserialization.
#[derive(Default, Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct ApikeyQuotaObject {
pub indices_max: usize,
pub indices_size_max: usize,
pub documents_max: usize,
pub operations_max: usize,
/// Optional rate limit; `None` when absent.
pub rate_limit: Option<usize>,
/// In-memory only (not serialized, not part of the schema).
#[serde(skip)]
#[schema(ignore)]
pub timestamp_nanos: usize,
/// In-memory only (not serialized, not part of the schema).
#[serde(skip)]
#[schema(ignore)]
pub violation_count: usize,
}
/// An API key record: id, key hash, quota and the indices held for it.
#[derive(Deserialize, Serialize)]
pub struct ApikeyObject {
pub id: u64,
/// 128-bit hash of the API key (the hashing scheme is outside this section).
pub apikey_hash: u128,
pub quota: ApikeyQuotaObject,
/// Shared index handles keyed by `u64`; skipped by serde, so empty after deserialization.
#[serde(skip)]
pub index_list: HashMap<u64, IndexArc>,
}
/// Request body for creating an index.
///
/// Only `index_name` is required by serde; every other field has a default.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct CreateIndexRequest {
#[schema(example = "demo_index")]
pub index_name: String,
/// Field definitions; marked required in the API schema, but serde accepts omission (empty list).
#[schema(required = true, example = json!([
{"field":"title","field_type":"Text","store":true,"index_lexical":true,"boost":10.0},
{"field":"body","field_type":"Text","store":true,"index_lexical":true,"longest":true},
{"field":"url","field_type":"Text","store":true,"index_lexical":false},
{"field":"date","field_type":"Timestamp","store":true,"index_lexical":false,"facet":true}]))]
#[serde(default)]
pub schema: Vec<SchemaField>,
/// Defaults to `LexicalSimilarity::Bm25fProximity` (see `similarity_type_api`).
#[serde(default = "similarity_type_api")]
pub similarity: LexicalSimilarity,
/// Defaults to `TokenizerType::UnicodeAlphanumeric` (see `tokenizer_type_api`).
#[serde(default = "tokenizer_type_api")]
pub tokenizer: TokenizerType,
/// Defaults to `StemmerType::None`.
#[serde(default)]
pub stemmer: StemmerType,
/// Defaults to `StopwordType::None`.
#[serde(default)]
pub stop_words: StopwordType,
/// Defaults to `FrequentwordType::English`.
#[serde(default)]
pub frequent_words: FrequentwordType,
/// Bit set of `NgramSet` flags; defaults to `NgramFF | NgramFFF` (see `ngram_indexing_api`).
#[serde(default = "ngram_indexing_api")]
pub ngram_indexing: u8,
/// Defaults to `DocumentCompression::Snappy` (see `document_compression_api`).
#[serde(default = "document_compression_api")]
pub document_compression: DocumentCompression,
/// Defaults to an empty list when omitted.
#[schema(required = false, example = json!([{"terms":["berry","lingonberry","blueberry","gooseberry"],"multiway":false}]))]
#[serde(default)]
pub synonyms: Vec<Synonym>,
/// `None` when omitted.
#[serde(default)]
pub spelling_correction: Option<SpellingCorrection>,
/// `None` when omitted.
#[serde(default)]
pub query_completion: Option<QueryCompletion>,
/// Defaults to `Clustering::Auto`.
#[serde(default)]
pub clustering: Clustering,
/// Defaults to `Inference::default()`.
#[serde(default)]
pub inference: Inference,
}
/// Serde default for `CreateIndexRequest::similarity`.
fn similarity_type_api() -> LexicalSimilarity {
LexicalSimilarity::Bm25fProximity
}
/// Serde default for `CreateIndexRequest::tokenizer`.
fn tokenizer_type_api() -> TokenizerType {
TokenizerType::UnicodeAlphanumeric
}
/// Serde default for `CreateIndexRequest::ngram_indexing`: the `NgramFF` and `NgramFFF` flags combined.
fn ngram_indexing_api() -> u8 {
NgramSet::NgramFF as u8 | NgramSet::NgramFFF as u8
}
/// Serde default for `CreateIndexRequest::document_compression`.
fn document_compression_api() -> DocumentCompression {
DocumentCompression::Snappy
}
/// Request body for deleting an API key.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DeleteApikeyRequest {
/// The API key as text (base64-encoded, judging by the field name — confirm against the handler).
pub apikey_base64: String,
}
/// Request body for iterating over documents; every field has a serde default.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct GetIteratorRequest {
/// `None` when omitted.
#[serde(default)]
pub document_id: Option<u64>,
/// Defaults to `0` when omitted.
#[serde(default)]
pub skip: usize,
/// Signed count; defaults to `1` when omitted (see `default_1usize`).
#[serde(default = "default_1usize")]
pub take: isize,
/// Defaults to `false` when omitted.
#[serde(default)]
pub include_deleted: bool,
/// Defaults to `false` when omitted.
#[serde(default)]
pub include_document: bool,
/// Defaults to an empty list when omitted.
#[serde(default)]
pub fields: Vec<String>,
}
/// Serde default for `GetIteratorRequest::take`.
///
/// Despite the `usize` in its name, this returns an `isize` to match the field type.
fn default_1usize() -> isize {
1
}
/// Request body for fetching a document; every field defaults to an empty list when omitted.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct GetDocumentRequest {
#[serde(default)]
pub query_terms: Vec<String>,
#[serde(default)]
pub highlights: Vec<Highlight>,
#[serde(default)]
pub fields: Vec<String>,
#[serde(default)]
pub distance_fields: Vec<DistanceField>,
}
/// Response body describing an index: identity, schema, counters, version and facet min/max values.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct IndexResponseObject {
pub id: u64,
#[schema(example = "demo_index")]
pub name: String,
/// Schema fields keyed by field name (see the schema example below).
#[schema(example = json!({
"title":{
"field":"title",
"store":true,
"index_lexical":true,
"field_type":"Text",
"boost":10.0,
"field_id":0
},
"body":{
"field":"body",
"store":true,
"index_lexical":true,
"field_type":"Text",
"field_id":1
},
"url":{
"field":"url",
"store":true,
"index_lexical":false,
"field_type":"Text",
"field_id":2
},
"date":{
"field":"date",
"store":true,
"index_lexical":false,
"field_type":"Timestamp",
"facet":true,
"field_id":3
}
}))]
pub schema: HashMap<String, SchemaField>,
pub indexed_doc_count: usize,
pub committed_doc_count: usize,
pub operations_count: u64,
pub query_count: u64,
/// Version string (schema example: `0.11.1`).
#[schema(example = "0.11.1")]
pub version: String,
/// Minimum/maximum JSON values keyed by facet field name.
#[schema(example = json!({"date":{"min":831306011,"max":1730901447}}))]
pub facets_minmax: HashMap<String, MinMaxFieldJson>,
}
/// A document: an insertion-ordered map from field name to JSON value.
pub type Document = IndexMap<String, Value>;
/// A file reference: a path only, a path together with bytes, or nothing.
#[derive(Clone, PartialEq)]
pub enum FileType {
/// Path only.
Path(Box<Path>),
/// Path plus a byte buffer (presumably the file's content — TODO confirm).
Bytes(Box<Path>, Box<[u8]>),
/// No file.
None,
}
/// Compression algorithm selector for stored documents (`None` means no compression algorithm is named).
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, ToSchema)]
pub enum DocumentCompression {
None,
Lz4,
Snappy,
Zstd,
}
impl fmt::Display for DocumentCompression {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DocumentCompression::None => write!(f, "None"),
DocumentCompression::Lz4 => write!(f, "Lz4"),
DocumentCompression::Snappy => write!(f, "Snappy"),
DocumentCompression::Zstd => write!(f, "Zstd"),
}
}
}
/// Index access mode selector with explicit discriminants (`Ram = 0`, `Mmap = 1`).
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
pub enum AccessType {
Ram = 0,
Mmap = 1,
}
/// Lexical similarity selector; `Bm25fProximity` is the `Default`.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Default, ToSchema)]
pub enum LexicalSimilarity {
Bm25f = 0,
#[default]
Bm25fProximity = 1,
}
impl fmt::Display for LexicalSimilarity {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
LexicalSimilarity::Bm25f => write!(f, "Bm25f"),
LexicalSimilarity::Bm25fProximity => write!(f, "Bm25fProximity"),
}
}
}
/// Tokenizer selector.
///
/// `AsciiAlphabetic` is the derived `Default`; note that `CreateIndexRequest` uses a different
/// serde default (`UnicodeAlphanumeric`, see `tokenizer_type_api`).
/// `UnicodeAlphanumericZH` exists only when the `zh` feature is enabled.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Copy, Default, ToSchema)]
pub enum TokenizerType {
#[default]
AsciiAlphabetic = 0,
UnicodeAlphanumeric = 1,
UnicodeAlphanumericFolded = 2,
Whitespace = 3,
WhitespaceLowercase = 4,
#[cfg(feature = "zh")]
UnicodeAlphanumericZH = 5,
}
impl fmt::Display for TokenizerType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TokenizerType::AsciiAlphabetic => write!(f, "AsciiAlphabetic"),
TokenizerType::UnicodeAlphanumeric => write!(f, "UnicodeAlphanumeric"),
TokenizerType::UnicodeAlphanumericFolded => write!(f, "UnicodeAlphanumericFolded"),
TokenizerType::Whitespace => write!(f, "Whitespace"),
TokenizerType::WhitespaceLowercase => write!(f, "WhitespaceLowercase"),
#[cfg(feature = "zh")]
TokenizerType::UnicodeAlphanumericZH => write!(f, "UnicodeAlphanumericZH"),
}
}
}
/// Stemmer selector with explicit discriminants `0..=38`; `None` is the `Default`.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Copy, Default, ToSchema)]
pub enum StemmerType {
#[default]
None = 0,
Arabic = 1,
Armenian = 2,
Basque = 3,
Catalan = 4,
Czech = 5,
Danish = 6,
Dutch = 7,
DutchPorter = 8,
English = 9,
Esperanto = 10,
Estonian = 11,
Finnish = 12,
French = 13,
German = 14,
Greek = 15,
Hindi = 16,
Hungarian = 17,
Indonesian = 18,
Irish = 19,
Italian = 20,
Lithuanian = 21,
Lovins = 22,
Nepali = 23,
Norwegian = 24,
Persian = 25,
Polish = 26,
Porter = 27,
Portuguese = 28,
Romanian = 29,
Russian = 30,
Serbian = 31,
Sesotho = 32,
Spanish = 33,
Swedish = 34,
Tamil = 35,
Turkish = 36,
Ukrainian = 37,
Yiddish = 38,
}
impl fmt::Display for StemmerType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
StemmerType::None => write!(f, "None"),
StemmerType::Arabic => write!(f, "Arabic"),
StemmerType::Armenian => write!(f, "Armenian"),
StemmerType::Basque => write!(f, "Basque"),
StemmerType::Catalan => write!(f, "Catalan"),
StemmerType::Czech => write!(f, "Czech"),
StemmerType::Danish => write!(f, "Danish"),
StemmerType::Dutch => write!(f, "Dutch"),
StemmerType::DutchPorter => write!(f, "DutchPorter"),
StemmerType::English => write!(f, "English"),
StemmerType::Esperanto => write!(f, "Esperanto"),
StemmerType::Estonian => write!(f, "Estonian"),
StemmerType::Finnish => write!(f, "Finnish"),
StemmerType::French => write!(f, "French"),
StemmerType::German => write!(f, "German"),
StemmerType::Greek => write!(f, "Greek"),
StemmerType::Hindi => write!(f, "Hindi"),
StemmerType::Hungarian => write!(f, "Hungarian"),
StemmerType::Indonesian => write!(f, "Indonesian"),
StemmerType::Irish => write!(f, "Irish"),
StemmerType::Italian => write!(f, "Italian"),
StemmerType::Lithuanian => write!(f, "Lithuanian"),
StemmerType::Lovins => write!(f, "Lovins"),
StemmerType::Nepali => write!(f, "Nepali"),
StemmerType::Norwegian => write!(f, "Norwegian"),
StemmerType::Persian => write!(f, "Persian"),
StemmerType::Polish => write!(f, "Polish"),
StemmerType::Porter => write!(f, "Porter"),
StemmerType::Portuguese => write!(f, "Portuguese"),
StemmerType::Romanian => write!(f, "Romanian"),
StemmerType::Russian => write!(f, "Russian"),
StemmerType::Serbian => write!(f, "Serbian"),
StemmerType::Sesotho => write!(f, "Sesotho"),
StemmerType::Spanish => write!(f, "Spanish"),
StemmerType::Swedish => write!(f, "Swedish"),
StemmerType::Tamil => write!(f, "Tamil"),
StemmerType::Turkish => write!(f, "Turkish"),
StemmerType::Ukrainian => write!(f, "Ukrainian"),
StemmerType::Yiddish => write!(f, "Yiddish"),
}
}
}
/// Per-level index data: document-length blocks and docstore pointer bytes, each with an offset.
/// NOTE(review): the meaning of the two `*_pointer` offsets is defined by code outside this section.
pub(crate) struct LevelIndex {
/// Blocks of `ROARING_BLOCK_SIZE` bytes each.
pub document_length_compressed_array: Vec<[u8; ROARING_BLOCK_SIZE]>,
pub docstore_pointer_docs: Vec<u8>,
pub docstore_pointer_docs_pointer: usize,
pub document_length_compressed_array_pointer: usize,
}
/// Metadata of one posting-list block (score bound, block id, posting count and 16-bit docid bounds).
/// NOTE(review): field semantics are defined by the indexing/search code outside this section.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub(crate) struct BlockObjectIndex {
pub max_block_score: f32,
pub block_id: u32,
pub compression_type_pointer: u32,
pub posting_count: u16,
pub max_docid: u16,
pub max_p_docid: u16,
pub pointer_pivot_p_docid: u16,
}
/// Index-side posting list: overall and per-n-gram posting counts plus the list's blocks.
/// NOTE(review): the `*_compressed` bytes are presumably compact encodings of the matching counts — TODO confirm.
#[derive(Default)]
pub(crate) struct PostingListObjectIndex {
pub posting_count: u32,
pub posting_count_ngram_1: u32,
pub posting_count_ngram_2: u32,
pub posting_count_ngram_3: u32,
pub posting_count_ngram_1_compressed: u8,
pub posting_count_ngram_2_compressed: u8,
pub posting_count_ngram_3_compressed: u8,
pub max_list_score: f32,
/// Block metadata entries of this posting list.
pub blocks: Vec<BlockObjectIndex>,
pub position_range_previous: u32,
}
/// Posting list state stored in `SegmentLevel0::segment` (keyed there by a `u64`).
/// NOTE(review): field semantics are defined by the indexing code outside this section.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub(crate) struct PostingListObject0 {
pub pointer_first: usize,
pub pointer_last: usize,
pub posting_count: usize,
pub max_block_score: f32,
pub max_docid: u16,
pub max_p_docid: u16,
/// N-gram kind of the term; defaults to `NgramType::SingleTerm`.
pub ngram_type: NgramType,
pub term_ngram1: String,
pub term_ngram2: String,
pub term_ngram3: String,
// Unlike `PostingListObjectIndex`, the per-n-gram posting counts are stored as `f32` here.
pub posting_count_ngram_1: f32,
pub posting_count_ngram_2: f32,
pub posting_count_ngram_3: f32,
pub posting_count_ngram_1_compressed: u8,
pub posting_count_ngram_2_compressed: u8,
pub posting_count_ngram_3_compressed: u8,
pub position_count: usize,
pub pointer_pivot_p_docid: u16,
pub size_compressed_positions_key: usize,
pub docid_delta_max: u16,
pub docid_old: u16,
pub compression_type_pointer: u32,
}
/// Posting-list compression scheme; derives `FromPrimitive`, so it can be built from its integer discriminant.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, FromPrimitive)]
pub(crate) enum CompressionType {
Delta = 0,
Array = 1,
Bitmap = 2,
Rle = 3,
}
/// A queued group of query posting lists with an index and a score bound.
/// NOTE(review): how the queue is processed is defined by search code outside this section.
pub(crate) struct QueueObject<'a> {
pub query_list: Vec<PostingListObjectQuery<'a>>,
pub query_index: usize,
pub max_score: f32,
}
/// Query-time state for one term's posting list.
///
/// Borrows its block metadata (`blocks`) and raw bytes (`byte_array`) for `'a`; the remaining
/// fields are counters, cursors, scoring inputs and flags.
/// NOTE(review): the traversal and scoring logic that gives these fields their meaning is outside this section.
#[derive(Clone)]
pub(crate) struct PostingListObjectQuery<'a> {
pub posting_count: u32,
pub max_list_score: f32,
/// Borrowed block metadata of the posting list.
pub blocks: &'a Vec<BlockObjectIndex>,
pub blocks_index: usize,
pub term: String,
pub key0: u32,
pub compression_type: CompressionType,
pub rank_position_pointer_range: u32,
pub compressed_doc_id_range: usize,
pub pointer_pivot_p_docid: u16,
pub posting_pointer: usize,
pub posting_pointer_previous: usize,
/// Borrowed raw bytes read by this posting list.
pub byte_array: &'a [u8],
pub p_block: i32,
pub p_block_max: i32,
pub p_docid: usize,
pub p_docid_count: usize,
pub rangebits: i32,
pub docid: i32,
pub bitposition: u32,
pub intersect: u64,
pub ulong_pos: usize,
pub run_end: i32,
pub p_run: i32,
pub p_run_count: i32,
pub p_run_sum: i32,
pub term_index_unique: usize,
pub positions_count: u32,
pub positions_pointer: u32,
pub idf: f32,
pub idf_ngram1: f32,
pub idf_ngram2: f32,
pub idf_ngram3: f32,
pub tf_ngram1: u32,
pub tf_ngram2: u32,
pub tf_ngram3: u32,
pub ngram_type: NgramType,
pub end_flag: bool,
pub end_flag_block: bool,
pub is_embedded: bool,
pub embedded_positions: [u32; 4],
// Lists of `(u16, usize)` pairs, stored inline for up to two entries.
pub field_vec: SmallVec<[(u16, usize); 2]>,
pub field_vec_ngram1: SmallVec<[(u16, usize); 2]>,
pub field_vec_ngram2: SmallVec<[(u16, usize); 2]>,
pub field_vec_ngram3: SmallVec<[(u16, usize); 2]>,
pub bm25_flag: bool,
}
/// Empty static vector; `PostingListObjectQuery::default()` borrows it for its `blocks` reference.
pub(crate) static DUMMY_VEC: Vec<BlockObjectIndex> = Vec::new();
/// Empty static byte vector; `PostingListObjectQuery::default()` borrows it for its `byte_array` slice.
pub(crate) static DUMMY_VEC_8: Vec<u8> = Vec::new();
/// Default state: every counter and cursor is zero, both borrows point at the empty static
/// placeholders, the compression type is `Delta`, the n-gram type is `SingleTerm`, and
/// `bm25_flag` is the only flag that starts out `true`.
impl Default for PostingListObjectQuery<'_> {
    fn default() -> Self {
        // Fields are listed in struct declaration order.
        Self {
            posting_count: 0,
            max_list_score: 0.0,
            blocks: &DUMMY_VEC,
            blocks_index: 0,
            term: String::new(),
            key0: 0,
            compression_type: CompressionType::Delta,
            rank_position_pointer_range: 0,
            compressed_doc_id_range: 0,
            pointer_pivot_p_docid: 0,
            posting_pointer: 0,
            posting_pointer_previous: 0,
            byte_array: &DUMMY_VEC_8,
            p_block: 0,
            p_block_max: 0,
            p_docid: 0,
            p_docid_count: 0,
            rangebits: 0,
            docid: 0,
            bitposition: 0,
            intersect: 0,
            ulong_pos: 0,
            run_end: 0,
            p_run: 0,
            p_run_count: 0,
            p_run_sum: 0,
            term_index_unique: 0,
            positions_count: 0,
            positions_pointer: 0,
            idf: 0.0,
            idf_ngram1: 0.0,
            idf_ngram2: 0.0,
            idf_ngram3: 0.0,
            tf_ngram1: 0,
            tf_ngram2: 0,
            tf_ngram3: 0,
            ngram_type: NgramType::SingleTerm,
            end_flag: false,
            end_flag_block: false,
            is_embedded: false,
            embedded_positions: [0; 4],
            field_vec: SmallVec::new(),
            field_vec_ngram1: SmallVec::new(),
            field_vec_ngram2: SmallVec::new(),
            field_vec_ngram3: SmallVec::new(),
            bm25_flag: true,
        }
    }
}
/// Query-time state for a term occurrence that refers to a unique term by index.
///
/// Holds both the unique (`term_index_unique`) and non-unique (`term_index_nonunique`) term
/// indices together with position-reading state over the borrowed `byte_array`.
/// NOTE(review): field semantics are defined by search code outside this section.
#[derive(Clone)]
pub(crate) struct NonUniquePostingListObjectQuery<'a> {
pub term_index_unique: usize,
pub term_index_nonunique: usize,
pub pos: u32,
pub p_pos: i32,
pub positions_pointer: usize,
pub positions_count: u32,
pub byte_array: &'a [u8],
pub key0: u32,
pub is_embedded: bool,
pub embedded_positions: [u32; 4],
pub p_field: usize,
pub field_vec: SmallVec<[(u16, usize); 2]>,
}
/// One index segment: byte blocks, per-block pointer tuples and posting lists keyed by `u64`.
/// NOTE(review): the key is presumably a 64-bit term hash and the tuple layout is defined elsewhere — TODO confirm.
pub(crate) struct SegmentIndex {
pub byte_array_blocks: Vec<Vec<u8>>,
pub byte_array_blocks_pointer: Vec<(usize, usize, u32)>,
pub segment: AHashMap<u64, PostingListObjectIndex>,
}
/// One level-0 segment: posting lists keyed by `u64` plus a buffer of compressed positions.
/// NOTE(review): the key is presumably a 64-bit term hash — TODO confirm.
#[derive(Default, Debug, Clone)]
pub(crate) struct SegmentLevel0 {
pub segment: AHashMap<u64, PostingListObject0>,
pub positions_compressed: Vec<u8>,
}
/// Data type of a schema field; `Text` is the `Default`.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Default, ToSchema)]
pub enum FieldType {
U8,
U16,
U32,
U64,
I8,
I16,
I32,
I64,
Timestamp,
F32,
F64,
Bool,
String16,
String32,
StringSet16,
StringSet32,
Point,
#[default]
Text,
Json,
Binary,
}
/// A synonym definition consumed by `get_synonyms_map`.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Synonym {
/// Terms of the definition; definitions with fewer than two terms are ignored by `get_synonyms_map`.
pub terms: Vec<String>,
/// When `true` every term maps to all the others; when `false` only the first term maps to the rest.
/// Defaults to `true` when omitted.
#[serde(default = "default_as_true")]
pub multiway: bool,
}
/// Serde default for `Synonym::multiway`.
fn default_as_true() -> bool {
true
}
/// Definition of one schema field.
///
/// The boolean options `index_vector`, `facet`, `longest`, `dictionary_source` and
/// `completion_source` default to `false` on input and are omitted from output while `false`;
/// `boost` defaults to `1.0` on input and is omitted from output while it equals `1.0`.
///
/// NOTE: the derived `Default` yields `boost == 0.0`, which differs from the serde default of `1.0`.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema, Default)]
pub struct SchemaField {
pub field: String,
pub store: bool,
pub index_lexical: bool,
#[serde(skip_serializing_if = "is_default_bool")]
#[serde(default = "default_false")]
pub index_vector: bool,
pub field_type: FieldType,
#[serde(skip_serializing_if = "is_default_bool")]
#[serde(default = "default_false")]
pub facet: bool,
#[serde(skip_serializing_if = "is_default_bool")]
#[serde(default = "default_false")]
pub longest: bool,
#[serde(skip_serializing_if = "is_default_f32")]
#[serde(default = "default_1")]
pub boost: f32,
#[serde(skip_serializing_if = "is_default_bool")]
#[serde(default = "default_false")]
pub dictionary_source: bool,
#[serde(skip_serializing_if = "is_default_bool")]
#[serde(default = "default_false")]
pub completion_source: bool,
/// Never serialized or deserialized; `SchemaField::new` sets it to `0`.
#[serde(skip)]
pub(crate) indexed_field_id: usize,
/// Serialized but never read from input; `SchemaField::new` sets it to `0`.
#[serde(skip_deserializing)]
pub(crate) field_id: usize,
}
impl SchemaField {
#[allow(clippy::too_many_arguments)]
pub fn new(
field: String,
store: bool,
index_lexical: bool,
index_vector: bool,
field_type: FieldType,
facet: bool,
longest: bool,
boost: f32,
dictionary_source: bool,
completion_source: bool,
) -> Self {
SchemaField {
field,
store,
index_lexical,
index_vector,
field_type,
facet,
longest,
boost,
dictionary_source,
completion_source,
indexed_field_id: 0,
field_id: 0,
}
}
}
/// Serde default for the optional boolean options of `SchemaField`.
fn default_false() -> bool {
false
}
/// Serde `skip_serializing_if` predicate: `true` when the flag is `false`, i.e. at its default.
fn is_default_bool(num: &bool) -> bool {
!(*num)
}
/// Serde default for `SchemaField::boost`.
fn default_1() -> f32 {
1.0
}
/// Serde `skip_serializing_if` predicate: `true` when the value equals the default boost of `1.0`.
fn is_default_f32(num: &f32) -> bool {
*num == 1.0
}
/// Bookkeeping for one indexed field: schema field name, summed field length, indexed-field id
/// and whether it is flagged as the longest field.
/// NOTE(review): how `field_length_sum` is accumulated is defined outside this section.
pub(crate) struct IndexedField {
pub schema_field_name: String,
pub field_length_sum: usize,
pub indexed_field_id: usize,
pub is_longest_field: bool,
}
/// Stop-word list selector: none, a named language list, or a custom term list. `None` is the `Default`.
#[derive(Debug, Clone, Deserialize, Serialize, Default, ToSchema)]
pub enum StopwordType {
#[default]
None,
English,
German,
French,
Spanish,
/// Caller-supplied terms.
Custom {
terms: Vec<String>,
},
}
/// Frequent-word list selector: none, a named language list, or a custom term list. `English` is the `Default`.
#[derive(Debug, Clone, Deserialize, Serialize, Default, ToSchema)]
pub enum FrequentwordType {
None,
#[default]
English,
German,
French,
Spanish,
/// Caller-supplied terms.
Custom {
terms: Vec<String>,
},
}
/// Spelling-correction settings.
/// NOTE(review): the fields are presumably forwarded to the SymSpell dictionary — confirm against the consumer.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct SpellingCorrection {
pub max_dictionary_edit_distance: usize,
/// Optional list of thresholds; `None` when not configured.
pub term_length_threshold: Option<Vec<usize>>,
pub count_threshold: usize,
pub max_dictionary_entries: usize,
}
/// Query-completion settings.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct QueryCompletion {
/// Limit on completion entries; the enforcing code is outside this section.
pub max_completion_entries: usize,
}
/// Clustering mode: disabled, automatic, or a fixed `usize` value. `Auto` is the `Default`.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Default, ToSchema)]
pub enum Clustering {
None,
#[default]
Auto,
/// Fixed setting; NOTE(review): the payload is presumably a cluster count — TODO confirm.
Fixed(usize),
}
impl fmt::Display for Clustering {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Clustering::None => write!(f, "None"),
Clustering::Auto => write!(f, "Auto"),
Clustering::Fixed(value) => write!(f, "Fixed({})", value),
}
}
}
/// Index metadata, (de)serializable with serde.
///
/// `id`, `name`, `lexical_similarity`, `tokenizer`, `stemmer` and `access_type` are required
/// on input; every other field has a serde default.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexMetaObject {
pub id: u64,
pub name: String,
pub lexical_similarity: LexicalSimilarity,
pub tokenizer: TokenizerType,
pub stemmer: StemmerType,
/// Defaults to `StopwordType::None`.
#[serde(default)]
pub stop_words: StopwordType,
/// Defaults to `FrequentwordType::English`.
#[serde(default)]
pub frequent_words: FrequentwordType,
/// Bit set of `NgramSet` flags; defaults to `NgramFF | NgramFFF` (see `ngram_indexing_default`).
#[serde(default = "ngram_indexing_default")]
pub ngram_indexing: u8,
/// Defaults to `DocumentCompression::Snappy` (see `doc_store_compression_default`).
#[serde(default = "doc_store_compression_default")]
pub document_compression: DocumentCompression,
pub access_type: AccessType,
/// `None` when omitted.
#[serde(default)]
pub spelling_correction: Option<SpellingCorrection>,
/// `None` when omitted.
#[serde(default)]
pub query_completion: Option<QueryCompletion>,
/// Defaults to `Clustering::Auto`.
#[serde(default)]
pub clustering: Clustering,
/// Defaults to `Inference::default()`.
#[serde(default)]
pub inference: Inference,
}
/// Serde default for `IndexMetaObject::ngram_indexing`: the `NgramFF` and `NgramFFF` flags combined.
fn ngram_indexing_default() -> u8 {
NgramSet::NgramFF as u8 | NgramSet::NgramFFF as u8
}
/// Serde default for `IndexMetaObject::document_compression`.
fn doc_store_compression_default() -> DocumentCompression {
DocumentCompression::Snappy
}
/// Facet working data for one field: a `u32 -> usize` map plus prefix, length and ranges settings.
/// NOTE(review): presumably counts per facet value id collected during a search — TODO confirm.
#[derive(Debug, Clone, Default)]
pub(crate) struct ResultFacet {
pub field: String,
pub values: AHashMap<u32, usize>,
pub prefix: String,
pub length: u32,
pub ranges: Ranges,
}
/// Unit for distance values.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, ToSchema)]
pub enum DistanceUnit {
Kilometers,
Miles,
}
/// Distance request: two field names, a base `Point` and the unit.
/// NOTE(review): presumably `field` is the source geo field and `distance` names the output field — TODO confirm.
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DistanceField {
pub field: String,
pub distance: String,
pub base: Point,
pub unit: DistanceUnit,
}
/// Default distance request: empty field names, an empty base point, kilometers as the unit.
impl Default for DistanceField {
    fn default() -> Self {
        let (field, distance) = (String::new(), String::new());
        Self {
            field,
            distance,
            base: Vec::new(),
            unit: DistanceUnit::Kilometers,
        }
    }
}
/// Typed minimum/maximum pair; both default to `ValueType::None`.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct MinMaxField {
pub min: ValueType,
pub max: ValueType,
}
/// Minimum/maximum pair as raw JSON values; used as the value type of `IndexResponseObject::facets_minmax`.
#[derive(Deserialize, Serialize, Debug, Clone, Default, ToSchema)]
pub struct MinMaxFieldJson {
pub min: Value,
pub max: Value,
}
/// A typed value; `None` is the `Default`.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq)]
pub enum ValueType {
U8(u8),
U16(u16),
U32(u32),
U64(u64),
I8(i8),
I16(i16),
I32(i32),
I64(i64),
/// Timestamp carried as an `i64` (the unit is not visible in this section).
Timestamp(i64),
F32(f32),
F64(f64),
/// A point together with a distance unit.
Point(Point, DistanceUnit),
#[default]
None,
}
/// Facet data for one field: name, an insertion-ordered value map and typed min/max.
/// `offset` and `field_type` are skipped by serde and take their `Default` values after deserialization.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct FacetField {
pub name: String,
/// Insertion-ordered map from a string key to `(Vec<String>, usize)`.
/// NOTE(review): presumably the facet value's strings and its count — TODO confirm.
pub values: IndexMap<String, (Vec<String>, usize)>,
pub min: ValueType,
pub max: ValueType,
#[serde(skip)]
pub(crate) offset: usize,
#[serde(skip)]
pub(crate) field_type: FieldType,
}
/// A facet result: a list of `(String, usize)` pairs.
pub type Facet = Vec<(String, usize)>;
/// Shared, lock-protected handle to a `Shard` (tokio `RwLock`).
pub type ShardArc = Arc<RwLock<Shard>>;
/// Shared, lock-protected handle to an `Index` (tokio `RwLock`).
pub type IndexArc = Arc<RwLock<Index>>;
/// State of one index shard.
///
/// Holds format version and document counters, schema and metadata, open file handles with
/// their memory maps (index, docstore, delete, facets, vector), in-memory segment and posting
/// buffers, facet data, synonym and word lists, SIMD capability flags and vector settings.
///
/// NOTE(review): the fields are maintained by indexing/commit/search code outside this section;
/// the comments below state only what the declarations themselves show.
pub struct Shard {
pub index_format_version_major: u16,
pub index_format_version_minor: u16,
pub indexed_doc_count: usize,
pub(crate) indexed_vector_count: usize,
pub(crate) indexed_cluster_count: usize,
pub committed_doc_count: usize,
pub(crate) uncommitted: bool,
pub(crate) modified: bool,
/// Schema fields keyed by field name.
pub schema_map: HashMap<String, SchemaField>,
pub stored_field_names: Vec<String>,
pub meta: IndexMetaObject,
pub(crate) is_last_level_incomplete: bool,
pub(crate) last_level_index_file_start_pos: u64,
pub(crate) last_level_docstore_file_start_pos: u64,
pub(crate) last_level_vector_file_start_pos: u64,
pub(crate) semaphore: Arc<Semaphore>,
pub(crate) docstore_file: File,
pub(crate) docstore_file_mmap: Mmap,
pub(crate) delete_file: File,
pub(crate) delete_hashset: AHashSet<usize>,
pub(crate) index_file: File,
pub(crate) index_path_string: String,
pub(crate) index_file_mmap: Mmap,
pub(crate) compressed_index_segment_block_buffer: Vec<u8>,
pub(crate) compressed_docstore_segment_block_buffer: Vec<u8>,
pub(crate) segment_number1: usize,
pub(crate) segment_number_bits1: usize,
pub(crate) document_length_normalized_average: f32,
pub(crate) positions_sum_normalized: u64,
pub(crate) level_index: Vec<LevelIndex>,
pub(crate) segments_index: Vec<SegmentIndex>,
pub(crate) segments_level0: Vec<SegmentLevel0>,
pub(crate) enable_fallback: bool,
pub(crate) enable_single_term_topk: bool,
pub(crate) enable_search_quality_test: bool,
pub(crate) enable_inter_query_threading: bool,
pub(crate) enable_inter_query_threading_auto: bool,
pub(crate) segment_number_mask1: u32,
pub(crate) indexed_field_vec: Vec<IndexedField>,
pub(crate) indexed_field_id_bits: usize,
pub(crate) indexed_field_id_mask: usize,
pub(crate) longest_field_id: usize,
pub(crate) longest_field_auto: bool,
pub(crate) indexed_schema_vec: Vec<SchemaField>,
/// Blocks of `ROARING_BLOCK_SIZE` bytes each.
pub(crate) document_length_compressed_array: Vec<[u8; ROARING_BLOCK_SIZE]>,
pub(crate) key_count_sum: u64,
pub(crate) block_id: usize,
pub(crate) strip_compressed_sum: u64,
pub(crate) postings_buffer: Vec<u8>,
pub(crate) postings_buffer_pointer: usize,
pub(crate) size_compressed_positions_index: u64,
pub(crate) size_compressed_docid_index: u64,
pub(crate) postinglist_count: usize,
pub(crate) docid_count: usize,
pub(crate) position_count: usize,
pub(crate) mute: bool,
pub(crate) frequentword_results: AHashMap<String, ResultObject>,
pub(crate) facets: Vec<FacetField>,
pub(crate) facets_map: AHashMap<String, usize>,
pub(crate) facets_size_sum: usize,
pub(crate) facets_file: File,
/// Writable memory map (unlike the read-only `Mmap` fields of this struct).
pub(crate) facets_file_mmap: MmapMut,
/// Table of 256 `f32` values, one per possible `u8` value.
pub(crate) bm25_component_cache: [f32; 256],
pub(crate) string_set_to_single_term_id_vec: Vec<AHashMap<String, AHashSet<u32>>>,
/// Synonym lookup table of the shape built by `get_synonyms_map`.
pub(crate) synonyms_map: AHashMap<u64, SynonymItem>,
/// Only present when the crate is built with the `zh` feature.
#[cfg(feature = "zh")]
pub(crate) word_segmentation_option: Option<WordSegmentationTM>,
pub(crate) shard_number: usize,
/// Optional shared handle to an `Index` (presumably the index owning this shard — TODO confirm).
pub(crate) index_option: Option<Arc<RwLock<Index>>>,
pub(crate) stemmer: Option<Stemmer>,
pub(crate) stop_words: AHashSet<String>,
pub(crate) frequent_words: Vec<String>,
pub(crate) frequent_hashset: AHashSet<u64>,
pub(crate) key_head_size: usize,
pub(crate) level_terms: AHashMap<u32, String>,
pub(crate) level_completions: Arc<RwLock<AHashMap<Vec<String>, usize>>>,
pub(crate) is_avx2: bool,
pub(crate) is_neon: bool,
pub(crate) is_simd: bool,
pub(crate) is_vector_indexing: bool,
pub(crate) is_lexical_indexing: bool,
pub(crate) chunks_meta: Vec<(u16, u32, u32)>,
pub(crate) chunks_string: Vec<String>,
pub(crate) vector_file: File,
pub(crate) vector_file_mmap: Mmap,
pub(crate) block_vector_buffer: Vec<ParentMedoid>,
pub(crate) vector_dimensions: usize,
pub(crate) vector_dimensions_original: usize,
pub(crate) vector_precision: Precision,
pub(crate) quantization: Quantization,
pub(crate) vector_similarity: VectorSimilarity,
pub(crate) chunk_size: usize,
pub(crate) min_vector_value: f32,
pub(crate) max_vector_value: f32,
pub(crate) turbo_quant: TurboQuant,
}
/// State of an index spanning one or more shards.
///
/// Holds format version and counters, schema and metadata, the index file handle, the shard
/// handles (`shard_vec`), optional spelling-correction / completion / embedding-model state,
/// SIMD capability flags and vector settings.
///
/// NOTE(review): the fields are maintained by code outside this section; the comments below
/// state only what the declarations themselves show.
pub struct Index {
/// Shared, lock-protected `usize` counter (presumably the next global document id — TODO confirm).
pub(crate) docid_global: Arc<RwLock<usize>>,
pub index_format_version_major: u16,
pub index_format_version_minor: u16,
pub(crate) indexed_doc_count: usize,
pub indexed_vector_count: usize,
pub indexed_cluster_count: usize,
pub(crate) deleted_doc_count: usize,
/// Schema fields keyed by field name.
pub schema_map: HashMap<String, SchemaField>,
pub stored_field_names: Vec<String>,
pub meta: IndexMetaObject,
pub(crate) index_file: File,
pub(crate) index_path_string: String,
pub(crate) compressed_index_segment_block_buffer: Vec<u8>,
pub(crate) segment_number1: usize,
pub(crate) segment_number_mask1: u32,
pub(crate) indexed_field_vec: Vec<IndexedField>,
pub(crate) mute: bool,
pub(crate) facets: Vec<FacetField>,
/// Synonym lookup table of the shape built by `get_synonyms_map`.
pub(crate) synonyms_map: AHashMap<u64, SynonymItem>,
pub(crate) shard_number: usize,
/// Shared handles to the shards of this index.
pub(crate) shard_vec: Vec<Arc<RwLock<Shard>>>,
pub(crate) max_dictionary_entries: usize,
/// Optional shared SymSpell instance; `None` when not configured.
pub(crate) symspell_option: Option<Arc<RwLock<SymSpell>>>,
pub(crate) max_completion_entries: usize,
/// Optional shared completion trie; `None` when not configured.
pub(crate) completion_option: Option<Arc<RwLock<PruningRadixTrie>>>,
// Note: a set of strings here, whereas `Shard::frequent_hashset` holds `u64` values.
pub(crate) frequent_hashset: AHashSet<String>,
/// Optional embedding model; `None` when not configured.
pub(crate) embedding_model_option: Option<StaticModel>,
pub vector_precision: Precision,
pub(crate) quantization: Quantization,
pub vector_dimensions: usize,
pub vector_dimensions_original: usize,
pub vector_similarity: VectorSimilarity,
pub(crate) is_avx2: bool,
pub(crate) is_neon: bool,
pub(crate) is_simd: bool,
pub(crate) is_vector_indexing: bool,
pub(crate) is_lexical_indexing: bool,
pub(crate) chunk_size: usize,
pub(crate) turbo_quant: TurboQuant,
}
/// Synonym targets of one term: each entry is the target term as written plus its
/// `(64-bit hash, masked 32-bit hash)` pair, as produced by `get_synonyms_map`.
pub type SynonymItem = Vec<(String, (u64, u32))>;
/// Returns the crate version string (`VERSION`, i.e. Cargo's `CARGO_PKG_VERSION` at compile time).
pub fn version() -> &'static str {
VERSION
}
/// Appends `additions` to the synonym list stored under `key`, creating the list if necessary.
///
/// An addition whose term string is already in the list is skipped, so existing entries keep
/// their position and the resulting order is deterministic.
fn merge_synonym_targets(
    synonyms_map: &mut AHashMap<u64, SynonymItem>,
    key: u64,
    additions: impl IntoIterator<Item = (String, (u64, u32))>,
) {
    let targets = synonyms_map.entry(key).or_default();
    for addition in additions {
        // Synonym lists are tiny, so a linear scan is cheaper than building a set.
        if targets.iter().all(|(term, _)| *term != addition.0) {
            targets.push(addition);
        }
    }
}

/// Builds the synonym lookup table from a list of synonym definitions.
///
/// The table is keyed by the 64-bit hash of the lowercased source term. Each value lists the
/// target terms as written in the definition, together with the 64-bit hash of the lowercased
/// term and its 32-bit hash masked with `segment_number_mask1`.
///
/// * Definitions with fewer than two terms are ignored.
/// * `multiway == true`: every term maps to all other terms of the definition.
/// * `multiway == false`: only the first term maps to the remaining terms.
///
/// When several definitions share a source term, their targets are merged. Duplicates (by term
/// string) are dropped and insertion order is preserved, so the result is deterministic and
/// does not depend on whether the definitions were multiway or one-way.
pub(crate) fn get_synonyms_map(
    synonyms: &[Synonym],
    segment_number_mask1: u32,
) -> AHashMap<u64, SynonymItem> {
    let mut synonyms_map: AHashMap<u64, SynonymItem> = AHashMap::new();

    for synonym in synonyms.iter().filter(|synonym| synonym.terms.len() > 1) {
        // Hash the lowercased term, but keep the original spelling for the stored entry.
        let hashes: SynonymItem = synonym
            .terms
            .iter()
            .map(|term| {
                let term_lowercase = term.to_lowercase();
                (
                    term.to_string(),
                    (
                        hash64(term_lowercase.as_bytes()),
                        hash32(term_lowercase.as_bytes()) & segment_number_mask1,
                    ),
                )
            })
            .collect();

        if synonym.multiway {
            for (i, (_, (key, _))) in hashes.iter().enumerate() {
                // Every term of the definition except the one at position `i`.
                let others = hashes
                    .iter()
                    .enumerate()
                    .filter(|(j, _)| *j != i)
                    .map(|(_, entry)| entry.clone());
                merge_synonym_targets(&mut synonyms_map, *key, others);
            }
        } else {
            // One-way: merge instead of overwriting, so targets registered earlier for the
            // same head term are not silently lost.
            merge_synonym_targets(
                &mut synonyms_map,
                hashes[0].1.0,
                hashes[1..].iter().cloned(),
            );
        }
    }

    synonyms_map
}
/// Set of n-gram kinds, encoded as bit flags.
///
/// `SingleTerm` is zero; every n-gram variant occupies its own single bit, so several variants
/// can be combined into one integer with bitwise OR.
/// NOTE(review): `F` / `R` in the variant names presumably stand for frequent / rare term —
/// confirm against the n-gram indexing documentation.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, FromPrimitive)]
pub enum NgramSet {
/// No n-gram flag set (value 0).
SingleTerm = 0b00000000,
/// Bit 0: two-term n-gram `FF`.
NgramFF = 0b00000001,
/// Bit 1: two-term n-gram `FR`.
NgramFR = 0b00000010,
/// Bit 2: two-term n-gram `RF`.
NgramRF = 0b00000100,
/// Bit 3: three-term n-gram `FFF`.
NgramFFF = 0b00001000,
/// Bit 4: three-term n-gram `RFF`.
NgramRFF = 0b00010000,
/// Bit 5: three-term n-gram `FFR`.
NgramFFR = 0b00100000,
/// Bit 6: three-term n-gram `FRF`.
NgramFRF = 0b01000000,
}
/// Kind of a single indexed key: a single term or one of the n-gram kinds.
///
/// Unlike `NgramSet`, the discriminants are consecutive numbers (0..=7), not bit flags.
/// The variant order matches the bit order of `NgramSet`.
/// NOTE(review): `F` / `R` in the variant names presumably stand for frequent / rare term —
/// confirm against the n-gram indexing documentation.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, FromPrimitive, Default)]
pub(crate) enum NgramType {
/// Plain single term (the default).
#[default]
SingleTerm = 0,
/// Two-term n-gram `FF`.
NgramFF = 1,
/// Two-term n-gram `FR`.
NgramFR = 2,
/// Two-term n-gram `RF`.
NgramRF = 3,
/// Three-term n-gram `FFF`.
NgramFFF = 4,
/// Three-term n-gram `RFF`.
NgramRFF = 5,
/// Three-term n-gram `FFR`.
NgramFFR = 6,
/// Three-term n-gram `FRF`.
NgramFRF = 7,
}
/// Creates an index at `index_path`, or opens the index files already present there.
///
/// This is the public entry point. It forwards to `create_index_root` with schema
/// serialization switched on, so the shards are created and the schema, meta and (non-empty)
/// synonyms files are written into the index directory.
///
/// # Arguments
/// * `index_path` - directory of the index.
/// * `meta` - index meta data / configuration.
/// * `schema` - field definitions of the index.
/// * `synonyms` - synonym definitions.
/// * `segment_number_bits1` - number of bits of the segment number (segment count is `1 << bits`).
/// * `mute` - suppresses informational console output.
/// * `force_shard_number` - fixed shard count; `None` derives it from the available CPU cores.
///
/// # Errors
/// Returns the error string produced by `create_index_root`.
pub async fn create_index(
    index_path: &Path,
    meta: IndexMetaObject,
    schema: &Vec<SchemaField>,
    synonyms: &Vec<Synonym>,
    segment_number_bits1: usize,
    mute: bool,
    force_shard_number: Option<usize>,
) -> Result<IndexArc, String> {
    // A freshly created index always persists its schema, meta and synonyms.
    let serialize_schema = true;

    let index_future = create_index_root(
        index_path,
        meta,
        schema,
        serialize_schema,
        synonyms,
        segment_number_bits1,
        mute,
        force_shard_number,
    );
    index_future.await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_index_root(
index_path: &Path,
meta: IndexMetaObject,
#[allow(clippy::ptr_arg)] schema: &Vec<SchemaField>,
serialize_schema: bool,
synonyms: &Vec<Synonym>,
segment_number_bits1: usize,
mute: bool,
force_shard_number: Option<usize>,
) -> Result<IndexArc, String> {
let frequent_hashset: AHashSet<String> = match &meta.frequent_words {
FrequentwordType::None => AHashSet::new(),
FrequentwordType::English => FREQUENT_EN.lines().map(|x| x.to_string()).collect(),
FrequentwordType::German => FREQUENT_EN.lines().map(|x| x.to_string()).collect(),
FrequentwordType::French => FREQUENT_FR.lines().map(|x| x.to_string()).collect(),
FrequentwordType::Spanish => FREQUENT_ES.lines().map(|x| x.to_string()).collect(),
FrequentwordType::Custom { terms } => terms.iter().map(|x| x.to_string()).collect(),
};
let segment_number1 = 1usize << segment_number_bits1;
let segment_number_mask1 = (1u32 << segment_number_bits1) - 1;
let index_path_buf = index_path.to_path_buf();
let index_path_string = index_path_buf.to_str().unwrap();
if !index_path.exists() {
if !mute {
println!("index path created: {} ", index_path_string);
}
fs::create_dir_all(index_path).unwrap();
}
let file_path = Path::new(index_path_string).join(FILE_PATH);
if !file_path.exists() {
fs::create_dir_all(file_path).unwrap();
}
match File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(Path::new(index_path).join(INDEX_FILENAME))
{
Ok(index_file) => {
let mut is_vector_indexing = false;
let mut is_lexical_indexing = false;
let mut schema = schema.clone();
for schema_field in schema.iter_mut() {
if schema_field.field_type == FieldType::Binary && schema_field.index_lexical {
schema_field.index_lexical = false;
}
if schema_field.index_vector {
is_vector_indexing = true;
}
if schema_field.index_lexical {
is_lexical_indexing = true;
}
}
let mut document_length_compressed_array: Vec<[u8; ROARING_BLOCK_SIZE]> = Vec::new();
let mut indexed_field_vec: Vec<IndexedField> = Vec::new();
let mut facets_vec: Vec<FacetField> = Vec::new();
let mut facets_map: AHashMap<String, usize> = AHashMap::new();
let mut schema_map: HashMap<String, SchemaField> = HashMap::new();
let mut indexed_schema_vec: Vec<SchemaField> = Vec::new();
let mut stored_field_names = Vec::new();
let mut facets_size_sum = 0;
let mut longest_field_id_option: Option<usize> = None;
for (i, schema_field) in schema.iter().enumerate() {
let mut schema_field_clone = schema_field.clone();
schema_field_clone.indexed_field_id = indexed_field_vec.len();
if schema_field.longest && schema_field.index_lexical {
longest_field_id_option = Some(schema_field_clone.indexed_field_id);
}
schema_field_clone.field_id = i;
schema_map.insert(schema_field.field.clone(), schema_field_clone.clone());
if schema_field.facet {
let facet_size = match schema_field.field_type {
FieldType::U8 => 1,
FieldType::U16 => 2,
FieldType::U32 => 4,
FieldType::U64 => 8,
FieldType::I8 => 1,
FieldType::I16 => 2,
FieldType::I32 => 4,
FieldType::I64 => 8,
FieldType::Timestamp => 8,
FieldType::F32 => 4,
FieldType::F64 => 8,
FieldType::String16 => 2,
FieldType::String32 => 4,
FieldType::StringSet16 => 2,
FieldType::StringSet32 => 4,
FieldType::Point => 8,
_ => 1,
};
facets_map.insert(schema_field.field.clone(), facets_vec.len());
facets_vec.push(FacetField {
name: schema_field.field.clone(),
values: IndexMap::new(),
min: ValueType::None,
max: ValueType::None,
offset: facets_size_sum,
field_type: schema_field.field_type.clone(),
});
facets_size_sum += facet_size;
}
if schema_field.index_lexical || schema_field.index_vector {
indexed_field_vec.push(IndexedField {
schema_field_name: schema_field.field.clone(),
is_longest_field: false,
field_length_sum: 0,
indexed_field_id: indexed_field_vec.len(),
});
indexed_schema_vec.push(schema_field_clone);
document_length_compressed_array.push([0; ROARING_BLOCK_SIZE]);
}
if schema_field.store {
stored_field_names.push(schema_field.field.clone());
}
}
if !facets_vec.is_empty()
&& let Ok(file) = File::open(Path::new(index_path).join(FACET_VALUES_FILENAME))
&& let Ok(facets) = serde_json::from_reader(BufReader::new(file))
{
let mut facets: Vec<FacetField> = facets;
if facets_vec.len() == facets.len() {
for i in 0..facets.len() {
facets[i].offset = facets_vec[i].offset;
facets[i].field_type = facets_vec[i].field_type.clone();
}
}
facets_vec = facets;
}
let synonyms_map = get_synonyms_map(synonyms, segment_number_mask1);
let shard_number = if let Some(shard_number) = force_shard_number {
shard_number
} else {
cmp::min(
available_parallelism()
.map(|n| n.get())
.unwrap_or_else(|_| num_cpus::get_physical()),
num_cpus::get_physical(),
)
};
let (
vector_dimensions,
embedding_model_option,
vector_precision,
chunk_size,
quantization,
vector_similarity,
) = if is_vector_indexing {
let (
dimensions,
model_path,
precision,
chunk_size,
quantization,
vector_similarity,
) = match &meta.inference {
Inference::Model2Vec {
model,
chunk_size,
quantization,
} => {
let chunk_size = *chunk_size.max(&10);
match model {
Model::PotionBase2M => (
0,
"minishlab/potion-base-2M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionBase4M => (
0,
"minishlab/potion-base-4M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionBase8M => (
0,
"minishlab/potion-base-8M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionBase32M => (
0,
"minishlab/potion-base-32M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionMultilingual128M => (
0,
"minishlab/potion-multilingual-128M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionRetrieval32M => (
0,
"minishlab/potion-retrieval-32M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Model::PotionCode16M => (
0,
"minishlab/potion-code-16M",
Precision::F32,
chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
}
}
Inference::Model2VecCustom {
path,
chunk_size,
quantization,
} => (
0,
path.as_str(),
Precision::F32,
*chunk_size,
*quantization,
VectorSimilarity::Cosine,
),
Inference::External {
dimensions: vector_dimensions,
precision: vector_precision,
quantization: vector_quantization,
similarity,
} => (
*vector_dimensions,
"",
*vector_precision,
0,
*vector_quantization,
*similarity,
),
Inference::None => (
0,
"",
Precision::None,
0,
Quantization::None,
VectorSimilarity::Cosine,
),
};
if !model_path.is_empty() {
let model =
Some(StaticModel::from_pretrained(model_path, None, None, None).unwrap());
let dimensions = model.as_ref().unwrap().encode(&["test".to_string()])[0].len();
(
dimensions,
model,
precision,
chunk_size,
quantization,
VectorSimilarity::Cosine,
)
} else {
(
dimensions,
None,
precision,
chunk_size,
quantization,
vector_similarity,
)
}
} else {
(
0,
None,
Precision::None,
0,
Quantization::None,
VectorSimilarity::Cosine,
)
};
let turbo_quant = if quantization == Quantization::TurboQuantI8 {
TurboQuant::new(vector_dimensions, 1234)
} else {
TurboQuant::new(0, 1234)
};
let vector_dimensions_original = vector_dimensions;
let vector_dimensions = if quantization == Quantization::TurboQuantI8
&& vector_precision == Precision::F32
{
TurboQuant::next_power_of_two(vector_dimensions)
} else {
vector_dimensions
};
let mut shard_vec: Vec<Arc<RwLock<Shard>>> = Vec::new();
if serialize_schema {
let mut result_object_list = Vec::new();
let index_path_clone = Arc::new(index_path.to_path_buf());
for i in 0..shard_number {
let index_path_clone2 = index_path_clone.clone();
let meta_clone = meta.clone();
let schema_clone = schema.clone();
let turbo_quant_clone = turbo_quant.clone();
result_object_list.push(tokio::spawn(async move {
let shard_path = index_path_clone2.join("shards").join(i.to_string());
let mut shard_meta = meta_clone.clone();
shard_meta.id = i as u64;
let mut shard = create_shard(
&shard_path,
&shard_meta,
&schema_clone,
serialize_schema,
&Vec::new(),
segment_number_bits1,
mute,
longest_field_id_option,
)
.unwrap();
shard.shard_number = shard_number;
shard.vector_dimensions = vector_dimensions;
shard.vector_dimensions_original = vector_dimensions_original;
shard.vector_precision = vector_precision;
shard.quantization = quantization;
shard.vector_similarity = vector_similarity;
shard.is_avx2 = *IS_AVX2;
shard.is_neon = *IS_NEON;
shard.is_simd = *IS_SIMD;
shard.chunk_size = chunk_size;
shard.turbo_quant = turbo_quant_clone;
let shard_arc = Arc::new(RwLock::new(shard));
(shard_arc, i)
}));
}
for result_object_shard in result_object_list {
let ro_shard = result_object_shard.await.unwrap();
shard_vec.push(ro_shard.0);
}
}
let mut index = Index {
docid_global: Arc::new(RwLock::new(0)),
index_format_version_major: INDEX_FORMAT_VERSION_MAJOR,
index_format_version_minor: INDEX_FORMAT_VERSION_MINOR,
index_file,
index_path_string: index_path_string.to_owned(),
stored_field_names,
compressed_index_segment_block_buffer: vec![0; 10_000_000],
indexed_doc_count: 0,
indexed_vector_count: 0,
indexed_cluster_count: 0,
deleted_doc_count: 0,
segment_number1: 0,
segment_number_mask1: 0,
schema_map,
indexed_field_vec,
meta: meta.clone(),
mute,
facets: facets_vec,
synonyms_map,
shard_number,
shard_vec,
max_dictionary_entries: if let Some(spelling_correction) = &meta.spelling_correction
{
spelling_correction.max_dictionary_entries
} else {
usize::MAX
},
symspell_option: if let Some(spelling_correction) = meta.spelling_correction {
Some(Arc::new(RwLock::new(SymSpell::new(
spelling_correction.max_dictionary_edit_distance,
spelling_correction.term_length_threshold,
7,
spelling_correction.count_threshold,
))))
} else {
None
},
max_completion_entries: if let Some(query_completion) = &meta.query_completion {
query_completion.max_completion_entries
} else {
usize::MAX
},
completion_option: meta
.query_completion
.as_ref()
.map(|_query_completion| Arc::new(RwLock::new(PruningRadixTrie::new()))),
frequent_hashset,
embedding_model_option,
vector_dimensions,
vector_dimensions_original,
vector_precision,
quantization,
vector_similarity,
is_avx2: *IS_AVX2,
is_neon: *IS_NEON,
is_simd: *IS_SIMD,
is_vector_indexing,
is_lexical_indexing,
chunk_size,
turbo_quant,
};
let file_len = index.index_file.metadata().unwrap().len();
if file_len == 0 {
write_u16(
INDEX_FORMAT_VERSION_MAJOR,
&mut index.compressed_index_segment_block_buffer,
0,
);
write_u16(
INDEX_FORMAT_VERSION_MINOR,
&mut index.compressed_index_segment_block_buffer,
2,
);
let _ = index.index_file.write(
&index.compressed_index_segment_block_buffer[0..INDEX_HEADER_SIZE as usize],
);
} else {
let _ = index.index_file.read(
&mut index.compressed_index_segment_block_buffer[0..INDEX_HEADER_SIZE as usize],
);
index.index_format_version_major =
read_u16(&index.compressed_index_segment_block_buffer, 0);
index.index_format_version_minor =
read_u16(&index.compressed_index_segment_block_buffer, 2);
if INDEX_FORMAT_VERSION_MAJOR != index.index_format_version_major {
return Err("incompatible index format version ".to_string()
+ &INDEX_FORMAT_VERSION_MAJOR.to_string()
+ " "
+ &index.index_format_version_major.to_string());
};
if index.index_format_version_major == 6 && index.index_format_version_minor == 0 {
index.meta.document_compression = DocumentCompression::Zstd;
}
}
index.segment_number1 = segment_number1;
index.segment_number_mask1 = segment_number_mask1;
if serialize_schema {
serde_json::to_writer(
&File::create(Path::new(index_path).join(SCHEMA_FILENAME)).unwrap(),
&schema,
)
.unwrap();
if !synonyms.is_empty() {
serde_json::to_writer(
&File::create(Path::new(index_path).join(SYNONYMS_FILENAME)).unwrap(),
&synonyms,
)
.unwrap();
}
serde_json::to_writer(
&File::create(Path::new(index_path).join(META_FILENAME)).unwrap(),
&index.meta,
)
.unwrap();
}
let index_arc = Arc::new(RwLock::new(index));
if serialize_schema {
for shard in index_arc.write().await.shard_vec.iter() {
shard.write().await.index_option = Some(index_arc.clone());
}
}
Ok(index_arc)
}
Err(e) => {
println!("file opening error");
Err(e.to_string())
}
}
}
/// Creates or opens a single shard located in the directory `index_path`.
///
/// Steps performed:
/// 1. Creates the shard directory and the `FILE_PATH` sub directory if missing.
/// 2. Opens (creating if necessary, never truncating) the index, docstore, delete, facet and
///    vector files.
/// 3. Derives schema map, indexed fields, facet layout and stored field names from `schema`.
/// 4. Memory-maps the files. Index and docstore files are mapped with length 0 unless
///    `meta.access_type` is `AccessType::Mmap`.
/// 5. Restores facet values from `FACET_VALUES_FILENAME` if that file exists and parses.
/// 6. Sets up stemmer, stop words and frequent words as selected in `meta`.
/// 7. Writes the format version header into an empty index file, or reads and validates it.
/// 8. If `serialize_schema` is set: writes the schema, meta and (non-empty) synonyms files.
///
/// Vector related settings of the returned shard are left at neutral defaults; the visible
/// caller `create_index_root` overwrites them after creation.
///
/// # Arguments
/// * `index_path` - directory of the shard.
/// * `meta` - index meta data / configuration.
/// * `schema` - field definitions.
/// * `serialize_schema` - persist schema/meta/synonyms into the shard directory.
/// * `synonyms` - synonym definitions.
/// * `segment_number_bits1` - number of bits of the segment number.
/// * `mute` - stored in the shard.
/// * `longest_field_id_option` - indexed field id of the longest field; `None` enables
///   automatic detection (`longest_field_auto`).
///
/// # Errors
/// Returns `Err` if the index file cannot be opened or if the major index format version stored
/// in an existing index file differs from `INDEX_FORMAT_VERSION_MAJOR`.
///
/// # Panics
/// Panics if `index_path` is not valid UTF-8, or if directories, the other shard files, the
/// memory maps or the schema/meta/synonyms files cannot be created.
#[allow(clippy::too_many_arguments)]
pub(crate) fn create_shard(
    index_path: &Path,
    meta: &IndexMetaObject,
    schema: &Vec<SchemaField>,
    serialize_schema: bool,
    synonyms: &Vec<Synonym>,
    segment_number_bits1: usize,
    mute: bool,
    longest_field_id_option: Option<usize>,
) -> Result<Shard, String> {
    let segment_number1 = 1usize << segment_number_bits1;
    let segment_number_mask1 = (1u32 << segment_number_bits1) - 1;

    let index_path_buf = index_path.to_path_buf();
    let index_path_string = index_path_buf.to_str().unwrap();
    if !index_path.exists() {
        fs::create_dir_all(index_path).unwrap();
    }
    let file_path = Path::new(index_path_string).join(FILE_PATH);
    if !file_path.exists() {
        fs::create_dir_all(file_path).unwrap();
    }

    // All shard files are opened read/write, created on demand and never truncated.
    let open_read_write = |path: std::path::PathBuf| {
        File::options()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)
    };

    let index_file = match open_read_write(index_path.join(INDEX_FILENAME)) {
        Ok(index_file) => index_file,
        Err(e) => {
            println!("file opening error");
            return Err(e.to_string());
        }
    };
    let docstore_file = open_read_write(index_path.join(DOCSTORE_FILENAME)).unwrap();
    let delete_file = open_read_write(index_path.join(DELETE_FILENAME)).unwrap();
    let facets_file = open_read_write(index_path.join(FACET_FILENAME)).unwrap();
    let vector_file = open_read_write(index_path.join(VECTOR_FILENAME)).unwrap();

    let mut document_length_compressed_array: Vec<[u8; ROARING_BLOCK_SIZE]> = Vec::new();
    let mut indexed_field_vec: Vec<IndexedField> = Vec::new();
    let mut facets_vec: Vec<FacetField> = Vec::new();
    let mut facets_map: AHashMap<String, usize> = AHashMap::new();
    let mut schema_map: HashMap<String, SchemaField> = HashMap::new();
    let mut indexed_schema_vec: Vec<SchemaField> = Vec::new();
    let mut stored_fields_flag = false;
    let mut is_vector_indexing = false;
    let mut is_lexical_indexing = false;
    let mut stored_field_names = Vec::new();
    let mut facets_size_sum = 0;
    for (i, schema_field) in schema.iter().enumerate() {
        if schema_field.index_vector {
            is_vector_indexing = true;
        }
        if schema_field.index_lexical {
            is_lexical_indexing = true;
        }

        // Id the field gets if it is indexed: its position in `indexed_field_vec`.
        let indexed_field_id = indexed_field_vec.len();
        let mut schema_field_clone = schema_field.clone();
        schema_field_clone.indexed_field_id = indexed_field_id;
        schema_field_clone.field_id = i;
        schema_map.insert(schema_field.field.clone(), schema_field_clone.clone());

        if schema_field.facet {
            // Size in bytes of one facet value of this field type.
            let facet_size = match schema_field.field_type {
                FieldType::U8 => 1,
                FieldType::U16 => 2,
                FieldType::U32 => 4,
                FieldType::U64 => 8,
                FieldType::I8 => 1,
                FieldType::I16 => 2,
                FieldType::I32 => 4,
                FieldType::I64 => 8,
                FieldType::Timestamp => 8,
                FieldType::F32 => 4,
                FieldType::F64 => 8,
                FieldType::String16 => 2,
                FieldType::String32 => 4,
                FieldType::StringSet16 => 2,
                FieldType::StringSet32 => 4,
                FieldType::Point => 8,
                _ => 1,
            };
            facets_map.insert(schema_field.field.clone(), facets_vec.len());
            facets_vec.push(FacetField {
                name: schema_field.field.clone(),
                values: IndexMap::new(),
                min: ValueType::None,
                max: ValueType::None,
                offset: facets_size_sum,
                field_type: schema_field.field_type.clone(),
            });
            facets_size_sum += facet_size;
        }

        if schema_field.index_lexical || schema_field.index_vector {
            indexed_field_vec.push(IndexedField {
                schema_field_name: schema_field.field.clone(),
                is_longest_field: false,
                field_length_sum: 0,
                indexed_field_id,
            });
            indexed_schema_vec.push(schema_field_clone);
            document_length_compressed_array.push([0; ROARING_BLOCK_SIZE]);
        }

        if schema_field.store {
            stored_fields_flag = true;
            stored_field_names.push(schema_field.field.clone());
        }
    }

    // Number of bits needed to encode the highest indexed field id.
    // `saturating_sub` keeps a schema without any indexed field from underflowing
    // (it then behaves like a single-field schema: 0 bits, mask 0).
    let indexed_field_id_bits =
        (usize::BITS - indexed_field_vec.len().saturating_sub(1).leading_zeros()) as usize;

    // SAFETY(review): memory mapping assumes the mapped files are not truncated or modified by
    // another process while the maps are alive — this cannot be verified from this function.
    let (index_file_mmap, docstore_file_mmap) = if meta.access_type == AccessType::Mmap {
        (
            unsafe { Mmap::map(&index_file).expect("Unable to create Mmap") },
            unsafe { Mmap::map(&docstore_file).expect("Unable to create Mmap") },
        )
    } else {
        // Without mmap access only zero-length placeholder maps are created.
        (
            unsafe {
                MmapOptions::new()
                    .len(0)
                    .map(&index_file)
                    .expect("Unable to create Mmap")
            },
            unsafe {
                MmapOptions::new()
                    .len(0)
                    .map(&docstore_file)
                    .expect("Unable to create Mmap")
            },
        )
    };

    // Restore previously persisted facet values; offsets and types always follow the schema
    // when the facet count matches.
    if !facets_vec.is_empty()
        && let Ok(file) = File::open(Path::new(index_path).join(FACET_VALUES_FILENAME))
        && let Ok(facets) = serde_json::from_reader(BufReader::new(file))
    {
        let mut facets: Vec<FacetField> = facets;
        if facets_vec.len() == facets.len() {
            for (loaded, declared) in facets.iter_mut().zip(facets_vec.iter()) {
                loaded.offset = declared.offset;
                loaded.field_type = declared.field_type.clone();
            }
        }
        facets_vec = facets;
    }

    // A new facet file is pre-sized to hold one block of facet values.
    if !facets_vec.is_empty() && facets_file.metadata().unwrap().len() == 0 {
        facets_file
            .set_len((facets_size_sum * ROARING_BLOCK_SIZE) as u64)
            .expect("Unable to set len");
    }
    // SAFETY(review): same assumption as for the index/docstore maps above.
    let facets_file_mmap =
        unsafe { MmapMut::map_mut(&facets_file).expect("Unable to create Mmap") };
    let vector_file_mmap = unsafe { Mmap::map(&vector_file).expect("Unable to create Mmap") };

    let synonyms_map = get_synonyms_map(synonyms, segment_number_mask1);
    let facets_len = facets_vec.len();

    #[cfg(feature = "zh")]
    let word_segmentation_option = if meta.tokenizer == TokenizerType::UnicodeAlphanumericZH {
        let mut word_segmentation = WordSegmentationTM::new();
        word_segmentation.load_dictionary(0, 1, true);
        Some(word_segmentation)
    } else {
        None
    };

    let shard_number = 1;

    let stemmer_algorithm = match meta.stemmer {
        StemmerType::Arabic => Some(Algorithm::Arabic),
        StemmerType::Armenian => Some(Algorithm::Armenian),
        StemmerType::Basque => Some(Algorithm::Basque),
        StemmerType::Catalan => Some(Algorithm::Catalan),
        StemmerType::Czech => Some(Algorithm::Czech),
        StemmerType::Danish => Some(Algorithm::Danish),
        StemmerType::Dutch => Some(Algorithm::Dutch),
        StemmerType::DutchPorter => Some(Algorithm::DutchPorter),
        StemmerType::English => Some(Algorithm::English),
        StemmerType::Esperanto => Some(Algorithm::Esperanto),
        StemmerType::Estonian => Some(Algorithm::Estonian),
        StemmerType::Finnish => Some(Algorithm::Finnish),
        StemmerType::French => Some(Algorithm::French),
        StemmerType::German => Some(Algorithm::German),
        StemmerType::Greek => Some(Algorithm::Greek),
        StemmerType::Hindi => Some(Algorithm::Hindi),
        StemmerType::Hungarian => Some(Algorithm::Hungarian),
        StemmerType::Indonesian => Some(Algorithm::Indonesian),
        StemmerType::Irish => Some(Algorithm::Irish),
        StemmerType::Italian => Some(Algorithm::Italian),
        StemmerType::Lithuanian => Some(Algorithm::Lithuanian),
        StemmerType::Lovins => Some(Algorithm::Lovins),
        StemmerType::Nepali => Some(Algorithm::Nepali),
        StemmerType::Norwegian => Some(Algorithm::Norwegian),
        StemmerType::Persian => Some(Algorithm::Persian),
        StemmerType::Polish => Some(Algorithm::Polish),
        StemmerType::Porter => Some(Algorithm::Porter),
        StemmerType::Portuguese => Some(Algorithm::Portuguese),
        StemmerType::Romanian => Some(Algorithm::Romanian),
        StemmerType::Russian => Some(Algorithm::Russian),
        StemmerType::Serbian => Some(Algorithm::Serbian),
        StemmerType::Sesotho => Some(Algorithm::Sesotho),
        StemmerType::Spanish => Some(Algorithm::Spanish),
        StemmerType::Swedish => Some(Algorithm::Swedish),
        StemmerType::Tamil => Some(Algorithm::Tamil),
        StemmerType::Turkish => Some(Algorithm::Turkish),
        StemmerType::Ukrainian => Some(Algorithm::Ukrainian),
        StemmerType::Yiddish => Some(Algorithm::Yiddish),
        _ => None,
    };
    let stemmer = stemmer_algorithm.map(Stemmer::create);

    let stop_words: AHashSet<String> = match &meta.stop_words {
        StopwordType::None => AHashSet::new(),
        StopwordType::English => FREQUENT_EN.lines().map(|x| x.to_string()).collect(),
        StopwordType::German => FREQUENT_DE.lines().map(|x| x.to_string()).collect(),
        StopwordType::French => FREQUENT_FR.lines().map(|x| x.to_string()).collect(),
        StopwordType::Spanish => FREQUENT_ES.lines().map(|x| x.to_string()).collect(),
        StopwordType::Custom { terms } => terms.iter().map(|x| x.to_string()).collect(),
    };

    // Frequent words are kept as a sorted list plus a set of their 64-bit hashes.
    let mut frequent_words: Vec<String> = match &meta.frequent_words {
        FrequentwordType::None => Vec::new(),
        FrequentwordType::English => FREQUENT_EN.lines().map(|x| x.to_string()).collect(),
        FrequentwordType::German => FREQUENT_DE.lines().map(|x| x.to_string()).collect(),
        FrequentwordType::French => FREQUENT_FR.lines().map(|x| x.to_string()).collect(),
        FrequentwordType::Spanish => FREQUENT_ES.lines().map(|x| x.to_string()).collect(),
        FrequentwordType::Custom { terms } => terms.iter().map(|x| x.to_string()).collect(),
    };
    frequent_words.sort_unstable();
    let frequent_hashset: AHashSet<u64> = frequent_words
        .iter()
        .map(|x| hash64(x.as_bytes()))
        .collect();

    let mut shard = Shard {
        semaphore: Arc::new(Semaphore::new(1)),
        index_format_version_major: INDEX_FORMAT_VERSION_MAJOR,
        index_format_version_minor: INDEX_FORMAT_VERSION_MINOR,
        docstore_file,
        delete_file,
        delete_hashset: AHashSet::new(),
        index_file,
        index_path_string: index_path_string.to_owned(),
        index_file_mmap,
        docstore_file_mmap,
        stored_field_names,
        compressed_index_segment_block_buffer: vec![0; 10_000_000],
        compressed_docstore_segment_block_buffer: if stored_fields_flag {
            vec![0; ROARING_BLOCK_SIZE * 4]
        } else {
            Vec::new()
        },
        document_length_normalized_average: 0.0,
        indexed_doc_count: 0,
        committed_doc_count: 0,
        is_last_level_incomplete: false,
        last_level_index_file_start_pos: 0,
        last_level_docstore_file_start_pos: 0,
        last_level_vector_file_start_pos: 0,
        positions_sum_normalized: 0,
        segment_number1: 0,
        segment_number_bits1,
        segment_number_mask1: 0,
        level_index: Vec::new(),
        segments_index: Vec::new(),
        segments_level0: Vec::new(),
        uncommitted: false,
        modified: false,
        enable_fallback: false,
        enable_single_term_topk: false,
        enable_search_quality_test: false,
        enable_inter_query_threading: false,
        enable_inter_query_threading_auto: false,
        schema_map,
        indexed_field_id_bits,
        indexed_field_id_mask: (1usize << indexed_field_id_bits) - 1,
        longest_field_id: longest_field_id_option.unwrap_or_default(),
        longest_field_auto: longest_field_id_option.is_none(),
        indexed_field_vec,
        indexed_schema_vec,
        meta: meta.clone(),
        document_length_compressed_array,
        key_count_sum: 0,
        block_id: 0,
        strip_compressed_sum: 0,
        postings_buffer: vec![0; POSTING_BUFFER_SIZE],
        postings_buffer_pointer: 0,
        docid_count: 0,
        size_compressed_docid_index: 0,
        size_compressed_positions_index: 0,
        position_count: 0,
        postinglist_count: 0,
        mute,
        frequentword_results: AHashMap::new(),
        facets: facets_vec,
        facets_map,
        facets_size_sum,
        facets_file,
        facets_file_mmap,
        string_set_to_single_term_id_vec: vec![AHashMap::new(); facets_len],
        bm25_component_cache: [0.0; 256],
        synonyms_map,
        #[cfg(feature = "zh")]
        word_segmentation_option,
        shard_number,
        index_option: None,
        stemmer,
        stop_words,
        frequent_words,
        frequent_hashset,
        key_head_size: if meta.ngram_indexing == 0 {
            20
        } else if meta.ngram_indexing < 8 {
            22
        } else {
            23
        },
        level_terms: AHashMap::new(),
        level_completions: Arc::new(RwLock::new(AHashMap::with_capacity(200_000))),
        chunks_meta: Vec::new(),
        chunks_string: Vec::new(),
        vector_file,
        vector_file_mmap,
        indexed_vector_count: 0,
        indexed_cluster_count: 0,
        is_vector_indexing,
        is_lexical_indexing,
        block_vector_buffer: Vec::new(),
        vector_dimensions: 0,
        vector_dimensions_original: 0,
        vector_precision: Precision::None,
        quantization: Quantization::None,
        vector_similarity: VectorSimilarity::Dot,
        is_avx2: false,
        is_neon: false,
        is_simd: false,
        chunk_size: 0,
        min_vector_value: f32::MAX,
        max_vector_value: f32::MIN,
        turbo_quant: TurboQuant::new(0, 1234),
    };

    // Index file header: major version (u16) at offset 0, minor version (u16) at offset 2.
    let file_len = shard.index_file.metadata().unwrap().len();
    if file_len == 0 {
        write_u16(
            INDEX_FORMAT_VERSION_MAJOR,
            &mut shard.compressed_index_segment_block_buffer,
            0,
        );
        write_u16(
            INDEX_FORMAT_VERSION_MINOR,
            &mut shard.compressed_index_segment_block_buffer,
            2,
        );
        let _ = shard
            .index_file
            .write(&shard.compressed_index_segment_block_buffer[0..INDEX_HEADER_SIZE as usize]);
    } else {
        let _ = shard.index_file.read(
            &mut shard.compressed_index_segment_block_buffer[0..INDEX_HEADER_SIZE as usize],
        );
        shard.index_format_version_major =
            read_u16(&shard.compressed_index_segment_block_buffer, 0);
        shard.index_format_version_minor =
            read_u16(&shard.compressed_index_segment_block_buffer, 2);
        if INDEX_FORMAT_VERSION_MAJOR != shard.index_format_version_major {
            return Err(format!(
                "incompatible index format version {} {}",
                INDEX_FORMAT_VERSION_MAJOR, shard.index_format_version_major
            ));
        }
        // Shards stored in format 6.0 are always opened with Zstd document compression.
        if shard.index_format_version_major == 6 && shard.index_format_version_minor == 0 {
            shard.meta.document_compression = DocumentCompression::Zstd;
        }
    }

    shard.segment_number1 = segment_number1;
    shard.segment_number_mask1 = segment_number_mask1;

    // One level-0 segment and one segment index per segment number.
    shard.segments_level0 = vec![
        SegmentLevel0 {
            segment: AHashMap::with_capacity(SEGMENT_KEY_CAPACITY),
            ..Default::default()
        };
        shard.segment_number1
    ];
    shard.segments_index = (0..shard.segment_number1)
        .map(|_| SegmentIndex {
            byte_array_blocks: Vec::new(),
            byte_array_blocks_pointer: Vec::new(),
            segment: AHashMap::new(),
        })
        .collect();

    if serialize_schema {
        serde_json::to_writer(
            &File::create(Path::new(index_path).join(SCHEMA_FILENAME)).unwrap(),
            &schema,
        )
        .unwrap();
        if !synonyms.is_empty() {
            serde_json::to_writer(
                &File::create(Path::new(index_path).join(SYNONYMS_FILENAME)).unwrap(),
                &synonyms,
            )
            .unwrap();
        }
        serde_json::to_writer(
            &File::create(Path::new(index_path).join(META_FILENAME)).unwrap(),
            &shard.meta,
        )
        .unwrap();
    }

    Ok(shard)
}
/// Reads one compressed document length byte from the memory-mapped index file.
///
/// The byte position is the document length array pointer of level `block_id`, plus
/// `field_id << 16` (a stride of 65,536 bytes per field), plus `doc_id_block`.
///
/// # Panics
/// Panics if `block_id` is out of range for `level_index` or if the computed position lies
/// outside the mapped index file.
#[inline(always)]
pub(crate) fn get_document_length_compressed_mmap(
    index: &Shard,
    field_id: usize,
    block_id: usize,
    doc_id_block: usize,
) -> u8 {
    let array_start = index.level_index[block_id].document_length_compressed_array_pointer;
    let field_offset = field_id << 16;
    let byte_position = array_start + field_offset + doc_id_block;
    index.index_file_mmap[byte_position]
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_max_score(
index: &Shard,
segment: &SegmentIndex,
posting_count_ngram_1: u32,
posting_count_ngram_2: u32,
posting_count_ngram_3: u32,
posting_count: u32,
block_id: usize,
max_docid: usize,
max_p_docid: usize,
pointer_pivot_p_docid: usize,
compression_type_pointer: u32,
ngram_type: &NgramType,
) -> f32 {
let byte_array = if index.meta.access_type == AccessType::Mmap {
&index.index_file_mmap[segment.byte_array_blocks_pointer[block_id].0
..segment.byte_array_blocks_pointer[block_id].0
+ segment.byte_array_blocks_pointer[block_id].1]
} else {
&segment.byte_array_blocks[block_id]
};
let mut bm25f = 0.0;
let rank_position_pointer_range: u32 =
compression_type_pointer & 0b0011_1111_1111_1111_1111_1111_1111_1111;
let posting_pointer_size_sum;
let rank_position_pointer;
let posting_pointer_size;
let embed_flag;
if max_p_docid < pointer_pivot_p_docid {
posting_pointer_size_sum = max_p_docid as u32 * 2;
rank_position_pointer = read_u16(
byte_array,
rank_position_pointer_range as usize + posting_pointer_size_sum as usize,
) as u32;
posting_pointer_size = 2;
embed_flag = (rank_position_pointer & 0b10000000_00000000) != 0;
} else {
posting_pointer_size_sum = (max_p_docid as u32) * 3 - pointer_pivot_p_docid as u32;
rank_position_pointer = read_u32(
byte_array,
rank_position_pointer_range as usize + posting_pointer_size_sum as usize,
);
posting_pointer_size = 3;
embed_flag = (rank_position_pointer & 0b10000000_00000000_00000000) != 0;
};
let positions_pointer = if embed_flag {
rank_position_pointer_range as usize + posting_pointer_size_sum as usize
} else {
let pointer_value = if posting_pointer_size == 2 {
rank_position_pointer & 0b01111111_11111111
} else {
rank_position_pointer & 0b01111111_11111111_11111111
} as usize;
rank_position_pointer_range as usize - pointer_value
};
let mut field_vec: SmallVec<[(u16, usize); 2]> = SmallVec::new();
let mut field_vec_ngram1 = SmallVec::new();
let mut field_vec_ngram2 = SmallVec::new();
let mut field_vec_ngram3 = SmallVec::new();
decode_positions_commit(
posting_pointer_size,
embed_flag,
byte_array,
positions_pointer,
ngram_type,
index.indexed_field_vec.len(),
index.indexed_field_id_bits,
index.indexed_field_id_mask,
index.longest_field_id as u16,
&mut field_vec,
&mut field_vec_ngram1,
&mut field_vec_ngram2,
&mut field_vec_ngram3,
);
if ngram_type == &NgramType::SingleTerm
|| index.meta.lexical_similarity == LexicalSimilarity::Bm25fProximity
{
let idf = (((index.indexed_doc_count as f32 - posting_count as f32 + 0.5)
/ (posting_count as f32 + 0.5))
+ 1.0)
.ln();
for field in field_vec.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf
* ((tf * (K + 1.0) / (tf + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
} else if ngram_type == &NgramType::NgramFF
|| ngram_type == &NgramType::NgramFR
|| ngram_type == &NgramType::NgramRF
{
let idf_ngram1 = (((index.indexed_doc_count as f32 - posting_count_ngram_1 as f32 + 0.5)
/ (posting_count_ngram_1 as f32 + 0.5))
+ 1.0)
.ln();
let idf_ngram2 = (((index.indexed_doc_count as f32 - posting_count_ngram_2 as f32 + 0.5)
/ (posting_count_ngram_2 as f32 + 0.5))
+ 1.0)
.ln();
for field in field_vec_ngram1.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf_ngram1 = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf_ngram1
* ((tf_ngram1 * (K + 1.0)
/ (tf_ngram1 + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
for field in field_vec_ngram2.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf_ngram2 = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf_ngram2
* ((tf_ngram2 * (K + 1.0)
/ (tf_ngram2 + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
} else {
let idf_ngram1 = (((index.indexed_doc_count as f32 - posting_count_ngram_1 as f32 + 0.5)
/ (posting_count_ngram_1 as f32 + 0.5))
+ 1.0)
.ln();
let idf_ngram2 = (((index.indexed_doc_count as f32 - posting_count_ngram_2 as f32 + 0.5)
/ (posting_count_ngram_2 as f32 + 0.5))
+ 1.0)
.ln();
let idf_ngram3 = (((index.indexed_doc_count as f32 - posting_count_ngram_3 as f32 + 0.5)
/ (posting_count_ngram_3 as f32 + 0.5))
+ 1.0)
.ln();
for field in field_vec_ngram1.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf_ngram1 = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf_ngram1
* ((tf_ngram1 * (K + 1.0)
/ (tf_ngram1 + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
for field in field_vec_ngram2.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf_ngram2 = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf_ngram2
* ((tf_ngram2 * (K + 1.0)
/ (tf_ngram2 + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
for field in field_vec_ngram3.iter() {
let document_length_normalized = DOCUMENT_LENGTH_COMPRESSION[if index.meta.access_type
== AccessType::Mmap
{
get_document_length_compressed_mmap(index, field.0 as usize, block_id, max_docid)
} else {
index.level_index[block_id].document_length_compressed_array[field.0 as usize]
[max_docid]
} as usize] as f32;
let document_length_quotient =
document_length_normalized / index.document_length_normalized_average;
let tf_ngram3 = field.1 as f32;
let weight = index.indexed_schema_vec[field.0 as usize].boost;
bm25f += weight
* idf_ngram3
* ((tf_ngram3 * (K + 1.0)
/ (tf_ngram3 + (K * (1.0 - B + (B * document_length_quotient)))))
+ SIGMA);
}
}
bm25f
}
/// Recomputes the cached maximum impact scores of every posting list.
///
/// For each segment and each key in it, every block's `max_block_score` is
/// refreshed via `get_max_score`, and the largest block score is stored as the
/// posting list's `max_list_score`. Shards with `AccessType::Mmap` are left
/// untouched.
pub(crate) fn update_list_max_impact_score(index: &mut Shard) {
    if index.meta.access_type == AccessType::Mmap {
        return;
    }

    for segment_id in 0..index.segment_number1 {
        // Snapshot the keys so the map can be mutated while walking them.
        let key_hashes: Vec<u64> = index.segments_index[segment_id]
            .segment
            .keys()
            .copied()
            .collect();

        for key_hash in key_hashes {
            // The three low bits of the key select the n-gram type.
            let ngram_type =
                FromPrimitive::from_u64(key_hash & 0b111).unwrap_or(NgramType::SingleTerm);
            let block_count = index.segments_index[segment_id].segment[&key_hash]
                .blocks
                .len();

            let mut best_list_score = 0.0;
            for block_pos in 0..block_count {
                // Shared borrows are confined to this expression so the
                // mutable update below is allowed.
                let block_score = {
                    let segment = &index.segments_index[segment_id];
                    let posting_list = &segment.segment[&key_hash];
                    let block = &posting_list.blocks[block_pos];
                    get_max_score(
                        index,
                        segment,
                        posting_list.posting_count_ngram_1,
                        posting_list.posting_count_ngram_2,
                        posting_list.posting_count_ngram_3,
                        posting_list.posting_count,
                        block.block_id as usize,
                        block.max_docid as usize,
                        block.max_p_docid as usize,
                        block.pointer_pivot_p_docid as usize,
                        block.compression_type_pointer,
                        &ngram_type,
                    )
                };

                let posting_list = index.segments_index[segment_id]
                    .segment
                    .get_mut(&key_hash)
                    .unwrap();
                posting_list.blocks[block_pos].max_block_score = block_score;
                best_list_score = f32::max(best_list_score, block_score);
            }

            let posting_list = index.segments_index[segment_id]
                .segment
                .get_mut(&key_hash)
                .unwrap();
            posting_list.max_list_score = best_list_score;
        }
    }
}
/// Opens an existing shard stored under `index_path` and rebuilds its in-memory state.
///
/// Steps, in order:
/// 1. Read the meta and schema JSON files (and the optional synonyms file) and
///    build an empty shard with `create_shard`.
/// 2. Walk the index file level by level, restoring the level headers and, for
///    RAM access, the per-key posting list directory.
/// 3. Scan the vector file (if vector indexing is active) to restore vector and
///    cluster counts.
/// 4. Fill the BM25 component cache, rebuild derived lookups, load the deleted
///    document ids and run `warmup`.
///
/// # Parameters
/// * `index_path` - directory of the shard.
/// * `mute` - when `false`, a progress line is printed; also forwarded to `create_shard`.
/// * `vector_type`, `vector_dimensions` - used to compute the byte size of one stored vector.
///
/// # Errors
/// Returns the error as a `String` when the meta or schema file cannot be opened or
/// when `create_shard` fails.
///
/// # Panics
/// Panics when the meta or schema JSON cannot be parsed, and on several file
/// metadata / stream position failures (`unwrap`).
pub(crate) async fn open_shard(
index_path: &Path,
mute: bool,
vector_type: Precision,
vector_dimensions: usize,
) -> Result<ShardArc, String> {
if !mute {
println!("opening index ...");
}
// In mmap mode the read cursor is tracked manually and starts right after the index header.
let mut index_mmap_position = INDEX_HEADER_SIZE as usize;
let mut docstore_mmap_position = 0;
// Bytes occupied by one stored vector: header plus one component per dimension.
let vector_size = size_of::<VectorHeader>()
+ (vector_dimensions
* match vector_type {
Precision::F32 => 4,
Precision::I8 => 1,
Precision::None => 0,
});
match File::open(Path::new(index_path).join(META_FILENAME)) {
Ok(meta_file) => {
// NOTE(review): a corrupt meta/schema file panics here instead of returning Err.
let meta: IndexMetaObject = serde_json::from_reader(BufReader::new(meta_file)).unwrap();
match File::open(Path::new(index_path).join(SCHEMA_FILENAME)) {
Ok(schema_file) => {
let schema = serde_json::from_reader(BufReader::new(schema_file)).unwrap();
// Synonyms are optional: a missing or unparsable file yields an empty list.
let synonyms = if let Ok(synonym_file) =
File::open(Path::new(index_path).join(SYNONYMS_FILENAME))
{
serde_json::from_reader(BufReader::new(synonym_file)).unwrap_or_default()
} else {
Vec::new()
};
match create_shard(index_path, &meta, &schema, false, &synonyms, 11, mute, None)
{
Ok(mut shard) => {
// Running count of (level, segment) blocks seen so far; used to derive block_id.
let mut block_count_sum = 0;
let is_mmap = shard.meta.access_type == AccessType::Mmap;
let file_len = if is_mmap {
shard.index_file_mmap.len() as u64
} else {
shard.index_file.metadata().unwrap().len()
};
// Each iteration of this loop consumes one level from the index file.
while if is_mmap {
index_mmap_position as u64
} else {
shard.index_file.stream_position().unwrap()
} < file_len
{
// (block_length, key_count) per segment, read from the level header.
let mut segment_head_vec: Vec<(u32, u32)> = Vec::new();
for key0 in 0..shard.segment_number1 {
// The level header is stored once, before the first segment of the level.
if key0 == 0 {
shard.last_level_index_file_start_pos = if is_mmap {
index_mmap_position as u64
} else {
shard.index_file.stream_position().unwrap()
};
shard.last_level_docstore_file_start_pos = if is_mmap {
docstore_mmap_position as u64
} else {
shard.docstore_file.stream_position().unwrap()
};
// Only the very first level carries the id of the longest field.
if shard.level_index.is_empty() {
let longest_field_id = if is_mmap {
read_u16_ref(
&shard.index_file_mmap,
&mut index_mmap_position,
)
as usize
} else {
// NOTE(review): the result of `read` is ignored here and below; a short read
// would leave stale bytes in the buffer (`read_exact` would detect it).
let _ = shard.index_file.read(
&mut shard
.compressed_index_segment_block_buffer
[0..2],
);
read_u16(
&shard.compressed_index_segment_block_buffer,
0,
)
as usize
};
for indexed_field in shard.indexed_field_vec.iter_mut()
{
indexed_field.is_longest_field = indexed_field
.indexed_field_id
== longest_field_id;
if indexed_field.is_longest_field {
shard.longest_field_id = longest_field_id
}
}
}
// One ROARING_BLOCK_SIZE-byte array of compressed document lengths per indexed field.
// In mmap mode only the start offset is remembered and the bytes are skipped;
// in RAM mode the arrays are copied into memory.
let mut document_length_compressed_array_vec: Vec<
[u8; ROARING_BLOCK_SIZE],
> = Vec::new();
let document_length_compressed_array_pointer = if is_mmap {
index_mmap_position
} else {
shard.index_file.stream_position().unwrap() as usize
};
for _i in 0..shard.indexed_field_vec.len() {
if is_mmap {
index_mmap_position += ROARING_BLOCK_SIZE;
} else {
let mut document_length_compressed_array_item =
[0u8; ROARING_BLOCK_SIZE];
let _ = shard.index_file.read(
&mut document_length_compressed_array_item,
);
document_length_compressed_array_vec
.push(document_length_compressed_array_item);
}
}
// Docstore block of this level (only present when stored fields exist):
// a u32 size followed by that many bytes.
let mut docstore_pointer_docs: Vec<u8> = Vec::new();
let mut docstore_pointer_docs_pointer = 0;
if !shard.stored_field_names.is_empty() {
if is_mmap {
let docstore_pointer_docs_size = read_u32_ref(
&shard.docstore_file_mmap,
&mut docstore_mmap_position,
)
as usize;
docstore_pointer_docs_pointer =
docstore_mmap_position;
docstore_mmap_position +=
docstore_pointer_docs_size;
} else {
let _ = shard.docstore_file.read(
&mut shard
.compressed_index_segment_block_buffer
[0..4],
);
let docstore_pointer_docs_size = read_u32(
&shard.compressed_index_segment_block_buffer,
0,
)
as usize;
docstore_pointer_docs_pointer =
shard.docstore_file.stream_position().unwrap()
as usize;
docstore_pointer_docs =
vec![0; docstore_pointer_docs_size];
let _ = shard
.docstore_file
.read(&mut docstore_pointer_docs);
}
}
// Level statistics: indexed_doc_count (u64) and positions_sum_normalized (u64),
// followed by one (block_length: u32, key_count: u32) pair per segment.
// Both counters are overwritten per level, so the last level's values remain.
if is_mmap {
let _previous_indexed_doc_count =
shard.indexed_doc_count;
shard.indexed_doc_count = read_u64_ref(
&shard.index_file_mmap,
&mut index_mmap_position,
)
as usize;
shard.positions_sum_normalized = read_u64_ref(
&shard.index_file_mmap,
&mut index_mmap_position,
);
for _key0 in 0..shard.segment_number1 {
let block_length = read_u32_ref(
&shard.index_file_mmap,
&mut index_mmap_position,
);
let key_count = read_u32_ref(
&shard.index_file_mmap,
&mut index_mmap_position,
);
segment_head_vec.push((block_length, key_count));
}
} else {
let _ = shard.index_file.read(
&mut shard.compressed_index_segment_block_buffer
[0..16],
);
shard.indexed_doc_count = read_u64(
&shard.compressed_index_segment_block_buffer,
0,
)
as usize;
shard.positions_sum_normalized = read_u64(
&shard.compressed_index_segment_block_buffer,
8,
);
for _key0 in 0..shard.segment_number1 {
let _ = shard.index_file.read(
&mut shard
.compressed_index_segment_block_buffer
[0..8],
);
let block_length = read_u32(
&shard.compressed_index_segment_block_buffer,
0,
);
let key_count = read_u32(
&shard.compressed_index_segment_block_buffer,
4,
);
segment_head_vec.push((block_length, key_count));
}
}
shard.document_length_normalized_average =
shard.positions_sum_normalized as f32
/ shard.indexed_doc_count as f32;
shard.level_index.push(LevelIndex {
document_length_compressed_array:
document_length_compressed_array_vec,
docstore_pointer_docs,
docstore_pointer_docs_pointer,
document_length_compressed_array_pointer,
});
}
let block_length = segment_head_vec[key0].0;
let key_count = segment_head_vec[key0].1;
// presumably segment_number1 == 1 << segment_number_bits1, which would make this
// the index of the current level — TODO confirm.
let block_id =
(block_count_sum >> shard.segment_number_bits1) as u32;
block_count_sum += 1;
// Size of the key-head section; the remainder of block_length is the block body.
let key_body_pointer_write_start: u32 =
key_count * shard.key_head_size as u32;
if is_mmap {
// Mmap access: skip the key heads and only record where the block body lives
// (offset, body length, key count).
index_mmap_position +=
key_count as usize * shard.key_head_size;
shard.segments_index[key0].byte_array_blocks_pointer.push(
(
index_mmap_position,
(block_length - key_body_pointer_write_start)
as usize,
key_count,
),
);
index_mmap_position +=
(block_length - key_body_pointer_write_start) as usize;
} else {
// RAM access: load the key heads into the scratch buffer and the block body
// into its own Vec, then decode every key head.
let _ = shard.index_file.read(
&mut shard.compressed_index_segment_block_buffer
[0..(key_count as usize * shard.key_head_size)],
);
let compressed_index_segment_block_buffer = &shard
.compressed_index_segment_block_buffer
[0..(key_count as usize * shard.key_head_size)];
let mut block_array: Vec<u8> = vec![
0;
(block_length - key_body_pointer_write_start)
as usize
];
let _ = shard.index_file.read(&mut block_array);
shard.segments_index[key0]
.byte_array_blocks
.push(block_array);
let mut read_pointer = 0;
// State of the previously decoded key; only maintained for the incomplete last level (see below).
let mut posting_count_previous = 0;
let mut pointer_pivot_p_docid_previous = 0;
let mut compression_type_pointer_previous = 0;
for key_index in 0..key_count {
// Key head layout: key_hash u64, posting_count u16, max_docid u16, max_p_docid u16,
// optional n-gram posting counts (u8 each), pointer_pivot_p_docid u16,
// compression_type_pointer u32.
let key_hash = read_u64_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
let posting_count = read_u16_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
let max_docid = read_u16_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
let max_p_docid = read_u16_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
let mut posting_count_ngram_1 = 0;
let mut posting_count_ngram_2 = 0;
let mut posting_count_ngram_3 = 0;
// key_head_size 20: no n-gram counts; 22: two counts; any other size: three counts.
// Each count is one byte, expanded through DOCUMENT_LENGTH_COMPRESSION.
match shard.key_head_size {
20 => {}
22 => {
let posting_count_ngram_1_compressed =
read_u8_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
posting_count_ngram_1 =
DOCUMENT_LENGTH_COMPRESSION
[posting_count_ngram_1_compressed
as usize];
let posting_count_ngram_2_compressed =
read_u8_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
posting_count_ngram_2 =
DOCUMENT_LENGTH_COMPRESSION
[posting_count_ngram_2_compressed
as usize];
}
_ => {
let posting_count_ngram_1_compressed =
read_u8_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
posting_count_ngram_1 =
DOCUMENT_LENGTH_COMPRESSION
[posting_count_ngram_1_compressed
as usize];
let posting_count_ngram_2_compressed =
read_u8_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
posting_count_ngram_2 =
DOCUMENT_LENGTH_COMPRESSION
[posting_count_ngram_2_compressed
as usize];
let posting_count_ngram_3_compressed =
read_u8_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
posting_count_ngram_3 =
DOCUMENT_LENGTH_COMPRESSION
[posting_count_ngram_3_compressed
as usize];
}
}
let pointer_pivot_p_docid = read_u16_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
let compression_type_pointer = read_u32_ref(
compressed_index_segment_block_buffer,
&mut read_pointer,
);
// Append this block to an existing posting list or create a new one.
// The stored posting_count is one less than the real count, hence the `+ 1`.
if let Some(value) = shard.segments_index[key0]
.segment
.get_mut(&key_hash)
{
value.posting_count += posting_count as u32 + 1;
value.blocks.push(BlockObjectIndex {
max_block_score: 0.0,
block_id,
posting_count,
max_docid,
max_p_docid,
pointer_pivot_p_docid,
compression_type_pointer,
});
} else {
let value = PostingListObjectIndex {
posting_count: posting_count as u32 + 1,
posting_count_ngram_1,
posting_count_ngram_2,
posting_count_ngram_3,
max_list_score: 0.0,
position_range_previous: 0,
blocks: vec![BlockObjectIndex {
max_block_score: 0.0,
block_id,
posting_count,
max_docid,
max_p_docid,
pointer_pivot_p_docid,
compression_type_pointer,
}],
..Default::default()
};
shard.segments_index[key0]
.segment
.insert(key_hash, value);
};
// Only for the last, partially filled level (doc count not a multiple of
// ROARING_BLOCK_SIZE): compute `position_range_previous` from the previous
// key's pointer and the size of its pointer and docid sections.
if !shard
.indexed_doc_count
.is_multiple_of(ROARING_BLOCK_SIZE)
&& block_id as usize
== shard.indexed_doc_count / ROARING_BLOCK_SIZE
&& shard.meta.access_type == AccessType::Ram
{
let position_range_previous = if key_index == 0 {
0
} else {
// Pointer section of the previous key: 2 bytes per posting up to the pivot,
// 3 bytes per posting after it.
let posting_pointer_size_sum_previous =
pointer_pivot_p_docid_previous as usize * 2
+ if (pointer_pivot_p_docid_previous
as usize)
< posting_count_previous
{
(posting_count_previous
- pointer_pivot_p_docid_previous
as usize)
* 3
} else {
0
};
// compression_type_pointer packs a 30-bit pointer (low bits) and the
// compression type (top two bits).
let rank_position_pointer_range_previous= compression_type_pointer_previous & 0b0011_1111_1111_1111_1111_1111_1111_1111;
let compression_type_previous: CompressionType =
FromPrimitive::from_u32(
compression_type_pointer_previous >> 30,
)
.unwrap();
// Byte size of the previous key's compressed docid section.
let compressed_docid_previous =
match compression_type_previous {
CompressionType::Array => {
posting_count_previous * 2
}
CompressionType::Bitmap => 8192,
CompressionType::Rle => {
let byte_array_docid = &shard
.segments_index[key0]
.byte_array_blocks
[block_id as usize];
4 * read_u16( byte_array_docid, rank_position_pointer_range_previous as usize +posting_pointer_size_sum_previous) as usize + 2
}
_ => 0,
};
rank_position_pointer_range_previous
+ (posting_pointer_size_sum_previous
+ compressed_docid_previous)
as u32
};
let plo = shard.segments_index[key0]
.segment
.get_mut(&key_hash)
.unwrap();
plo.position_range_previous =
position_range_previous;
posting_count_previous = posting_count as usize + 1;
pointer_pivot_p_docid_previous =
pointer_pivot_p_docid;
compression_type_pointer_previous =
compression_type_pointer;
};
}
}
}
}
shard.committed_doc_count = shard.indexed_doc_count;
shard.is_last_level_incomplete =
!shard.committed_doc_count.is_multiple_of(ROARING_BLOCK_SIZE);
// Vector file scan, one section per level: u32 cluster count, one u32 child count
// per cluster, then `vector_size` bytes per vector.
// NOTE(review): indexed_vector_count is reset here but indexed_cluster_count is not;
// confirm it starts at 0 after create_shard.
if shard.is_vector_indexing && !shard.vector_file_mmap.is_empty() {
shard.indexed_vector_count = 0;
let mut offset = 0;
for _level_id in 0..shard.level_index.len() {
shard.last_level_vector_file_start_pos = offset as u64;
let cluster_number_bytes =
&shard.vector_file_mmap[offset..offset + 4];
let cluster_number = u32::from_le_bytes(
cluster_number_bytes.try_into().unwrap(),
)
as usize;
offset += 4;
let mut level_vectors_count = 0;
let mut start_index = 0;
for _i in 0..cluster_number {
let cluster_header_bytes =
&shard.vector_file_mmap[offset..offset + 4];
let cluster_header = ClusterHeader {
start_index,
child_count: u32::from_le_bytes(
cluster_header_bytes.try_into().unwrap(),
),
};
offset += 4;
start_index += cluster_header.child_count;
level_vectors_count += cluster_header.child_count;
}
shard.indexed_vector_count += level_vectors_count as usize;
shard.indexed_cluster_count += cluster_number;
offset += level_vectors_count as usize * vector_size;
}
}
// Precompute the length-normalisation term K * (1 - B + B * dl / avgdl)
// for every compressed document length code.
for (i, component) in shard.bm25_component_cache.iter_mut().enumerate()
{
let document_length_quotient = DOCUMENT_LENGTH_COMPRESSION[i]
as f32
/ shard.document_length_normalized_average;
*component = K * (1.0 - B + B * document_length_quotient);
}
shard.string_set_to_single_term_id();
update_list_max_impact_score(&mut shard);
// Load deleted document ids: the delete file is read as consecutive u64 values.
// NOTE(review): assumes every chunk returned by fill_buf has a length that is a
// multiple of 8 — confirm read_u64 cannot run past the buffer end.
let mut reader = BufReader::with_capacity(8192, &shard.delete_file);
while let Ok(buffer) = reader.fill_buf() {
let length = buffer.len();
if length == 0 {
break;
}
for i in (0..length).step_by(8) {
let docid = read_u64(buffer, i);
shard.delete_hashset.insert(docid as usize);
}
reader.consume(length);
}
let shard_arc = Arc::new(RwLock::new(shard));
warmup(&shard_arc).await;
Ok(shard_arc.clone())
}
Err(err) => Err(err.to_string()),
}
}
Err(err) => Err(err.to_string()),
}
}
Err(err) => Err(err.to_string()),
}
}
/// Counts the numerically named sub-directories of `<index_path>/shards`.
fn count_shard_directories(index_path: &Path) -> Result<usize, String> {
    let entries = index_path
        .join("shards")
        .read_dir()
        .map_err(|err| err.to_string())?;
    Ok(entries
        .filter_map(Result::ok)
        .filter(|entry| entry.path().is_dir())
        .filter_map(|entry| entry.file_name().into_string().ok())
        .filter(|name| name.parse::<usize>().is_ok())
        .count())
}

/// Opens an existing index located at `index_path`, including all of its shards.
///
/// The meta, schema and (optional) synonyms files are read, the index root is
/// created with `create_index_root`, the spelling dictionary and completions
/// are loaded when those features are enabled, and every shard directory
/// `shards/<n>` is opened concurrently with `open_shard`. Index-level settings
/// are then copied into each shard and the shard counters are summed into the
/// index.
///
/// # Errors
/// Returns the error text when a required file or the `shards` directory cannot
/// be read, when the meta/schema JSON is invalid, when `create_index_root`
/// fails, or when opening any shard fails. Previously several of these cases
/// panicked instead.
pub async fn open_index(index_path: &Path) -> Result<IndexArc, String> {
    let start_time = Instant::now();

    let meta_file = File::open(index_path.join(META_FILENAME)).map_err(|err| err.to_string())?;
    let meta: IndexMetaObject =
        serde_json::from_reader(BufReader::new(meta_file)).map_err(|err| err.to_string())?;

    let schema_file =
        File::open(index_path.join(SCHEMA_FILENAME)).map_err(|err| err.to_string())?;
    let schema =
        serde_json::from_reader(BufReader::new(schema_file)).map_err(|err| err.to_string())?;

    // Synonyms are optional: a missing or unparsable file yields an empty list.
    let synonyms = if let Ok(synonym_file) = File::open(index_path.join(SYNONYMS_FILENAME)) {
        serde_json::from_reader(BufReader::new(synonym_file)).unwrap_or_default()
    } else {
        Vec::new()
    };

    let shard_number = count_shard_directories(index_path)?;
    let index_arc = create_index_root(
        index_path,
        meta,
        &schema,
        false,
        &synonyms,
        11,
        false,
        Some(shard_number),
    )
    .await
    .map_err(|err| err.to_string())?;

    // Kept from the original: the index is moved into a fresh Arc. `Arc::into_inner`
    // yields `None` (and this panics) if another strong reference still exists.
    let lock = Arc::into_inner(index_arc).unwrap();
    let index = RwLock::into_inner(lock);
    let index_arc = Arc::new(RwLock::new(index));

    // Load the optional dictionaries and snapshot the vector settings under one read lock.
    let (vector_type, dimensions) = {
        let index_ref = index_arc.read().await;
        let index_dir = Path::new(&index_ref.index_path_string);

        if let Some(symspell) = index_ref.symspell_option.as_ref() {
            let dictionary_path = index_dir.join(DICTIONARY_FILENAME);
            let _ = symspell
                .write()
                .await
                .load_dictionary(&dictionary_path, 0, 1, " ");
        }
        if let Some(completion_option) = index_ref.completion_option.as_ref() {
            let _ = completion_option.write().await.load_completions(
                &index_dir.join(COMPLETIONS_FILENAME),
                0,
                1,
                ":",
            );
        }

        let vector_type = match index_ref.quantization {
            Quantization::ScalarQuantizationI8 | Quantization::TurboQuantI8 => Precision::I8,
            _ => index_ref.vector_precision,
        };
        (vector_type, index_ref.vector_dimensions)
    };

    // Count only numerically named shard directories. The original iterated over every
    // entry of `shards/`, so a stray file made it try to open a non-existent shard.
    let shard_count = count_shard_directories(index_path)?;
    let shards_path = Arc::new(index_path.join("shards"));
    let shard_handle_vec: Vec<_> = (0..shard_count)
        .map(|shard_id| {
            let shards_path = Arc::clone(&shards_path);
            tokio::spawn(async move {
                let path = shards_path.join(shard_id.to_string());
                open_shard(&path, true, vector_type, dimensions).await
            })
        })
        .collect();

    let mut shard_vec: Vec<Arc<RwLock<Shard>>> = Vec::with_capacity(shard_count);
    for shard_handle in shard_handle_vec {
        // First `?`: the task panicked or was cancelled; second `?`: open_shard failed.
        let shard_arc = shard_handle.await.map_err(|err| err.to_string())??;

        // Copy the index-level settings into the shard.
        {
            let index_ref = index_arc.read().await;
            let mut shard = shard_arc.write().await;
            shard.index_option = Some(index_arc.clone());
            shard.quantization = index_ref.quantization;
            shard.shard_number = index_ref.shard_number;
            shard.vector_dimensions = index_ref.vector_dimensions;
            shard.vector_dimensions_original = index_ref.vector_dimensions_original;
            shard.vector_precision = index_ref.vector_precision;
            shard.vector_similarity = index_ref.vector_similarity;
            shard.is_avx2 = index_ref.is_avx2;
            shard.is_neon = index_ref.is_neon;
            shard.is_simd = index_ref.is_simd;
            shard.chunk_size = index_ref.chunk_size;
            shard.turbo_quant = index_ref.turbo_quant.clone();

            // The value range is only restored for scalar-quantized Euclidean vectors.
            if shard.is_vector_indexing
                && !shard.vector_file_mmap.is_empty()
                && shard.quantization == Quantization::ScalarQuantizationI8
                && shard.vector_similarity == VectorSimilarity::Euclidean
            {
                let (min_vector_value, max_vector_value) =
                    read_min_max(&shard.vector_file_mmap, shard.vector_dimensions);
                shard.min_vector_value = min_vector_value;
                shard.max_vector_value = max_vector_value;
            }
        }

        // Add the shard's counters to the index totals.
        {
            let shard = shard_arc.read().await;
            let mut index_mut = index_arc.write().await;
            index_mut.indexed_doc_count += shard.indexed_doc_count;
            index_mut.indexed_vector_count += shard.indexed_vector_count;
            index_mut.indexed_cluster_count += shard.indexed_cluster_count;
            index_mut.deleted_doc_count += shard.delete_hashset.len();
        }

        shard_vec.push(shard_arc);
    }

    {
        let mut index_mut = index_arc.write().await;
        index_mut.shard_number = shard_vec.len();
        index_mut.shard_vec = shard_vec;
    }

    let _elapsed_time = start_time.elapsed().as_nanos();
    Ok(index_arc)
}
/// Pre-computes and caches the search results for the shard's frequent words.
///
/// The cache `frequentword_results` is cleared, a facet request is built for
/// every string / string-set facet of the shard, and each entry of
/// `frequent_words` is searched once with `search_lexical_shard`; the result
/// is stored under the word.
pub(crate) async fn warmup(shard_object_arc: &ShardArc) {
    shard_object_arc.write().await.frequentword_results.clear();

    // Request every string-like facet with an empty prefix and the maximum length.
    let query_facets: Vec<QueryFacet> = shard_object_arc
        .read()
        .await
        .facets
        .iter()
        .filter_map(|facet| match facet.field_type {
            FieldType::String16 => Some(QueryFacet::String16 {
                field: facet.name.clone(),
                prefix: "".into(),
                length: u16::MAX,
            }),
            FieldType::String32 => Some(QueryFacet::String32 {
                field: facet.name.clone(),
                prefix: "".into(),
                length: u32::MAX,
            }),
            FieldType::StringSet16 => Some(QueryFacet::StringSet16 {
                field: facet.name.clone(),
                prefix: "".into(),
                length: u16::MAX,
            }),
            FieldType::StringSet32 => Some(QueryFacet::StringSet32 {
                field: facet.name.clone(),
                prefix: "".into(),
                length: u32::MAX,
            }),
            _ => None,
        })
        .collect();

    // Work on a copy so no lock is held while searching.
    let frequent_words = shard_object_arc.read().await.frequent_words.clone();
    for frequentword in frequent_words.iter() {
        let results_list = shard_object_arc
            .search_lexical_shard(
                frequentword.to_owned(),
                QueryType::Union,
                false,
                0,
                1000,
                ResultType::TopkCount,
                false,
                Vec::new(),
                query_facets.clone(),
                Vec::new(),
                Vec::new(),
            )
            .await;

        shard_object_arc
            .write()
            .await
            .frequentword_results
            .insert(frequentword.to_string(), results_list);
    }
}
/// A term together with its hash, n-gram parts and per-field occurrence data.
// NOTE(review): the field descriptions below are inferred from names and types only — confirm against the call sites.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub(crate) struct TermObject {
/// Hash of the term (presumably produced by `hash64`).
pub key_hash: u64,
/// Presumably the index of the segment the term belongs to — TODO confirm.
pub key0: u32,
/// The term text.
pub term: String,
/// N-gram type of the term.
pub ngram_type: NgramType,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_2: String,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_1: String,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_0: String,
/// Pairs of two numbers per entry for the first n-gram part; presumably (field id, count).
pub field_vec_ngram1: Vec<(usize, u32)>,
/// Same as `field_vec_ngram1`, for the second n-gram part.
pub field_vec_ngram2: Vec<(usize, u32)>,
/// Same as `field_vec_ngram1`, for the third n-gram part.
pub field_vec_ngram3: Vec<(usize, u32)>,
/// One list of `u16` positions per entry; presumably indexed by field.
pub field_positions_vec: Vec<Vec<u16>>,
}
/// A term with its n-gram parts and the query operator attached to it.
// NOTE(review): unlike `TermObject` this carries no hash or per-field data; the name suggests
// duplicates are kept — confirm against the call sites.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub(crate) struct NonUniqueTermObject {
/// The term text.
pub term: String,
/// N-gram type of the term.
pub ngram_type: NgramType,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_2: String,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_1: String,
/// Text of one n-gram part (exact position within the n-gram not visible here).
pub term_ngram_0: String,
/// Query operator associated with this term.
pub op: QueryType,
}
/// `true` when the target stores integers in little-endian byte order.
pub static IS_SYSTEM_LE: LazyLock<bool> = LazyLock::new(|| cfg!(target_endian = "little"));
/// `true` when running on x86_64 and the CPU reports AVX2 support at runtime;
/// always `false` on other architectures.
pub static IS_AVX2: LazyLock<bool> = LazyLock::new(|| {
    #[cfg(target_arch = "x86_64")]
    {
        is_x86_feature_detected!("avx2")
    }
    #[cfg(not(target_arch = "x86_64"))]
    {
        false
    }
});
/// `true` when running on aarch64 and the CPU reports NEON support at runtime;
/// always `false` on other architectures.
pub static IS_NEON: LazyLock<bool> = LazyLock::new(|| {
    #[cfg(target_arch = "aarch64")]
    {
        std::arch::is_aarch64_feature_detected!("neon")
    }
    #[cfg(not(target_arch = "aarch64"))]
    {
        false
    }
});
/// `true` when either `IS_AVX2` or `IS_NEON` is set.
pub static IS_SIMD: LazyLock<bool> = LazyLock::new(|| {
    let has_avx2 = *IS_AVX2;
    // `IS_NEON` is only consulted when AVX2 is unavailable.
    has_avx2 || *IS_NEON
});
/// Fixed-seed `ahash` state backing the fallback `hash32` implementation.
///
/// Compiled whenever the gxhash-based hash functions are not, i.e. unless the
/// `gxhash` feature is enabled on x86_64 with `aes` + `sse2` or on aarch64 with
/// `aes` + `neon`. The aarch64 arm previously lacked the `feature = "gxhash"`
/// test, so on such a target without the feature this static was compiled out
/// while the fallback `hash32` that uses it was compiled in.
#[cfg(not(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
)))]
pub(crate) static HASHER_32: LazyLock<RandomState> =
    LazyLock::new(|| RandomState::with_seeds(805272099, 242851902, 646123436, 591410655));
/// Fixed-seed `ahash` state backing the fallback `hash64` implementation.
///
/// Only compiled when the gxhash-based hash functions are not available.
#[cfg(not(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
)))]
pub(crate) static HASHER_64: LazyLock<RandomState> = LazyLock::new(|| {
    // Constant seeds keep the produced hashes identical across runs.
    let (k0, k1, k2, k3) = (808259318, 750368348, 84901999, 789810389);
    RandomState::with_seeds(k0, k1, k2, k3)
});
/// Hashes `term_bytes` to 32 bits with gxhash and a fixed seed.
#[inline]
#[cfg(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
))]
pub(crate) fn hash32(term_bytes: &[u8]) -> u32 {
    // The seed is constant so hashes stay stable between runs.
    let seed = 1234;
    gxhash32(term_bytes, seed)
}
/// Hashes `term_bytes` to 64 bits with gxhash and a fixed seed, then clears the
/// three lowest bits of the result.
#[inline]
#[cfg(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
))]
pub(crate) fn hash64(term_bytes: &[u8]) -> u64 {
    // presumably the cleared low bits are reserved for the n-gram type — TODO confirm
    let low_bits_cleared =
        0b1111111111111111111111111111111111111111111111111111111111111000;
    let seed = 1234;
    gxhash64(term_bytes, seed) & low_bits_cleared
}
/// Hashes `term_bytes` with the fixed-seed `HASHER_32` state and keeps the low
/// 32 bits of the result (fallback used when gxhash is not available).
#[inline]
#[cfg(not(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
)))]
pub(crate) fn hash32(term_bytes: &[u8]) -> u32 {
    let digest = HASHER_32.hash_one(term_bytes);
    // Truncation to the low 32 bits is intentional.
    digest as u32
}
/// Hashes `term_bytes` with the fixed-seed `HASHER_64` state and clears the
/// three lowest bits of the result (fallback used when gxhash is not available).
#[inline]
#[cfg(not(any(
    all(
        feature = "gxhash",
        target_arch = "x86_64",
        target_feature = "aes",
        target_feature = "sse2"
    ),
    all(
        feature = "gxhash",
        target_arch = "aarch64",
        target_feature = "aes",
        target_feature = "neon"
    )
)))]
pub(crate) fn hash64(term_bytes: &[u8]) -> u64 {
    // presumably the cleared low bits are reserved for the n-gram type — TODO confirm
    let low_bits_cleared =
        0b1111111111111111111111111111111111111111111111111111111111111000;
    let digest = HASHER_64.hash_one(term_bytes);
    digest & low_bits_cleared
}
// Text files embedded into the binary at compile time, one per language (en, de, fr, es).
// NOTE(review): presumably lists of frequent words used for warm-up/caching — confirm against the users of these statics.
static FREQUENT_EN: &str = include_str!("../assets/dictionaries/frequent_en.txt");
static FREQUENT_DE: &str = include_str!("../assets/dictionaries/frequent_de.txt");
static FREQUENT_FR: &str = include_str!("../assets/dictionaries/frequent_fr.txt");
static FREQUENT_ES: &str = include_str!("../assets/dictionaries/frequent_es.txt");
/// Number of small values that are stored verbatim (without loss) in a byte code.
pub(crate) const NUM_FREE_VALUES: u32 = 24;

/// Compresses `i` into a single byte.
///
/// Values below [`NUM_FREE_VALUES`] are stored exactly. Larger values are
/// reduced by `NUM_FREE_VALUES` and stored as a tiny float: the three bits
/// following the most significant bit plus a shift, so the encoding is lossy
/// and rounds down. Inputs too large for the format (at or above
/// `2^31 + NUM_FREE_VALUES`) saturate to `u8::MAX`; previously the `as u8`
/// cast wrapped them around to small codes.
pub(crate) fn int_to_byte4(i: u32) -> u8 {
    if i < NUM_FREE_VALUES {
        return i as u8;
    }

    let ii = i - NUM_FREE_VALUES;
    let num_bits = 32 - ii.leading_zeros();
    let encoded = if num_bits < 4 {
        // Fits in three bits: stored as is (shift code 0).
        ii
    } else {
        let shift = num_bits - 4;
        // Drop the implicit leading bit, keep the next three; shift code 0 is
        // reserved for the small values above, hence `shift + 1`.
        ((ii >> shift) & 0x07) | ((shift + 1) << 3)
    };

    u8::try_from(NUM_FREE_VALUES + encoded).unwrap_or(u8::MAX)
}

/// Expands a byte code produced by [`int_to_byte4`] back into an integer.
///
/// Codes below [`NUM_FREE_VALUES`] decode to themselves; other codes decode to
/// the smallest value that encodes to them.
pub(crate) const fn byte4_to_int(b: u8) -> u32 {
    if (b as u32) < NUM_FREE_VALUES {
        b as u32
    } else {
        let i = b as u32 - NUM_FREE_VALUES;
        let bits = i & 0x07;
        let shift = i >> 3;
        if shift == 0 {
            NUM_FREE_VALUES + bits
        } else {
            // Restore the implicit leading bit before shifting back.
            NUM_FREE_VALUES + ((bits | 0x08) << (shift - 1))
        }
    }
}

/// Lookup table mapping every byte code to its decoded value
/// (`DOCUMENT_LENGTH_COMPRESSION[b] == byte4_to_int(b)`), built at compile time.
pub(crate) const DOCUMENT_LENGTH_COMPRESSION: [u32; 256] = {
    let mut k2 = [0; 256];
    let mut i = 0usize;
    while i < 256 {
        k2[i] = byte4_to_int(i as u8);
        i += 1;
    }
    k2
};
impl Shard {
/// Rebuilds, for every string-set facet, the map from a single term to the
/// indices of the facet values (term sets) that contain it.
///
/// Facets of any other type are skipped. Uses `or_default()` so no temporary
/// `Vec`/set is allocated for terms that already have an entry.
pub(crate) fn string_set_to_single_term_id(&mut self) {
    for (i, facet) in self.facets.iter().enumerate() {
        if !matches!(
            facet.field_type,
            FieldType::StringSet16 | FieldType::StringSet32
        ) {
            continue;
        }
        for (idx, value) in facet.values.iter().enumerate() {
            for term in value.1.0.iter() {
                self.string_set_to_single_term_id_vec[i]
                    .entry(term.to_string())
                    .or_default()
                    .insert(idx as u32);
            }
        }
    }
}
/// Resets the shard to an empty state, truncating all of its files.
///
/// For each memory-mapped file the existing mapping is first replaced by a
/// placeholder, then the file is truncated, then (where needed) mapped again.
/// The index file receives a fresh header with the format version. All
/// in-memory counters, segment maps and facet values are cleared. A semaphore
/// permit is held for the whole operation.
///
/// Truncation failures are reported on stdout and otherwise ignored.
///
/// # Panics
/// Panics if the semaphore is closed or if a memory map cannot be created.
async fn clear_shard(&mut self) {
    let semaphore = self.semaphore.clone();
    let permit = semaphore.acquire_owned().await.unwrap();

    self.level_terms.clear();

    // --- index file ---
    // Swap in an anonymous placeholder mapping before truncating the file.
    let mut mmap_options = MmapOptions::new();
    let mmap: MmapMut = mmap_options.len(4).map_anon().unwrap();
    self.index_file_mmap = mmap
        .make_read_only()
        .expect("Unable to make Mmap read-only");
    let _ = self.index_file.rewind();
    if let Err(e) = self.index_file.set_len(0) {
        println!(
            "Unable to index_file.set_len in clear_index {} {} {:?}",
            self.index_path_string, self.indexed_doc_count, e
        );
    }
    if !self.compressed_docstore_segment_block_buffer.is_empty() {
        self.compressed_docstore_segment_block_buffer = vec![0; ROARING_BLOCK_SIZE * 4];
    }
    // Write a fresh header carrying the index format version.
    write_u16(
        INDEX_FORMAT_VERSION_MAJOR,
        &mut self.compressed_index_segment_block_buffer,
        0,
    );
    write_u16(
        INDEX_FORMAT_VERSION_MINOR,
        &mut self.compressed_index_segment_block_buffer,
        2,
    );
    // `write_all` instead of `write`: the whole header must reach the file.
    let _ = self
        .index_file
        .write_all(&self.compressed_index_segment_block_buffer[0..INDEX_HEADER_SIZE as usize]);
    let _ = self.index_file.flush();
    self.index_file_mmap =
        unsafe { Mmap::map(&self.index_file).expect("Unable to create Mmap") };

    // --- docstore file ---
    self.docstore_file_mmap = unsafe {
        MmapOptions::new()
            .len(0)
            .map(&self.docstore_file)
            .expect("Unable to create Mmap")
    };
    let _ = self.docstore_file.rewind();
    if let Err(e) = self.docstore_file.set_len(0) {
        println!("Unable to docstore_file.set_len in clear_index {:?}", e);
    }
    let _ = self.docstore_file.flush();

    // --- delete file ---
    let _ = self.delete_file.rewind();
    if let Err(e) = self.delete_file.set_len(0) {
        println!("Unable to delete_file.set_len in clear_index {:?}", e);
    }
    let _ = self.delete_file.flush();
    self.delete_hashset.clear();

    // --- facets file: resized to hold one block of facet data ---
    self.facets_file_mmap = unsafe {
        MmapOptions::new()
            .len(0)
            .map_mut(&self.facets_file)
            .expect("Unable to create Mmap")
    };
    let _ = self.facets_file.rewind();
    if let Err(e) = self
        .facets_file
        .set_len((self.facets_size_sum * ROARING_BLOCK_SIZE) as u64)
    {
        println!("Unable to facets_file.set_len in clear_index {:?}", e);
    }
    let _ = self.facets_file.flush();
    self.facets_file_mmap =
        unsafe { MmapMut::map_mut(&self.facets_file).expect("Unable to create Mmap") };

    let index_path = Path::new(&self.index_path_string);
    let _ = fs::remove_file(index_path.join(FACET_VALUES_FILENAME));
    for facet in self.facets.iter_mut() {
        facet.values.clear();
        facet.min = ValueType::None;
        facet.max = ValueType::None;
    }

    if !self.stored_field_names.is_empty() && self.meta.access_type == AccessType::Mmap {
        self.docstore_file_mmap =
            unsafe { Mmap::map(&self.docstore_file).expect("Unable to create Mmap") };
    }

    // --- vector file ---
    // The placeholder mapping is now taken from the vector file itself
    // (the original mapped `docstore_file` here).
    self.vector_file_mmap = unsafe {
        MmapOptions::new()
            .len(0)
            .map(&self.vector_file)
            .expect("Unable to create Mmap")
    };
    let _ = self.vector_file.rewind();
    if let Err(e) = self.vector_file.set_len(0) {
        println!("Unable to vector_file.set_len in clear_index {:?}", e);
    }
    let _ = self.vector_file.flush();
    self.vector_file_mmap =
        unsafe { Mmap::map(&self.vector_file).expect("Unable to create Mmap") };

    // --- in-memory state ---
    self.indexed_vector_count = 0;
    self.indexed_cluster_count = 0;
    self.document_length_normalized_average = 0.0;
    self.indexed_doc_count = 0;
    self.committed_doc_count = 0;
    self.positions_sum_normalized = 0;
    self.level_index = Vec::new();
    for segment in self.segments_index.iter_mut() {
        segment.byte_array_blocks.clear();
        segment.byte_array_blocks_pointer.clear();
        segment.segment.clear();
    }
    for segment in self.segments_level0.iter_mut() {
        segment.segment.clear();
    }
    self.key_count_sum = 0;
    self.block_id = 0;
    self.strip_compressed_sum = 0;
    self.postings_buffer_pointer = 0;
    self.docid_count = 0;
    self.size_compressed_docid_index = 0;
    self.size_compressed_positions_index = 0;
    self.position_count = 0;
    self.postinglist_count = 0;
    self.is_last_level_incomplete = false;

    drop(permit);
}
/// Returns the most frequent string facet values of this shard for the requested facets.
///
/// Only `String16`, `StringSet16`, `String32` and `StringSet32` query facets are considered,
/// and only when the named field exists in this shard with exactly that field type.
/// For set facets the counts are aggregated per individual term of the set.
///
/// Returns `None` when the shard has no facets. Otherwise returns a map from field name to
/// `(value, count)` pairs ordered by descending count, restricted to values that start with
/// the requested prefix (an empty prefix matches everything) and truncated to the requested
/// length. Fields without any matching value are omitted.
pub(crate) fn get_index_string_facets_shard(
    &self,
    query_facets: Vec<QueryFacet>,
) -> Option<AHashMap<String, Facet>> {
    if self.facets.is_empty() {
        return None;
    }

    // One slot per shard facet, indexed like `self.facets`. Slots that no query facet claims
    // keep their default value and are skipped below via the `length == 0` check.
    let mut result_query_facets = Vec::new();
    if !query_facets.is_empty() {
        result_query_facets = vec![ResultFacet::default(); self.facets.len()];
        for query_facet in query_facets.iter() {
            let (field, prefix, length, field_type) = match query_facet {
                QueryFacet::String16 {
                    field,
                    prefix,
                    length,
                } => (field, prefix, *length as u32, FieldType::String16),
                QueryFacet::StringSet16 {
                    field,
                    prefix,
                    length,
                } => (field, prefix, *length as u32, FieldType::StringSet16),
                QueryFacet::String32 {
                    field,
                    prefix,
                    length,
                } => (field, prefix, *length, FieldType::String32),
                QueryFacet::StringSet32 {
                    field,
                    prefix,
                    length,
                } => (field, prefix, *length, FieldType::StringSet32),
                _ => continue,
            };
            if let Some(idx) = self.facets_map.get(field)
                && self.facets[*idx].field_type == field_type
            {
                result_query_facets[*idx] = ResultFacet {
                    field: field.clone(),
                    prefix: prefix.clone(),
                    length,
                    ..Default::default()
                };
            }
        }
    }

    let mut facets: AHashMap<String, Facet> = AHashMap::new();
    for (i, facet) in result_query_facets.iter().enumerate() {
        if facet.length == 0 || self.facets[i].values.is_empty() {
            continue;
        }
        let matches_prefix =
            |term: &str| facet.prefix.is_empty() || term.starts_with(&facet.prefix);

        let v = if self.facets[i].field_type == FieldType::StringSet16
            || self.facets[i].field_type == FieldType::StringSet32
        {
            // Set facets: sum the count of every stored set into each of its terms.
            let mut term_counts: AHashMap<&str, usize> = AHashMap::new();
            for value in self.facets[i].values.iter() {
                for term in value.1.0.iter() {
                    *term_counts.entry(term.as_str()).or_insert(0) += value.1.1;
                }
            }
            term_counts
                .into_iter()
                .filter(|&(term, _)| matches_prefix(term))
                .sorted_unstable_by(|a, b| b.1.cmp(&a.1))
                .take(facet.length as usize)
                .map(|(term, count)| (term.to_string(), count))
                .collect::<Vec<_>>()
        } else {
            // Each stored value is `(terms, count)`: rank by the count only. Comparing the whole
            // tuple would order by the term list first and never yield the most frequent values.
            self.facets[i]
                .values
                .iter()
                .filter(|(term, _)| matches_prefix(term.as_str()))
                .sorted_unstable_by(|a, b| b.1.1.cmp(&a.1.1))
                .take(facet.length as usize)
                .map(|(term, value)| (term.to_string(), value.1))
                .collect::<Vec<_>>()
        };
        if !v.is_empty() {
            facets.insert(facet.field.clone(), v);
        }
    }
    Some(facets)
}
}
impl Index {
/// Returns the number of indexed documents minus the number of documents marked as deleted,
/// summed over all shards.
pub async fn current_doc_count(&self) -> usize {
    let mut current_doc_count = 0;
    for shard in self.shard_vec.iter() {
        // Take the shard read lock once. Acquiring a second read guard while the first is
        // still alive can block behind a queued writer, and the two values could otherwise
        // come from different shard states.
        let shard_ref = shard.read().await;
        current_doc_count += shard_ref.indexed_doc_count - shard_ref.delete_hashset.len();
    }
    current_doc_count
}
/// Returns the number of indexed but not yet committed documents, summed over all shards.
pub async fn uncommitted_doc_count(&self) -> usize {
    let mut uncommitted_doc_count = 0;
    for shard in self.shard_vec.iter() {
        // Take the shard read lock once. Acquiring a second read guard while the first is
        // still alive can block behind a queued writer, and the two counters could otherwise
        // come from different shard states.
        let shard_ref = shard.read().await;
        uncommitted_doc_count += shard_ref.indexed_doc_count - shard_ref.committed_doc_count;
    }
    uncommitted_doc_count
}
/// Returns the sum of `committed_doc_count` over all shards.
pub async fn committed_doc_count(&self) -> usize {
    let mut total = 0;
    for shard_arc in self.shard_vec.iter() {
        let shard_ref = shard_arc.read().await;
        total += shard_ref.committed_doc_count;
    }
    total
}
/// Returns the sum of `indexed_doc_count` over all shards.
pub async fn indexed_doc_count(&self) -> usize {
    let mut total = 0;
    for shard_arc in self.shard_vec.iter() {
        let shard_ref = shard_arc.read().await;
        total += shard_ref.indexed_doc_count;
    }
    total
}
/// Returns the sum of `indexed_vector_count` over all shards.
pub async fn indexed_vector_count(&self) -> usize {
    let mut total = 0;
    for shard_arc in self.shard_vec.iter() {
        let shard_ref = shard_arc.read().await;
        total += shard_ref.indexed_vector_count;
    }
    total
}
/// Returns the sum of `indexed_cluster_count` over all shards.
pub async fn indexed_cluster_count(&self) -> usize {
    let mut total = 0;
    for shard_arc in self.shard_vec.iter() {
        let shard_ref = shard_arc.read().await;
        total += shard_ref.indexed_cluster_count;
    }
    total
}
/// Returns the number of `level_index` entries, summed over all shards.
pub async fn level_count(&self) -> usize {
    let mut total = 0;
    for shard_arc in self.shard_vec.iter() {
        let shard_ref = shard_arc.read().await;
        total += shard_ref.level_index.len();
    }
    total
}
/// Returns the stored `shard_number` of this index.
/// Although declared `async`, the body does not await anything.
pub async fn shard_count(&self) -> usize {
self.shard_number
}
/// Returns the number of entries in the index-level `facets` collection.
pub fn facets_count(&self) -> usize {
self.facets.len()
}
pub async fn index_facets_minmax(&self) -> HashMap<String, MinMaxFieldJson> {
let mut facets_minmax: HashMap<String, MinMaxFieldJson> = HashMap::new();
for shard in self.shard_vec.iter() {
for facet in shard.read().await.facets.iter() {
match (&facet.min, &facet.max) {
(ValueType::U8(min), ValueType::U8(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_u64().unwrap() as u8))).into(),
max: (*max.min(&(item.max.as_u64().unwrap() as u8))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::U16(min), ValueType::U16(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_u64().unwrap() as u16))).into(),
max: (*max.min(&(item.max.as_u64().unwrap() as u16))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::U32(min), ValueType::U32(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_u64().unwrap() as u32))).into(),
max: (*max.min(&(item.max.as_u64().unwrap() as u32))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::U64(min), ValueType::U64(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_u64().unwrap()))).into(),
max: (*max.min(&(item.max.as_u64().unwrap()))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::I8(min), ValueType::I8(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_i64().unwrap() as i8))).into(),
max: (*max.min(&(item.max.as_i64().unwrap() as i8))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::I16(min), ValueType::I16(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_i64().unwrap() as i16))).into(),
max: (*max.min(&(item.max.as_i64().unwrap() as i16))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::I32(min), ValueType::I32(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_i64().unwrap() as i32))).into(),
max: (*max.min(&(item.max.as_i64().unwrap() as i32))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::I64(min), ValueType::I64(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_i64().unwrap()))).into(),
max: (*max.min(&(item.max.as_i64().unwrap()))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::Timestamp(min), ValueType::Timestamp(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: (*min.min(&(item.min.as_i64().unwrap()))).into(),
max: (*max.min(&(item.max.as_i64().unwrap()))).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::F32(min), ValueType::F32(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: min.min(item.min.as_f64().unwrap() as f32).into(),
max: max.min(item.max.as_f64().unwrap() as f32).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
(ValueType::F64(min), ValueType::F64(max)) => {
if let Some(item) = facets_minmax.get_mut(&facet.name) {
*item = MinMaxFieldJson {
min: min.min(item.min.as_f64().unwrap()).into(),
max: max.min(item.max.as_f64().unwrap()).into(),
}
} else {
facets_minmax.insert(
facet.name.clone(),
MinMaxFieldJson {
min: (*min).into(),
max: (*max).into(),
},
);
}
}
_ => {}
}
}
}
facets_minmax
}
/// Aggregates string facet value counts over all shards for the requested facet fields.
///
/// Only `String16`, `StringSet16`, `String32` and `StringSet32` query facets are considered.
/// The `prefix` of a query facet is not used here. Counts are summed per stored facet key
/// across shards, and for every requested field the `length` most frequent keys are returned
/// (an empty list when nothing was counted).
///
/// Returns `None` when the index has no facets.
pub async fn get_index_string_facets(
    &self,
    query_facets: Vec<QueryFacet>,
) -> Option<AHashMap<String, Facet>> {
    if self.facets.is_empty() {
        return None;
    }

    // field name -> (key -> summed count, requested number of values)
    let mut requested: AHashMap<String, (AHashMap<String, usize>, u32)> = AHashMap::new();
    for query_facet in query_facets.iter() {
        let (field, length) = match query_facet {
            QueryFacet::String16 {
                field,
                prefix: _,
                length,
            } => (field, *length as u32),
            QueryFacet::StringSet16 {
                field,
                prefix: _,
                length,
            } => (field, *length as u32),
            QueryFacet::String32 {
                field,
                prefix: _,
                length,
            } => (field, *length),
            QueryFacet::StringSet32 {
                field,
                prefix: _,
                length,
            } => (field, *length),
            _ => continue,
        };
        requested.insert(field.into(), (AHashMap::new(), length));
    }

    for shard_arc in self.shard_vec.iter() {
        let shard = shard_arc.read().await;
        for facet in shard.facets.iter() {
            let Some((counts, _)) = requested.get_mut(&facet.name) else {
                continue;
            };
            for (key, value) in facet.values.iter() {
                *counts.entry(key.clone()).or_insert(0) += value.1;
            }
        }
    }

    let mut result: AHashMap<String, Facet> = AHashMap::new();
    for (field, (counts, length)) in requested.iter() {
        let top_values = counts
            .iter()
            .sorted_unstable_by(|a, b| b.1.cmp(a.1))
            .take(*length as usize)
            .map(|(key, count)| (key.clone(), *count))
            .collect::<Vec<_>>();
        result.insert(field.clone(), top_values);
    }
    Some(result)
}
/// Resets the index to an empty state.
///
/// Removes the dictionary and completions files (removal errors are ignored), replaces the
/// spelling-correction and query-completion structures with fresh ones when they are
/// configured in `meta`, and clears every shard in its own spawned task, waiting for all of
/// them to finish.
pub async fn clear_index(&mut self) {
    let index_dir = Path::new(&self.index_path_string);
    let dictionary_path = index_dir.join(DICTIONARY_FILENAME);
    let completions_path = index_dir.join(COMPLETIONS_FILENAME);

    let _ = fs::remove_file(dictionary_path);
    if let Some(config) = self.meta.spelling_correction.as_ref() {
        let fresh_symspell = SymSpell::new(
            config.max_dictionary_edit_distance,
            config.term_length_threshold.clone(),
            7,
            config.count_threshold,
        );
        self.symspell_option = Some(Arc::new(RwLock::new(fresh_symspell)));
    }

    let _ = fs::remove_file(completions_path);
    if self.meta.query_completion.is_some() {
        self.completion_option = Some(Arc::new(RwLock::new(PruningRadixTrie::new())));
    }

    let clear_tasks: Vec<_> = self
        .shard_vec
        .iter()
        .map(|shard| {
            let shard_clone = shard.clone();
            tokio::spawn(async move {
                shard_clone.write().await.clear_shard().await;
            })
        })
        .collect();
    future::join_all(clear_tasks).await;
}
/// Removes the index directory and everything in it from disk.
/// A failure to remove the directory is ignored.
pub fn delete_index(&mut self) {
    let _ = fs::remove_dir_all(Path::new(&self.index_path_string));
}
/// Reads the synonyms stored in the index directory.
///
/// Returns `Err("not found")` when the synonyms file cannot be opened or cannot be parsed.
pub fn get_synonyms(&self) -> Result<Vec<Synonym>, String> {
    let synonyms_path = Path::new(&self.index_path_string).join(SYNONYMS_FILENAME);
    File::open(synonyms_path)
        .ok()
        .and_then(|file| serde_json::from_reader::<_, Vec<Synonym>>(BufReader::new(file)).ok())
        .ok_or_else(|| "not found".into())
}
pub fn set_synonyms(&mut self, synonyms: &Vec<Synonym>) -> Result<usize, String> {
serde_json::to_writer(
&File::create(Path::new(&self.index_path_string).join(SYNONYMS_FILENAME)).unwrap(),
&synonyms,
)
.unwrap();
self.synonyms_map = get_synonyms_map(synonyms, self.segment_number_mask1);
Ok(synonyms.len())
}
pub fn add_synonyms(&mut self, synonyms: &[Synonym]) -> Result<usize, String> {
let mut merged_synonyms = if let Ok(synonym_file) =
File::open(Path::new(&self.index_path_string).join(SYNONYMS_FILENAME))
{
serde_json::from_reader(BufReader::new(synonym_file)).unwrap_or_default()
} else {
Vec::new()
};
merged_synonyms.extend(synonyms.iter().cloned());
serde_json::to_writer(
&File::create(Path::new(&self.index_path_string).join(SYNONYMS_FILENAME)).unwrap(),
&merged_synonyms,
)
.unwrap();
self.synonyms_map = get_synonyms_map(&merged_synonyms, self.segment_number_mask1);
Ok(merged_synonyms.len())
}
}
/// Closing an index handle.
#[allow(async_fn_in_trait)]
pub trait Close {
/// Closes the index. The `IndexArc` implementation in this file commits first, saves the
/// completion and dictionary data when applicable, and replaces the shards' index and
/// docstore memory maps with small anonymous maps.
async fn close(&self);
}
impl Close for IndexArc {
    /// Commits the index, persists completions and the spelling dictionary if needed, and
    /// replaces each shard's index and docstore memory maps with small anonymous maps.
    ///
    /// Completions and dictionary are only saved when at least one shard is flagged as
    /// `modified` and at least one schema field is marked as the respective source.
    /// Errors while saving are ignored.
    async fn close(&self) {
        self.commit().await;

        let mut shard_handles = Vec::new();
        {
            // Use a single index read guard for the whole bookkeeping phase. Re-acquiring the
            // read lock while an earlier read guard is still alive can block behind a queued
            // writer and thereby deadlock.
            let index_ref = self.read().await;

            let mut modified = false;
            for shard in index_ref.shard_vec.iter() {
                if shard.read().await.modified {
                    modified = true;
                    break;
                }
            }

            let mut dictionary_source = false;
            let mut completion_source = false;
            if modified {
                for schema_item in index_ref.schema_map.iter() {
                    if schema_item.1.dictionary_source {
                        dictionary_source = true;
                    }
                    if schema_item.1.completion_source {
                        completion_source = true;
                    }
                }
            }

            if completion_source
                && let Some(completion_option) = index_ref.completion_option.as_ref()
            {
                let trie = completion_option.read().await;
                let completions_path =
                    Path::new(&index_ref.index_path_string).join(COMPLETIONS_FILENAME);
                _ = trie.save_completions(&completions_path, ":");
            }

            if dictionary_source && let Some(symspell) = index_ref.symspell_option.as_ref() {
                let dictionary_path =
                    Path::new(&index_ref.index_path_string).join(DICTIONARY_FILENAME);
                let _ = symspell.read().await.save_dictionary(&dictionary_path, " ");
            }

            for shard in index_ref.shard_vec.iter() {
                shard_handles.push(shard.clone());
            }
        }

        let mut result_object_list = Vec::new();
        for shard_clone in shard_handles {
            result_object_list.push(tokio::spawn(async move {
                // Build both placeholder maps before locking, then swap them in under a single
                // write lock.
                let index_placeholder: MmapMut = MmapOptions::new()
                    .len(4)
                    .map_anon()
                    .expect("Unable to create anonymous Mmap");
                let docstore_placeholder: MmapMut = MmapOptions::new()
                    .len(4)
                    .map_anon()
                    .expect("Unable to create anonymous Mmap");

                let mut shard_mut = shard_clone.write().await;
                shard_mut.index_file_mmap = index_placeholder
                    .make_read_only()
                    .expect("Unable to make Mmap read-only");
                shard_mut.docstore_file_mmap = docstore_placeholder
                    .make_read_only()
                    .expect("Unable to make Mmap read-only");
            }));
        }
        future::join_all(result_object_list).await;
    }
}
/// Deleting a single document by its global document id.
#[allow(async_fn_in_trait)]
pub trait DeleteDocument {
/// Marks the document with the global id `docid` as deleted.
async fn delete_document(&self, docid: u64);
}
impl DeleteDocument for IndexArc {
    /// Marks the document with the global id `docid` as deleted.
    ///
    /// The global id is split into the shard (`docid % shard_number`) and the shard-local id
    /// (`docid / shard_number`). Ids at or beyond the shard's `indexed_doc_count` are ignored.
    /// A newly deleted id is added to the shard's `delete_hashset` and appended as an 8-byte
    /// record to the shard's delete file. Ids that are already in the set are not written again.
    async fn delete_document(&self, docid: u64) {
        let index_ref = self.read().await;
        let shard_number = index_ref.shard_number as u64;
        let shard_id = docid % shard_number;
        let doc_id = docid / shard_number;
        let mut shard_mut = index_ref.shard_vec[shard_id as usize].write().await;
        if doc_id as usize >= shard_mut.indexed_doc_count {
            return;
        }
        if shard_mut.delete_hashset.insert(doc_id as usize) {
            let mut buffer: [u8; 8] = [0; 8];
            write_u64(doc_id, &mut buffer, 0);
            // `write` may store only part of the buffer, so use `write_all` to keep the delete
            // file made of complete 8-byte records. Persisting stays best-effort: a failure is
            // reported but does not undo the in-memory deletion.
            if let Err(e) = shard_mut.delete_file.write_all(&buffer) {
                println!("Unable to write delete_file in delete_document {:?}", e)
            };
            let _ = shard_mut.delete_file.flush();
        }
    }
}
/// Deleting several documents by their global document ids.
#[allow(async_fn_in_trait)]
pub trait DeleteDocuments {
/// Marks every document whose global id is listed in `docid_vec` as deleted.
async fn delete_documents(&self, docid_vec: Vec<u64>);
}
impl DeleteDocuments for IndexArc {
    /// Deletes the listed documents one after another via `delete_document`.
    async fn delete_documents(&self, docid_vec: Vec<u64>) {
        for global_docid in docid_vec.into_iter() {
            self.delete_document(global_docid).await;
        }
    }
}
/// Deleting all documents that are returned by a search query.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
pub trait DeleteDocumentsByQuery {
/// Runs a search with the given parameters and deletes the documents in its result list.
/// `offset` and `length` select the slice of the result list that is deleted; the remaining
/// parameters are passed to the search.
async fn delete_documents_by_query(
&self,
query_string: String,
query_type_default: QueryType,
offset: usize,
length: usize,
include_uncommitted: bool,
field_filter: Vec<String>,
facet_filter: Vec<FacetFilter>,
result_sort: Vec<ResultSort>,
);
}
impl DeleteDocumentsByQuery for IndexArc {
    /// Runs a lexical top-k search (`QueryRewriting::SearchOnly`) with the given parameters
    /// and deletes every document contained in the returned result list.
    async fn delete_documents_by_query(
        &self,
        query_string: String,
        query_type_default: QueryType,
        offset: usize,
        length: usize,
        include_uncommitted: bool,
        field_filter: Vec<String>,
        facet_filter: Vec<FacetFilter>,
        result_sort: Vec<ResultSort>,
    ) {
        let search_result = self
            .search(
                query_string,
                None,
                query_type_default,
                SearchMode::Lexical,
                false,
                offset,
                length,
                ResultType::Topk,
                include_uncommitted,
                field_filter,
                Vec::new(),
                facet_filter,
                result_sort,
                QueryRewriting::SearchOnly,
            )
            .await;

        let mut matching_docids: Vec<u64> = Vec::new();
        for hit in search_result.results.iter() {
            matching_docids.push(hit.doc_id as u64);
        }
        self.delete_documents(matching_docids).await;
    }
}
/// Updating a single document.
#[allow(async_fn_in_trait)]
pub trait UpdateDocument {
/// Updates a document. `id_document` pairs the global id of the document to replace with
/// its new content.
async fn update_document(&self, id_document: (u64, Document));
}
impl UpdateDocument for IndexArc {
    /// Deletes the document with the given id and indexes the new content as a new document
    /// (without an attached file).
    async fn update_document(&self, id_document: (u64, Document)) {
        let (old_docid, new_document) = id_document;
        self.delete_document(old_docid).await;
        self.index_document(new_document, FileType::None).await;
    }
}
/// Updating several documents.
#[allow(async_fn_in_trait)]
pub trait UpdateDocuments {
/// Updates documents. Every element pairs the global id of a document to replace with its
/// new content.
async fn update_documents(&self, id_document_vec: Vec<(u64, Document)>);
}
impl UpdateDocuments for IndexArc {
    /// Deletes all documents with the given ids first, then indexes all new documents.
    async fn update_documents(&self, id_document_vec: Vec<(u64, Document)>) {
        let mut old_docids = Vec::with_capacity(id_document_vec.len());
        let mut new_documents = Vec::with_capacity(id_document_vec.len());
        for (old_docid, new_document) in id_document_vec {
            old_docids.push(old_docid);
            new_documents.push(new_document);
        }
        self.delete_documents(old_docids).await;
        self.index_documents(new_documents).await;
    }
}
/// Indexing several documents.
#[allow(async_fn_in_trait)]
pub trait IndexDocuments {
/// Indexes every document in `document_vec`.
async fn index_documents(&self, document_vec: Vec<Document>);
}
impl IndexDocuments for IndexArc {
    /// Hands the documents one after another to `index_document` (without an attached file).
    async fn index_documents(&self, document_vec: Vec<Document>) {
        for new_document in document_vec.into_iter() {
            self.index_document(new_document, FileType::None).await;
        }
    }
}
/// Indexing a single document.
#[allow(async_fn_in_trait)]
pub trait IndexDocument {
/// Indexes `document`; `file` optionally carries a file that belongs to the document.
async fn index_document(&self, document: Document, file: FileType);
}
impl IndexDocument for IndexArc {
/// Assigns the next global document id to `document`, selects the target shard as
/// `docid % shard_number`, and spawns the actual indexing on `INDEX_RUNTIME`.
///
/// The function returns after the task has been spawned; it does not wait for the spawned
/// indexing task to finish.
async fn index_document(&self, document: Document, file: FileType) {
let shard_number = self.read().await.shard_number;
let docid_global_arc = self.read().await.docid_global.clone();
// The write lock on the global id counter is held until the shard's semaphore permit has been
// acquired and the counter has been incremented.
let mut docid_global = docid_global_arc.write().await;
let docid_global_clone = *docid_global;
let shard_id = *docid_global % shard_number;
let shard_arc = self.read().await.shard_vec[shard_id].clone();
let semaphore = shard_arc.read().await.semaphore.clone();
// Waits here until the shard's semaphore grants a permit.
let permit = semaphore.acquire_owned().await.unwrap();
*docid_global += 1;
drop(docid_global);
INDEX_RUNTIME.handle().spawn(async move {
shard_arc
.index_document_shard(document, file, docid_global_clone)
.await;
// The permit is released only after the shard has finished processing the document.
drop(permit);
});
}
}
/// First indexing stage on a shard.
#[allow(async_fn_in_trait)]
pub(crate) trait IndexDocumentShard {
/// Processes `document` (with the optional `file`) for the shard; `docid_global` is the global
/// document id assigned to it.
async fn index_document_shard(&self, document: Document, file: FileType, docid_global: usize);
}
/// Appends every string found anywhere inside `value` to `out`.
///
/// Arrays and objects are traversed recursively (for objects only the values, not the keys).
/// Null, boolean and number values contribute nothing.
pub(crate) fn object_values_to_string_vec_recursive(value: &Value, out: &mut Vec<String>) {
    match value {
        Value::String(text) => out.push(text.to_owned()),
        Value::Array(items) => items
            .iter()
            .for_each(|item| object_values_to_string_vec_recursive(item, out)),
        Value::Object(fields) => fields
            .values()
            .for_each(|item| object_values_to_string_vec_recursive(item, out)),
        Value::Null | Value::Bool(_) | Value::Number(_) => {}
    }
}
impl IndexDocumentShard for ShardArc {
/// Tokenizes the lexically indexed fields of `document`, collects the unique terms and the
/// per-field lengths, fills in the constituent-term position counts of n-gram terms, and then
/// passes the result to `index_document_shard_2`.
///
/// Only a read lock on the shard is taken here.
async fn index_document_shard(&self, document: Document, file: FileType, docid_global: usize) {
let shard_arc_clone = self.clone();
// Copy the settings needed for tokenizing out of the shard, then release the read guard.
let shard_ref = self.read().await;
let schema = shard_ref.indexed_schema_vec.clone();
let ngram_indexing = shard_ref.meta.ngram_indexing;
let indexed_field_vec_len = shard_ref.indexed_field_vec.len();
let tokenizer_type = shard_ref.meta.tokenizer;
let segment_number_mask1 = shard_ref.segment_number_mask1;
drop(shard_ref);
let token_per_field_max: u32 = u16::MAX as u32;
let mut unique_terms: AHashMap<String, TermObject> = AHashMap::new();
// One entry per tokenized field:
// (indexed field id, compressed length, normalized length, token count).
let mut field_vec: Vec<(usize, u8, u32, u32)> = Vec::new();
// A second read guard is passed to the tokenizer and held until all fields are tokenized.
let shard_ref2 = shard_arc_clone.read().await;
for schema_field in schema.iter() {
if !schema_field.index_lexical {
continue;
}
if let Some(field_value) = document.get(&schema_field.field) {
let mut non_unique_terms: Vec<NonUniqueTermObject> = Vec::new();
let mut nonunique_terms_count = 0u32;
// Turn the field value into the text to tokenize.
let text = match schema_field.field_type {
FieldType::Json => {
if matches!(field_value, Value::Object { .. }) {
// JSON objects: all contained string values joined by spaces.
let mut strings_vec: Vec<String> = Vec::new();
object_values_to_string_vec_recursive(field_value, &mut strings_vec);
strings_vec.join(" ")
} else {
// Otherwise the plain string, or the JSON representation if it is not a string.
serde_json::from_value::<String>(field_value.clone())
.unwrap_or(field_value.to_string())
}
}
FieldType::Text | FieldType::String16 | FieldType::String32 => {
serde_json::from_value::<String>(field_value.clone())
.unwrap_or(field_value.to_string())
}
_ => field_value.to_string(),
};
let mut query_type_mut = QueryType::Union;
tokenizer(
&shard_ref2,
&text,
&mut unique_terms,
&mut non_unique_terms,
tokenizer_type,
segment_number_mask1,
&mut nonunique_terms_count,
token_per_field_max,
MAX_POSITIONS_PER_TERM,
false,
&mut query_type_mut,
ngram_indexing,
schema_field.indexed_field_id,
indexed_field_vec_len,
)
.await;
// The token count is stored as one byte; the table lookup maps that byte back to a length.
let document_length_compressed: u8 = int_to_byte4(nonunique_terms_count);
let document_length_normalized: u32 =
DOCUMENT_LENGTH_COMPRESSION[document_length_compressed as usize];
field_vec.push((
schema_field.indexed_field_id,
document_length_compressed,
document_length_normalized,
nonunique_terms_count,
));
}
}
drop(shard_ref2);
// Collect the keys of all n-gram terms first, because `unique_terms` is mutated in the loop below.
let ngrams: Vec<String> = unique_terms
.iter()
.filter(|term| term.1.ngram_type != NgramType::SingleTerm)
.map(|term| term.1.term.clone())
.collect();
// For every n-gram, record per field how many positions each of its constituent terms has
// (only fields with at least one position are recorded).
for term in ngrams.iter() {
let ngram = unique_terms.get(term).unwrap();
match ngram.ngram_type {
NgramType::SingleTerm => {}
// Two-term n-grams.
NgramType::NgramFF | NgramType::NgramFR | NgramType::NgramRF => {
let term_ngram1 = ngram.term_ngram_1.clone();
let term_ngram2 = ngram.term_ngram_0.clone();
for indexed_field_id in 0..indexed_field_vec_len {
// NOTE(review): indexing panics if a constituent term is missing from `unique_terms`;
// presumably the tokenizer always inserts the constituents — confirm.
let positions_count_ngram1 =
unique_terms[&term_ngram1].field_positions_vec[indexed_field_id].len();
let positions_count_ngram2 =
unique_terms[&term_ngram2].field_positions_vec[indexed_field_id].len();
let ngram = unique_terms.get_mut(term).unwrap();
if positions_count_ngram1 > 0 {
ngram
.field_vec_ngram1
.push((indexed_field_id, positions_count_ngram1 as u32));
}
if positions_count_ngram2 > 0 {
ngram
.field_vec_ngram2
.push((indexed_field_id, positions_count_ngram2 as u32));
}
}
}
// All remaining n-gram types are handled as three-term n-grams.
_ => {
let term_ngram1 = ngram.term_ngram_2.clone();
let term_ngram2 = ngram.term_ngram_1.clone();
let term_ngram3 = ngram.term_ngram_0.clone();
for indexed_field_id in 0..indexed_field_vec_len {
let positions_count_ngram1 =
unique_terms[&term_ngram1].field_positions_vec[indexed_field_id].len();
let positions_count_ngram2 =
unique_terms[&term_ngram2].field_positions_vec[indexed_field_id].len();
let positions_count_ngram3 =
unique_terms[&term_ngram3].field_positions_vec[indexed_field_id].len();
let ngram = unique_terms.get_mut(term).unwrap();
if positions_count_ngram1 > 0 {
ngram
.field_vec_ngram1
.push((indexed_field_id, positions_count_ngram1 as u32));
}
if positions_count_ngram2 > 0 {
ngram
.field_vec_ngram2
.push((indexed_field_id, positions_count_ngram2 as u32));
}
if positions_count_ngram3 > 0 {
ngram
.field_vec_ngram3
.push((indexed_field_id, positions_count_ngram3 as u32));
}
}
}
}
}
let document_item = DocumentItem {
document,
unique_terms,
field_vec,
};
shard_arc_clone
.index_document_shard_2(document_item, file, docid_global)
.await;
}
}
/// Second indexing stage on a shard.
#[allow(async_fn_in_trait)]
pub(crate) trait IndexDocumentShard2 {
/// Adds an already tokenized document (`document_item`) with the optional `file` to the shard;
/// `docid_global` is the global document id assigned to it.
async fn index_document_shard_2(
&self,
document_item: DocumentItem,
file: FileType,
docid_global: usize,
);
}
impl IndexDocumentShard2 for ShardArc {
/// Adds a tokenized document to the shard while holding the shard's write lock.
///
/// In order, it: commits when the document starts a new block of local document ids, indexes
/// the vector (if enabled), writes facet values and updates facet min/max, records field
/// lengths, expands synonyms, indexes the postings of all terms, copies/writes the attached
/// file, and stores the document (if stored fields exist). After a commit it finally drops the
/// write lock and calls `warmup`.
async fn index_document_shard_2(
&self,
document_item: DocumentItem,
file: FileType,
docid_global: usize,
) {
let mut shard_mut = self.write().await;
// Shard-local document id.
let docid_local = docid_global / shard_mut.shard_number;
shard_mut.indexed_doc_count = docid_local + 1;
// `docid_local >> 16` is the number of the 65,536-document block this document belongs to.
// Entering a different block than the current one triggers a commit first.
let do_commit = shard_mut.block_id != docid_local >> 16;
if do_commit {
if shard_mut.is_vector_indexing {
shard_mut.commit_vector_shard().await;
}
shard_mut.commit_lexical_shard(docid_local).await;
shard_mut.block_id = docid_local >> 16;
}
if shard_mut.is_vector_indexing {
shard_mut
.index_vector_shard(docid_local, &document_item.document)
.await;
}
if !shard_mut.facets.is_empty() {
let facets_size_sum = shard_mut.facets_size_sum;
for i in 0..shard_mut.facets.len() {
let facet = &mut shard_mut.facets[i];
if let Some(field_value) = document_item.document.get(&facet.name) {
// Position of this facet's value for this document inside the facets memory map:
// one record of `facets_size_sum` bytes per document, plus the facet's offset in the record.
let address = (facets_size_sum * docid_local) + facet.offset;
// Numeric facets: a value that cannot be read as the expected number falls back to the
// default (zero); min/max are initialized by the first value and widened afterwards.
match facet.field_type {
FieldType::U8 => {
let value = field_value.as_u64().unwrap_or_default() as u8;
match (&facet.min, &facet.max) {
(ValueType::U8(min), ValueType::U8(max)) => {
if value < *min {
facet.min = ValueType::U8(value);
}
if value > *max {
facet.max = ValueType::U8(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::U8(value);
facet.max = ValueType::U8(value);
}
_ => {}
}
shard_mut.facets_file_mmap[address] = value
}
FieldType::U16 => {
let value = field_value.as_u64().unwrap_or_default() as u16;
match (&facet.min, &facet.max) {
(ValueType::U16(min), ValueType::U16(max)) => {
if value < *min {
facet.min = ValueType::U16(value);
}
if value > *max {
facet.max = ValueType::U16(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::U16(value);
facet.max = ValueType::U16(value);
}
_ => {}
}
write_u16(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::U32 => {
let value = field_value.as_u64().unwrap_or_default() as u32;
match (&facet.min, &facet.max) {
(ValueType::U32(min), ValueType::U32(max)) => {
if value < *min {
facet.min = ValueType::U32(value);
}
if value > *max {
facet.max = ValueType::U32(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::U32(value);
facet.max = ValueType::U32(value);
}
_ => {}
}
write_u32(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::U64 => {
let value = field_value.as_u64().unwrap_or_default();
match (&facet.min, &facet.max) {
(ValueType::U64(min), ValueType::U64(max)) => {
if value < *min {
facet.min = ValueType::U64(value);
}
if value > *max {
facet.max = ValueType::U64(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::U64(value);
facet.max = ValueType::U64(value);
}
_ => {}
}
write_u64(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::I8 => {
let value = field_value.as_i64().unwrap_or_default() as i8;
match (&facet.min, &facet.max) {
(ValueType::I8(min), ValueType::I8(max)) => {
if value < *min {
facet.min = ValueType::I8(value);
}
if value > *max {
facet.max = ValueType::I8(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::I8(value);
facet.max = ValueType::I8(value);
}
_ => {}
}
write_i8(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::I16 => {
let value = field_value.as_i64().unwrap_or_default() as i16;
match (&facet.min, &facet.max) {
(ValueType::I16(min), ValueType::I16(max)) => {
if value < *min {
facet.min = ValueType::I16(value);
}
if value > *max {
facet.max = ValueType::I16(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::I16(value);
facet.max = ValueType::I16(value);
}
_ => {}
}
write_i16(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::I32 => {
let value = field_value.as_i64().unwrap_or_default() as i32;
match (&facet.min, &facet.max) {
(ValueType::I32(min), ValueType::I32(max)) => {
if value < *min {
facet.min = ValueType::I32(value);
}
if value > *max {
facet.max = ValueType::I32(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::I32(value);
facet.max = ValueType::I32(value);
}
_ => {}
}
write_i32(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::I64 => {
let value = field_value.as_i64().unwrap_or_default();
match (&facet.min, &facet.max) {
(ValueType::I64(min), ValueType::I64(max)) => {
if value < *min {
facet.min = ValueType::I64(value);
}
if value > *max {
facet.max = ValueType::I64(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::I64(value);
facet.max = ValueType::I64(value);
}
_ => {}
}
write_i64(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::Timestamp => {
let value = field_value.as_i64().unwrap_or_default();
match (&facet.min, &facet.max) {
(ValueType::Timestamp(min), ValueType::Timestamp(max)) => {
if value < *min {
facet.min = ValueType::Timestamp(value);
}
if value > *max {
facet.max = ValueType::Timestamp(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::Timestamp(value);
facet.max = ValueType::Timestamp(value);
}
_ => {}
}
write_i64(value, &mut shard_mut.facets_file_mmap, address);
}
FieldType::F32 => {
let value = field_value.as_f64().unwrap_or_default() as f32;
match (&facet.min, &facet.max) {
(ValueType::F32(min), ValueType::F32(max)) => {
if value < *min {
facet.min = ValueType::F32(value);
}
if value > *max {
facet.max = ValueType::F32(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::F32(value);
facet.max = ValueType::F32(value);
}
_ => {}
}
write_f32(value, &mut shard_mut.facets_file_mmap, address)
}
FieldType::F64 => {
let value = field_value.as_f64().unwrap_or_default();
match (&facet.min, &facet.max) {
(ValueType::F64(min), ValueType::F64(max)) => {
if value < *min {
facet.min = ValueType::F64(value);
}
if value > *max {
facet.max = ValueType::F64(value);
}
}
(ValueType::None, ValueType::None) => {
facet.min = ValueType::F64(value);
facet.max = ValueType::F64(value);
}
_ => {}
}
write_f64(value, &mut shard_mut.facets_file_mmap, address)
}
// String facets: the value's count is incremented in `facet.values`, and the index of the
// value within that map is written as the facet value id. The guard skips the facet once
// the map has reached the capacity of the id type.
FieldType::String16 if facet.values.len() < u16::MAX as usize => {
let key = serde_json::from_value::<String>(field_value.clone())
.unwrap_or(field_value.to_string());
let key_string = key.clone();
let key = vec![key];
facet.values.entry(key_string.clone()).or_insert((key, 0)).1 += 1;
let facet_value_id =
facet.values.get_index_of(&key_string).unwrap() as u16;
write_u16(facet_value_id, &mut shard_mut.facets_file_mmap, address)
}
// String set facets: the sorted terms joined by "_" form the key of the set.
FieldType::StringSet16 if facet.values.len() < u16::MAX as usize => {
// NOTE(review): panics if the field value is not an array of strings — consider handling
// malformed documents gracefully.
let mut key: Vec<String> =
serde_json::from_value(field_value.clone()).unwrap();
key.sort();
let key_string = key.join("_");
facet.values.entry(key_string.clone()).or_insert((key, 0)).1 += 1;
let facet_value_id =
facet.values.get_index_of(&key_string).unwrap() as u16;
write_u16(facet_value_id, &mut shard_mut.facets_file_mmap, address)
}
FieldType::String32 if facet.values.len() < u32::MAX as usize => {
let key = serde_json::from_value::<String>(field_value.clone())
.unwrap_or(field_value.to_string());
let key_string = key.clone();
let key = vec![key];
facet.values.entry(key_string.clone()).or_insert((key, 0)).1 += 1;
let facet_value_id =
facet.values.get_index_of(&key_string).unwrap() as u32;
write_u32(facet_value_id, &mut shard_mut.facets_file_mmap, address)
}
FieldType::StringSet32 if facet.values.len() < u32::MAX as usize => {
// NOTE(review): panics if the field value is not an array of strings — consider handling
// malformed documents gracefully.
let mut key: Vec<String> =
serde_json::from_value(field_value.clone()).unwrap();
key.sort();
let key_string = key.join("_");
facet.values.entry(key_string.clone()).or_insert((key, 0)).1 += 1;
let facet_value_id =
facet.values.get_index_of(&key_string).unwrap() as u32;
write_u32(facet_value_id, &mut shard_mut.facets_file_mmap, address)
}
// Points need exactly two coordinates, the first within [-90, 90] and the second within
// [-180, 180]; valid points are stored as a Morton code, others are reported and skipped.
FieldType::Point => {
if let Ok(point) = serde_json::from_value::<Point>(field_value.clone())
&& point.len() == 2
{
if point[0] >= -90.0
&& point[0] <= 90.0
&& point[1] >= -180.0
&& point[1] <= 180.0
{
let morton_code = encode_morton_2_d(&point);
write_u64(morton_code, &mut shard_mut.facets_file_mmap, address)
} else {
println!(
"outside valid coordinate range: {} {}",
point[0], point[1]
);
}
}
}
_ => {}
};
}
}
}
// First document since the `uncommitted` flag was last false: make sure the level-0 segments
// have their positions buffers allocated and mark the shard as having uncommitted documents.
if !shard_mut.uncommitted {
if shard_mut.segments_level0[0].positions_compressed.is_empty() {
for strip0 in shard_mut.segments_level0.iter_mut() {
strip0.positions_compressed = vec![0; MAX_POSITIONS_PER_TERM * 2];
}
}
shard_mut.uncommitted = true;
}
// Record the field lengths of this document. For the very first local document the field
// with the most tokens is tracked as well.
let mut longest_field_id: usize = 0;
let mut longest_field_length: u32 = 0;
for value in document_item.field_vec {
if docid_local == 0 && value.3 > longest_field_length {
longest_field_id = value.0;
longest_field_length = value.3;
}
// The lower 16 bits of the local document id address the document within its block.
shard_mut.document_length_compressed_array[value.0]
[docid_local & 0b11111111_11111111] = value.1;
shard_mut.positions_sum_normalized += value.2 as u64;
shard_mut.indexed_field_vec[value.0].field_length_sum += value.2 as usize;
}
// On the first local document, fix the shard's longest field: the detected one if
// `longest_field_auto` is set, otherwise the already configured `longest_field_id`.
if docid_local == 0 && shard_mut.is_lexical_indexing {
if !shard_mut.longest_field_auto {
longest_field_id = shard_mut.longest_field_id;
}
shard_mut.longest_field_id = longest_field_id;
shard_mut.indexed_field_vec[longest_field_id].is_longest_field = true;
if shard_mut.longest_field_auto && shard_mut.indexed_field_vec.len() > 1 {
println!(
"detect longest field id {} name {} length {}",
longest_field_id,
shard_mut.indexed_field_vec[longest_field_id].schema_field_name,
longest_field_length
);
}
}
let mut unique_terms = document_item.unique_terms;
// Synonym expansion: every single term that has synonyms is additionally indexed under each
// synonym. If the synonym already occurs in the document, the positions are merged and
// re-sorted; otherwise a copy of the term is inserted under the synonym.
if !shard_mut.synonyms_map.is_empty() {
// Iterate over a snapshot, because `unique_terms` is modified inside the loop.
let unique_terms_clone = unique_terms.clone();
for term in unique_terms_clone.iter() {
if term.1.ngram_type == NgramType::SingleTerm {
let synonym = shard_mut.synonyms_map.get(&term.1.key_hash).cloned();
if let Some(synonym) = synonym {
for synonym_term in synonym {
let mut term_clone = term.1.clone();
term_clone.key_hash = synonym_term.1.0;
term_clone.key0 = synonym_term.1.1;
term_clone.term = synonym_term.0.clone();
if let Some(existing) = unique_terms.get_mut(&synonym_term.0) {
existing
.field_positions_vec
.iter_mut()
.zip(term_clone.field_positions_vec.iter())
.for_each(|(x1, x2)| {
x1.extend_from_slice(x2);
x1.sort_unstable();
});
} else {
unique_terms.insert(synonym_term.0.clone(), term_clone);
};
}
}
}
}
}
// Index the postings of every unique term of the document.
for term in unique_terms {
shard_mut.index_posting(term.1, docid_local, false, 0, 0, 0);
}
// Attach the file, if any; failures are only reported.
match file {
FileType::Path(file_path) => {
if let Err(e) = shard_mut.copy_file(&file_path, docid_local) {
println!("can't copy PDF {} {}", file_path.display(), e);
}
}
FileType::Bytes(file_path, file_bytes) => {
if let Err(e) = shard_mut.write_file(&file_bytes, docid_local) {
println!("can't copy PDF {} {}", file_path.display(), e);
}
}
_ => {}
}
if !shard_mut.stored_field_names.is_empty() {
shard_mut.store_document(docid_local, document_item.document);
}
// After a commit, release the write lock before calling `warmup` on the shard.
if do_commit {
drop(shard_mut);
warmup(self).await;
}
}
}
/// A tokenized document, produced by `index_document_shard` and consumed by
/// `index_document_shard_2`.
pub(crate) struct DocumentItem {
/// The original document.
pub document: Document,
/// The unique terms of the document, keyed by term string.
pub unique_terms: AHashMap<String, TermObject>,
/// One entry per tokenized field:
/// (indexed field id, compressed length, normalized length, token count).
pub field_vec: Vec<(usize, u8, u32, u32)>,
}