use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use crate::{
add_result::{PostingListObjectSingle, add_result_singleterm_multifield},
compatible::{_blsr_u64, _mm_tzcnt_64},
index::{
AccessType, BlockObjectIndex, CompressionType, NonUniquePostingListObjectQuery,
PostingListObjectQuery, SORT_FLAG, SPEEDUP_FLAG, Shard,
},
intersection::{BlockObject, bitpacking32_get_delta},
search::{FilterSparse, ResultType, SearchResult},
utils::{read_u16, read_u64},
};
use ahash::AHashSet;
use num_traits::FromPrimitive;
#[allow(clippy::too_many_arguments)]
#[allow(clippy::ptr_arg)]
#[allow(non_snake_case)]
pub(crate) async fn single_docid<'a>(
shard: &'a Shard,
query_list: &mut Vec<PostingListObjectQuery<'a>>,
not_query_list: &mut [PostingListObjectQuery<'a>],
blo: &BlockObjectIndex,
term_index: usize,
result_count: &mut i32,
search_result: &mut SearchResult<'_>,
top_k: usize,
result_type: &ResultType,
field_filter_set: &AHashSet<u16>,
facet_filter: &[FilterSparse],
) {
let block_score = blo.max_block_score;
let filtered = !not_query_list.is_empty()
|| !field_filter_set.is_empty()
|| !search_result.topk_candidates.result_sort.is_empty()
|| (!search_result.query_facets.is_empty() || !facet_filter.is_empty())
&& result_type != &ResultType::Topk;
if SPEEDUP_FLAG
&& (result_type == &ResultType::Count
|| (search_result.topk_candidates.current_heap_size == top_k
&& block_score <= search_result.topk_candidates._elements[0].score))
&& (!filtered || result_type == &ResultType::Topk)
{
return;
}
let block_id = blo.block_id;
for plo in not_query_list.iter_mut() {
let query_list_item_mut = plo;
let result = query_list_item_mut
.blocks
.binary_search_by(|block| block.block_id.cmp(&block_id));
match result {
Ok(p_block) => {
query_list_item_mut.bm25_flag = true;
query_list_item_mut.p_block = p_block as i32
}
Err(_) => {
query_list_item_mut.bm25_flag = false;
continue;
}
};
let blo = &query_list_item_mut.blocks[query_list_item_mut.p_block as usize];
query_list_item_mut.compression_type =
FromPrimitive::from_u32(blo.compression_type_pointer >> 30).unwrap();
query_list_item_mut.rank_position_pointer_range =
blo.compression_type_pointer & 0b0011_1111_1111_1111_1111_1111_1111_1111;
let posting_pointer_size_sum = blo.pointer_pivot_p_docid as usize * 2
+ if (blo.pointer_pivot_p_docid as usize) <= blo.posting_count as usize {
((blo.posting_count as usize + 1) - blo.pointer_pivot_p_docid as usize) * 3
} else {
0
};
query_list_item_mut.compressed_doc_id_range =
query_list_item_mut.rank_position_pointer_range as usize + posting_pointer_size_sum;
if shard.meta.access_type == AccessType::Mmap {
let segment = &shard.segments_index[query_list_item_mut.key0 as usize];
query_list_item_mut.byte_array =
&shard.index_file_mmap[segment.byte_array_blocks_pointer[blo.block_id as usize].0
..segment.byte_array_blocks_pointer[blo.block_id as usize].0
+ segment.byte_array_blocks_pointer[blo.block_id as usize].1];
} else {
query_list_item_mut.byte_array = &shard.segments_index
[query_list_item_mut.key0 as usize]
.byte_array_blocks[blo.block_id as usize];
}
query_list_item_mut.p_docid = 0;
query_list_item_mut.p_docid_count =
query_list_item_mut.blocks[query_list_item_mut.p_block as usize].posting_count as usize
+ 1;
query_list_item_mut.docid = 0;
if query_list_item_mut.compression_type == CompressionType::Rle {
query_list_item_mut.p_run_count = read_u16(
query_list_item_mut.byte_array,
query_list_item_mut.compressed_doc_id_range,
) as i32;
let startdocid = read_u16(
query_list_item_mut.byte_array,
query_list_item_mut.compressed_doc_id_range + 2,
);
let runlength = read_u16(
query_list_item_mut.byte_array,
query_list_item_mut.compressed_doc_id_range + 4,
);
query_list_item_mut.docid = startdocid as i32;
query_list_item_mut.run_end = (startdocid + runlength) as i32;
query_list_item_mut.p_run_sum = runlength as i32;
query_list_item_mut.p_run = 6;
}
}
let compression_type: CompressionType =
FromPrimitive::from_u32(blo.compression_type_pointer >> 30).unwrap();
let rank_position_pointer_range: u32 =
blo.compression_type_pointer & 0b0011_1111_1111_1111_1111_1111_1111_1111;
let posting_pointer_size_sum = blo.pointer_pivot_p_docid as u32 * 2
+ if (blo.pointer_pivot_p_docid as usize) <= blo.posting_count as usize {
((blo.posting_count as u32 + 1) - blo.pointer_pivot_p_docid as u32) * 3
} else {
0
};
let compressed_doc_id_range: u32 = rank_position_pointer_range + posting_pointer_size_sum;
let query_list_item_mut = &mut query_list[term_index];
let byte_array = if shard.meta.access_type == AccessType::Mmap {
let segment = &shard.segments_index[query_list_item_mut.key0 as usize];
&shard.index_file_mmap[segment.byte_array_blocks_pointer[blo.block_id as usize].0
..segment.byte_array_blocks_pointer[blo.block_id as usize].0
+ segment.byte_array_blocks_pointer[blo.block_id as usize].1]
} else {
&shard.segments_index[query_list_item_mut.key0 as usize].byte_array_blocks
[blo.block_id as usize]
};
let mut plo = PostingListObjectSingle {
rank_position_pointer_range,
pointer_pivot_p_docid: blo.pointer_pivot_p_docid,
byte_array,
p_docid: 0,
idf: query_list_item_mut.idf,
idf_ngram1: query_list_item_mut.idf_ngram1,
idf_ngram2: query_list_item_mut.idf_ngram2,
idf_ngram3: query_list_item_mut.idf_ngram3,
ngram_type: query_list_item_mut.ngram_type.clone(),
};
match compression_type {
CompressionType::Array => {
for i in 0..=blo.posting_count {
plo.p_docid = i as i32;
add_result_singleterm_multifield(
shard,
((blo.block_id as usize) << 16)
| read_u16(
byte_array,
compressed_doc_id_range as usize + (i as usize * 2),
) as usize,
result_count,
search_result,
top_k,
result_type,
field_filter_set,
facet_filter,
&plo,
not_query_list,
block_score,
);
}
}
CompressionType::Delta => {
let deltasizebits = 4;
let rangebits: i32 =
byte_array[compressed_doc_id_range as usize] as i32 >> (8 - deltasizebits);
let mut docid_old: i32 = -1;
let mut bitposition: u32 = (compressed_doc_id_range << 3) + deltasizebits;
for i in 0..=blo.posting_count {
plo.p_docid = i as i32;
let delta = bitpacking32_get_delta(byte_array, bitposition, rangebits as u32);
bitposition += rangebits as u32;
let doc_id: u16 = (docid_old + delta as i32 + 1) as u16;
docid_old = doc_id as i32;
add_result_singleterm_multifield(
shard,
((blo.block_id as usize) << 16) | doc_id as usize,
result_count,
search_result,
top_k,
result_type,
field_filter_set,
facet_filter,
&plo,
not_query_list,
block_score,
);
}
}
CompressionType::Rle => {
let runs_count = read_u16(&byte_array[compressed_doc_id_range as usize..], 0) as i32;
plo.p_docid = 0;
for i in (1..(runs_count << 1) + 1).step_by(2) {
let startdocid = read_u16(
&byte_array[compressed_doc_id_range as usize..],
i as usize * 2,
);
let runlength = read_u16(
&byte_array[compressed_doc_id_range as usize..],
(i + 1) as usize * 2,
);
for j in 0..=runlength {
add_result_singleterm_multifield(
shard,
((blo.block_id as usize) << 16) | (startdocid + j) as usize,
result_count,
search_result,
top_k,
result_type,
field_filter_set,
facet_filter,
&plo,
not_query_list,
block_score,
);
plo.p_docid += 1;
}
}
}
CompressionType::Bitmap => {
plo.p_docid = 0;
let block_id_msb = (blo.block_id as usize) << 16;
for ulong_pos in 0u64..1024 {
let mut intersect: u64 = read_u64(
&byte_array[compressed_doc_id_range as usize..],
ulong_pos as usize * 8,
);
while intersect != 0 {
let bit_pos = unsafe { _mm_tzcnt_64(intersect) } as u64;
intersect = unsafe { _blsr_u64(intersect) };
add_result_singleterm_multifield(
shard,
block_id_msb | ((ulong_pos << 6) + bit_pos) as usize,
result_count,
search_result,
top_k,
result_type,
field_filter_set,
facet_filter,
&plo,
not_query_list,
block_score,
);
plo.p_docid += 1;
}
}
}
}
}
/// Evaluates a single-term query (`query_list[0]`) block by block.
///
/// * If `index.enable_single_term_topk` is set or the result type is `Count`, and additionally
///   there is at most one entry in `non_unique_query_list` and no filter is active, the term's
///   `posting_count` is added to `result_count_arc` and the function returns without touching
///   any block.
/// * Otherwise every block is handed to `single_docid`. With `SPEEDUP_FLAG && SORT_FLAG` the
///   blocks are first collected and visited in descending `max_block_score` order, so the scan
///   can stop (or skip blocks) once the top-k heap is full and a block cannot beat
///   `_elements[0].score`.
///
/// At the end `result_count_arc` is increased by the term's `posting_count` when no filter is
/// active, otherwise by the number of results counted locally, and `matching_blocks` is set to
/// the number of blocks of the term.
///
/// NOTE(review): when the inter-query-threading switch evaluates to `true`, no block is
/// processed by this function - confirm that this is intended.
///
/// # Panics
/// Panics if two block scores cannot be ordered by `partial_cmp`, on out-of-range indexing, or
/// if the `posting_count / p_block_max` division is reached with `p_block_max == 0`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn single_blockid<'a>(
    index: &'a Shard,
    non_unique_query_list: &mut [NonUniquePostingListObjectQuery<'a>],
    query_list: &mut Vec<PostingListObjectQuery<'a>>,
    not_query_list: &mut [PostingListObjectQuery<'a>],
    result_count_arc: &Arc<AtomicUsize>,
    search_result: &mut SearchResult<'_>,
    top_k: usize,
    result_type: &ResultType,
    field_filter_set: &AHashSet<u16>,
    facet_filter: &[FilterSparse],
    matching_blocks: &mut i32,
) {
    let term_index = 0;
    let is_topk = result_type == &ResultType::Topk;

    // Facets only count as a filter when the result type is not `Topk`.
    let facets_requested = !search_result.query_facets.is_empty() || !facet_filter.is_empty();
    let filtered = !not_query_list.is_empty()
        || !field_filter_set.is_empty()
        || !index.delete_hashset.is_empty()
        || !search_result.topk_candidates.result_sort.is_empty()
        || (facets_requested && !is_topk);

    // Shortcut: the stored posting count is added as the result count without scanning.
    let shortcut_enabled = index.enable_single_term_topk || (result_type == &ResultType::Count);
    if shortcut_enabled && (non_unique_query_list.len() <= 1) && !filtered {
        result_count_arc.fetch_add(
            query_list[term_index].posting_count as usize,
            Ordering::Relaxed,
        );
        return;
    }

    let mut result_count_local: i32 = 0;

    // In auto mode (and outside the search quality test) the switch is on when the term
    // averages more than 10 postings per block; otherwise the configured value is used.
    let auto_threading = !index.enable_search_quality_test && index.enable_inter_query_threading_auto;
    let enable_inter_query_threading_single = if auto_threading {
        let term = &query_list[term_index];
        term.posting_count / term.p_block_max as u32 > 10
    } else {
        index.enable_inter_query_threading
    };

    let mut block_vec: Vec<BlockObject> = Vec::new();
    if !enable_inter_query_threading_single {
        for (p_block, blo) in query_list[term_index].blocks.iter().enumerate() {
            let block_score = blo.max_block_score;
            if SPEEDUP_FLAG && SORT_FLAG {
                // Defer the scan: blocks are processed below in descending score order.
                block_vec.push(BlockObject {
                    block_id: blo.block_id as usize,
                    block_score,
                    p_block_vec: vec![p_block as i32],
                });
            } else if !SPEEDUP_FLAG
                || (filtered && !is_topk)
                || search_result.topk_candidates.current_heap_size < top_k
                || block_score > search_result.topk_candidates._elements[0].score
            {
                single_docid(
                    index,
                    query_list,
                    not_query_list,
                    blo,
                    term_index,
                    &mut result_count_local,
                    search_result,
                    top_k,
                    result_type,
                    field_filter_set,
                    facet_filter,
                )
                .await;
            }
        }
    }

    if SORT_FLAG && SPEEDUP_FLAG {
        // Highest block score first.
        block_vec.sort_unstable_by(|lhs, rhs| rhs.block_score.partial_cmp(&lhs.block_score).unwrap());

        // Without filters at most `top_k` blocks are visited.
        let block_limit = if filtered { usize::MAX } else { top_k };
        for block in block_vec.iter().take(block_limit) {
            let heap_full = search_result.topk_candidates.current_heap_size == top_k;
            if heap_full
                && block.block_score <= search_result.topk_candidates._elements[0].score
            {
                if !filtered {
                    // Blocks are sorted by score, so no later block can qualify either.
                    break;
                }
                if is_topk {
                    continue;
                }
            }

            let blo = &query_list[term_index].blocks[block.p_block_vec[0] as usize];
            single_docid(
                index,
                query_list,
                not_query_list,
                blo,
                term_index,
                &mut result_count_local,
                search_result,
                top_k,
                result_type,
                field_filter_set,
                facet_filter,
            )
            .await;
        }
    }

    // Unfiltered queries report the stored posting count; filtered ones the locally counted hits.
    let counted = if filtered {
        result_count_local as usize
    } else {
        query_list[term_index].posting_count as usize
    };
    result_count_arc.fetch_add(counted, Ordering::Relaxed);

    *matching_blocks = query_list[term_index].blocks.len() as i32;
}