use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use crate::core::{DocId, FieldId, SegmentId};
use crate::columnar::owned::OwnedColumn;
use crate::columnar::reader::ColumnReader;
use crate::query::ast::QueryExpression;
use crate::search::SortValue;
use crate::search::reader::Reader;
use crate::search::segment_store::SegmentStore;
use crate::segment::reader::SegmentReader;
/// Cache of opened columns, keyed by `(SegmentId, FieldId)`.
///
/// Entries are inserted by `FieldReaderCache::with_column`; nothing in this
/// file removes them again, so they live as long as the cache value does.
/// The map sits behind a `RefCell`, so a shared `&FieldReaderCache` is enough
/// to populate it.
#[derive(Default)]
pub(crate) struct FieldReaderCache {
// Opened columns by (segment, field); mutated through `&self` via the `RefCell`.
readers: RefCell<HashMap<(SegmentId, FieldId), OwnedColumn>>,
}
impl FieldReaderCache {
    /// Runs `f` against the column for `field_id` in `segment`, opening the
    /// column and caching it on first use.
    ///
    /// Returns `None` when `OwnedColumn::new` yields `None`; `f` is not called
    /// and nothing is cached in that case. Otherwise returns `Some` holding
    /// whatever `f` produced.
    ///
    /// # Panics
    ///
    /// On a cache hit the shared `RefCell` borrow is still alive while `f`
    /// runs. If `f` re-enters `with_column` on the same cache for a key that
    /// is not cached yet, the inner call's `borrow_mut` panics.
    pub fn with_column<R>(
        &self,
        segment: &SegmentReader,
        field_id: FieldId,
        f: impl FnOnce(&ColumnReader<'_>) -> R,
    ) -> Option<R> {
        let cache_key = (segment.segment_id(), field_id);

        // Hit path: the read guard stays alive for the whole call to `f`.
        {
            let cached = self.readers.borrow();
            if let Some(column) = cached.get(&cache_key) {
                return Some(f(column.inner()));
            }
        }

        // Miss path: no borrow is held while the column is opened or while
        // `f` runs; the map is only borrowed mutably for the final insert.
        let column = OwnedColumn::new(Some(field_id), segment)?;
        let output = f(column.inner());
        self.readers.borrow_mut().insert(cache_key, column);
        Some(output)
    }
}
/// Owned record describing one hit. `SearchResults` stores these and `Hit`
/// borrows one of them to answer per-hit queries.
#[derive(Clone, Debug)]
pub(crate) struct HitRef {
/// Document id; always used together with `segment_id` when this file looks the hit up.
pub doc_id: DocId,
/// Id of the segment the hit is resolved against.
pub segment_id: SegmentId,
/// Score carried by the hit.
pub score: f32,
/// Sort values attached to the hit, if any.
pub sort_values: Option<Vec<SortValue>>,
/// Collapse key attached to the hit, if any.
pub collapse_key: Option<serde_json::Value>,
}
/// Outcome of a search: the hit records plus the total-hit information,
/// aggregation results, and the handles needed to resolve per-hit data on
/// demand.
///
/// Per-hit access goes through the borrowed `Hit` views returned by `iter`
/// and `hit`.
pub struct SearchResults {
// Column cache handed to every `Hit` view borrowed from this value.
// Declared first, so it is dropped before `store` (fields drop in declaration order).
// NOTE(review): presumably cached columns rely on the store's segments staying alive — confirm before reordering.
field_readers: FieldReaderCache,
// Hit records in the order they were passed to `new`.
hits: Vec<HitRef>,
// Total-hit information as supplied by the caller of `new`.
total_hits: crate::search::TotalHits,
// Aggregation results keyed by name.
aggregations: std::collections::HashMap<String, crate::agg::AggregationResult>,
// Shared handle to the segment store used to resolve sources, fields and explanations.
store: Arc<SegmentStore>,
// Query that produced these results, when available; used by explain/highlight/inner_hits.
query: Option<QueryExpression>,
}
impl SearchResults {
    /// Builds a result set from already-collected parts. The column cache
    /// starts out empty.
    pub(crate) fn new(
        hits: Vec<HitRef>,
        total_hits: crate::search::TotalHits,
        aggregations: std::collections::HashMap<String, crate::agg::AggregationResult>,
        store: Arc<SegmentStore>,
        query: Option<QueryExpression>,
    ) -> Self {
        let field_readers = FieldReaderCache::default();
        Self {
            field_readers,
            hits,
            total_hits,
            aggregations,
            store,
            query,
        }
    }

    /// Wraps one stored hit record in a borrowed `Hit` view that shares this
    /// result set's store, query and column cache.
    fn view<'s>(&'s self, hit_ref: &'s HitRef) -> Hit<'s> {
        Hit {
            hit_ref,
            store: &self.store,
            query: self.query.as_ref(),
            cache: &self.field_readers,
        }
    }

    /// Iterates over the hits in stored order, yielding a `Hit` view for each.
    pub fn iter(&self) -> impl Iterator<Item = Hit<'_>> {
        self.hits.iter().map(move |hit_ref| self.view(hit_ref))
    }

    /// Returns the `Hit` view at position `index`, or `None` when `index` is
    /// out of range.
    pub fn hit(&self, index: usize) -> Option<Hit<'_>> {
        let hit_ref = self.hits.get(index)?;
        Some(self.view(hit_ref))
    }

    /// Number of hits held in this result set.
    pub fn len(&self) -> usize {
        self.hits.len()
    }

    /// `true` when this result set holds no hits.
    pub fn is_empty(&self) -> bool {
        self.hits.is_empty()
    }

    /// Total-hit information supplied when the result set was built.
    pub fn total_hits(&self) -> &crate::search::TotalHits {
        &self.total_hits
    }

    /// Aggregation results keyed by name.
    pub fn aggregations(
        &self,
    ) -> &std::collections::HashMap<String, crate::agg::AggregationResult> {
        &self.aggregations
    }

    /// Shared handle to the segment store backing these results.
    pub fn store(&self) -> &Arc<SegmentStore> {
        &self.store
    }
}
/// Borrowed view of a single hit inside a `SearchResults`.
///
/// Holds only references, so constructing one is cheap; every lookup
/// (source, fields, explanation, highlights, inner hits) is performed when
/// the corresponding method is called.
pub struct Hit<'a> {
// The stored record (doc id, segment id, score, ...) this view describes.
hit_ref: &'a HitRef,
// Segment store used to resolve the hit's data.
store: &'a SegmentStore,
// Query that produced the result set, when available.
query: Option<&'a QueryExpression>,
// Column cache shared with the owning `SearchResults`.
cache: &'a FieldReaderCache,
}
impl<'a> Hit<'a> {
/// Score stored on this hit's record.
pub fn score(&self) -> f32 {
    let record = self.hit_ref;
    record.score
}
pub fn doc_id(&self) -> DocId {
self.hit_ref.doc_id
}
pub fn segment_id(&self) -> SegmentId {
self.hit_ref.segment_id
}
/// Sort values attached to this hit as a slice, or `None` when the record
/// carries none.
pub fn sort_values(&self) -> Option<&[SortValue]> {
    self.hit_ref
        .sort_values
        .as_ref()
        .map(|values| values.as_slice())
}
pub fn collapse_key(&self) -> Option<&serde_json::Value> {
self.hit_ref.collapse_key.as_ref()
}
pub fn source(&self) -> Option<serde_json::Value> {
let reader = Reader::new(self.store);
reader.get_source(self.hit_ref.segment_id, self.hit_ref.doc_id)
}
/// Returns the hit's source document as shaped by `filter`.
///
/// * `SourceFilter::Disabled` yields `None` without loading the source.
/// * `SourceFilter::Enabled` yields the unmodified result of `source()`.
/// * Any other variant loads the source and passes it to
///   `crate::search::filter_source`; a missing source yields `None`.
pub fn source_filtered(
    &self,
    filter: &crate::search::SourceFilter,
) -> Option<serde_json::Value> {
    use crate::search::SourceFilter;
    match filter {
        SourceFilter::Disabled => None,
        SourceFilter::Enabled => self.source(),
        partial => self
            .source()
            .and_then(|document| crate::search::filter_source(&document, partial)),
    }
}
/// Retrieves the fields listed in `names` for this hit via
/// `Reader::retrieve_fields_cached`, passing along the column cache shared
/// with the owning result set.
pub fn fields(&self, names: &[String]) -> serde_json::Map<String, serde_json::Value> {
    let segment_id = self.hit_ref.segment_id;
    let doc_id = self.hit_ref.doc_id;
    Reader::new(self.store).retrieve_fields_cached(self.cache, segment_id, doc_id, names)
}
pub fn id(&self) -> Option<String> {
let reader = Reader::new(self.store);
let mut fields = reader.retrieve_fields_cached(
self.cache,
self.hit_ref.segment_id,
self.hit_ref.doc_id,
&["_id".to_string()],
);
let first = match fields.remove("_id")? {
serde_json::Value::Array(arr) => arr.into_iter().next()?,
_ => return None,
};
match first {
serde_json::Value::String(s) => Some(s),
_ => None,
}
}
pub fn source_bytes(&self) -> Option<Vec<u8>> {
let reader = Reader::new(self.store);
reader.get_source_bytes(self.hit_ref.segment_id, self.hit_ref.doc_id)
}
/// Produces an explanation of this hit's score.
///
/// * No query attached: `Ok(None)`.
/// * Ranking query: a single leaf explanation carrying the stored score and
///   the description `"fusion score"`.
/// * Scoring query: the query is bound with `ScoreMode::Complete` and asked
///   to explain this document within the hit's segment. `Ok(None)` is
///   returned when the segment is not present in the store.
///
/// # Errors
///
/// Propagates a failure from binding the scoring query. A failure from the
/// weight's own `explain` call is *not* propagated; it becomes `Ok(None)`.
pub fn explain(&self) -> crate::core::Result<Option<crate::search::Explanation>> {
    use crate::query::Query as _;

    let scoring = match self.query {
        None => return Ok(None),
        Some(QueryExpression::Ranking(_)) => {
            return Ok(Some(crate::search::Explanation::leaf(
                self.hit_ref.score,
                "fusion score".to_string(),
            )));
        }
        Some(QueryExpression::Scoring(scoring)) => scoring,
    };

    // Binding happens before the segment lookup, so a bind error wins over a
    // missing segment.
    let searcher = crate::search::searcher::Searcher::new(self.store);
    let weight = scoring.bind(&searcher, crate::core::ScoreMode::Complete)?;

    let wanted = self.hit_ref.segment_id;
    let explanation = self
        .store
        .segments()
        .iter()
        .find(|candidate| candidate.segment_id() == wanted)
        // NOTE(review): `.ok()` discards explain errors — confirm this is a
        // deliberate best-effort rather than an oversight.
        .and_then(|segment| weight.explain(segment, self.hit_ref.doc_id).ok());
    Ok(explanation)
}
/// Highlights a single `field` of this hit.
///
/// Builds a one-field configuration (`fragment_size` and
/// `number_of_fragments` both 0, `require_field_match` on, order `None`),
/// delegates to `highlight_with_config`, and returns the entry for `field`.
/// Yields `None` when the delegate yields `None` or has no entry for `field`.
pub fn highlight(&self, field: &str) -> Option<Vec<crate::search::highlight::Highlight>> {
    use crate::search::highlight::{HighlightConfig, HighlightFieldConfig, HighlightOrder};

    let only_field = HighlightFieldConfig {
        field: field.to_string(),
        fragment_size: 0,
        number_of_fragments: 0,
    };
    let config = HighlightConfig {
        fields: vec![only_field],
        require_field_match: true,
        order: HighlightOrder::None,
    };
    self.highlight_with_config(&config)
        .and_then(|mut by_field| by_field.remove(field))
}
/// Highlights this hit according to `config`, returning highlights keyed by
/// field name.
///
/// Query terms are gathered from every scoring expression of the attached
/// query and merged per field; the hit's source is then loaded and both are
/// handed to `highlight_hit` together with the store's analyzers and
/// mapping. Returns `None` when no query is attached, when the source cannot
/// be loaded, or when `highlight_hit` itself returns `None`.
pub fn highlight_with_config(
    &self,
    config: &crate::search::highlight::HighlightConfig,
) -> Option<std::collections::HashMap<String, Vec<crate::search::highlight::Highlight>>> {
    let query = self.query?;
    let searcher = crate::search::searcher::Searcher::new(self.store);

    // Union the terms of all scoring expressions, grouped by field.
    let mut terms_by_field = std::collections::HashMap::new();
    let extracted = query
        .scoring_expressions()
        .into_iter()
        .flat_map(|scoring| crate::search::highlight::extract_query_terms(scoring, &searcher));
    for (field, terms) in extracted {
        terms_by_field
            .entry(field)
            .or_insert_with(std::collections::HashSet::new)
            .extend(terms);
    }

    let source =
        Reader::new(self.store).get_source(self.hit_ref.segment_id, self.hit_ref.doc_id)?;
    crate::search::highlight::highlight_hit(
        &source,
        config,
        &terms_by_field,
        self.store.analyzers(),
        self.store.mapping(),
    )
}
/// Collects nested ("inner") hits for this hit, keyed by inner-hit name.
///
/// The child range is taken to be the documents directly after this hit's
/// doc id, up to (but excluding) the next set bit in the segment's parent
/// bitset, or the end of the bitset. For every inner-hit spec extracted from
/// the scoring query, the spec's scorer is advanced through that range, the
/// matches are ordered by descending score, paged with the spec's
/// `from`/`size`, and rendered as JSON with `_nested`, `_score` and
/// `_source` entries. `total.value` reports the match count before paging.
///
/// Returns `Ok(None)` when there is no query, the query is not a scoring
/// query, no specs are found, the segment or its parent bitset is missing,
/// the child range is empty, or no spec produced a match.
///
/// # Errors
///
/// Propagates failures from spec extraction and from creating the scorer
/// supplier or scorer.
pub fn inner_hits(
    &self,
) -> crate::core::Result<Option<std::collections::HashMap<String, serde_json::Value>>> {
    use crate::core::NO_MORE_DOCS;
    use std::cmp::Ordering;
    let Some(query) = self.query else {
        return Ok(None);
    };
    let Some(scoring) = query.as_scoring() else {
        return Ok(None);
    };
    let searcher = crate::search::searcher::Searcher::new(self.store);
    let specs = crate::index::extract_inner_hit_specs(scoring, &searcher)?;
    if specs.is_empty() {
        return Ok(None);
    }
    let Some(segment) = self
        .store
        .segments()
        .iter()
        .find(|s| s.segment_id() == self.hit_ref.segment_id)
    else {
        return Ok(None);
    };
    let Some(parent_bitset) = segment.parent_bitset() else {
        return Ok(None);
    };
    // Child docs occupy [child_start, child_end): everything after this hit
    // up to the next parent bit.
    // NOTE(review): the `as u32` casts below assume the bitset length fits in
    // u32, like the doc ids themselves — confirm.
    let parent_doc = self.hit_ref.doc_id.as_u32() as usize;
    let child_start = parent_doc + 1;
    let child_end = (child_start..parent_bitset.len())
        .find(|&i| parent_bitset[i])
        .unwrap_or(parent_bitset.len());
    if child_start >= child_end {
        return Ok(None);
    }
    let reader = Reader::new(self.store);
    let parent_source = reader.get_source(self.hit_ref.segment_id, self.hit_ref.doc_id);
    let mut inner_hits_map = std::collections::HashMap::new();
    for spec in &specs {
        let supplier = match spec.weight.scorer_supplier(segment)? {
            Some(s) => s,
            None => continue,
        };
        let mut scorer = supplier.scorer()?;
        scorer.advance(crate::core::DocId::new(child_start as u32));
        let mut matches: Vec<(u32, f32)> = Vec::new();
        while scorer.doc_id() != NO_MORE_DOCS {
            let doc = scorer.doc_id().as_u32();
            if doc >= child_end as u32 {
                break;
            }
            matches.push((doc, scorer.score()));
            scorer.next();
        }
        if matches.is_empty() {
            continue;
        }
        // Descending by score. NaN scores are ordered after every real score
        // so the comparator stays a total order: mapping an incomparable pair
        // to `Equal` is not transitive, and `sort_by` may panic or produce an
        // unspecified order for such comparators. Non-NaN scores compare
        // exactly as before, and the sort is stable, so ties keep doc order.
        matches.sort_by(|a, b| match (a.1.is_nan(), b.1.is_nan()) {
            (false, false) => b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal),
            (a_nan, b_nan) => a_nan.cmp(&b_nan),
        });
        let total = matches.len();
        // The nested array under `spec.path` is the same for every child of
        // this spec, so resolve it once instead of once per inner hit.
        let nested_docs = parent_source
            .as_ref()
            .and_then(|ps| ps.get(&spec.path))
            .and_then(|arr| arr.as_array());
        let inner_hit_docs: Vec<serde_json::Value> = matches
            .into_iter()
            .skip(spec.config.from)
            .take(spec.config.size)
            .map(|(child_doc, score)| {
                // Position of the child within the parent's nested array.
                let offset = (child_doc as usize).saturating_sub(child_start);
                let nested_source = nested_docs.and_then(|docs| docs.get(offset)).cloned();
                serde_json::json!({
                    "_nested": { "field": spec.path, "offset": offset },
                    "_score": score,
                    "_source": nested_source,
                })
            })
            .collect();
        inner_hits_map.insert(
            spec.name.clone(),
            serde_json::json!({
                "hits": {
                    "total": { "value": total, "relation": "eq" },
                    "hits": inner_hit_docs,
                }
            }),
        );
    }
    if inner_hits_map.is_empty() {
        Ok(None)
    } else {
        Ok(Some(inner_hits_map))
    }
}
}
#[cfg(test)]
mod tests {
    use crate::columnar::owned::COLUMN_OPENS;
    use crate::index::Index;
    use crate::mapping::{FieldType, Mapping};
    use crate::search::expression::SearchExpression;
    use serde_json::json;
    use std::path::PathBuf;

    /// Scratch directory for one test, removed again when the guard is dropped.
    ///
    /// Removal happens in `Drop`, so it also runs when a test unwinds from a
    /// failed assertion or from the panic expected by a `should_panic` test.
    /// Each test binds the guard first, so it is dropped last, after the
    /// index and results that use the directory.
    struct TestDir {
        path: PathBuf,
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best effort: a directory that is already gone is not an error.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// Returns a guard for a per-process, per-test directory under the system
    /// temp dir, clearing any leftover from an earlier run first.
    fn test_dir(name: &str) -> TestDir {
        let path =
            std::env::temp_dir().join(format!("luci_results_cache_{}_{name}", std::process::id()));
        let _ = std::fs::remove_dir_all(&path);
        TestDir { path }
    }

    /// Mapping shared by the cache tests: one text, one keyword, one long field.
    fn cache_schema() -> Mapping {
        Mapping::builder()
            .field("title", FieldType::Text)
            .field("category", FieldType::Keyword)
            .field("rank", FieldType::Long)
            .build()
    }

    /// `match_all` search expression built with the given `size`.
    fn match_all_size(size: usize) -> SearchExpression {
        SearchExpression::from_json(json!({"match_all": {}}), size).unwrap()
    }

    /// Reading `_id` for ten hits after one bulk call opens the column once.
    #[test]
    fn cache_amortizes_within_single_segment() {
        let dir = test_dir("single_seg");
        let index = Index::create_with_mapping(&dir.path, cache_schema()).unwrap();
        let docs: Vec<_> = (0..100)
            .map(|i| json!({"title": format!("doc {i}"), "category": "c", "rank": i}))
            .collect();
        index.bulk(docs).unwrap();
        let results = index.search(&match_all_size(10)).unwrap();
        assert_eq!(results.len(), 10);
        COLUMN_OPENS.with(|c| c.set(0));
        for hit in results.iter() {
            assert!(hit.id().is_some());
        }
        let opens = COLUMN_OPENS.with(|c| c.get());
        assert_eq!(opens, 1, "expected 1 open for 10 hits in one segment");
    }

    /// With hits spread over three segments, `_id` is opened once per segment.
    #[test]
    fn cache_amortizes_across_multiple_segments() {
        let dir = test_dir("multi_seg");
        let index = Index::create_with_mapping(&dir.path, cache_schema()).unwrap();
        for s in 0..3 {
            let docs: Vec<_> = (0..50)
                .map(|i| json!({"title": format!("doc {s}-{i}"), "category": "c", "rank": i}))
                .collect();
            index.bulk(docs).unwrap();
        }
        let results = index.search(&match_all_size(150)).unwrap();
        assert_eq!(results.len(), 150);
        let segs: std::collections::HashSet<_> = results.iter().map(|h| h.segment_id()).collect();
        assert_eq!(segs.len(), 3, "expected hits across 3 segments");
        COLUMN_OPENS.with(|c| c.set(0));
        for hit in results.iter() {
            assert!(hit.id().is_some());
        }
        let opens = COLUMN_OPENS.with(|c| c.get());
        assert_eq!(opens, 3, "expected 1 open per segment × 3 segments");
    }

    /// Retrieving three fields for ten hits opens each field's column once.
    #[test]
    fn cache_amortizes_multi_field() {
        let dir = test_dir("multi_field");
        let index = Index::create_with_mapping(&dir.path, cache_schema()).unwrap();
        let docs: Vec<_> = (0..50)
            .map(|i| json!({"title": format!("doc {i}"), "category": "c", "rank": i}))
            .collect();
        index.bulk(docs).unwrap();
        let results = index.search(&match_all_size(10)).unwrap();
        assert_eq!(results.len(), 10);
        let names = vec![
            "_id".to_string(),
            "category".to_string(),
            "rank".to_string(),
        ];
        COLUMN_OPENS.with(|c| c.set(0));
        for hit in results.iter() {
            let f = hit.fields(&names);
            assert_eq!(f.len(), 3);
        }
        let opens = COLUMN_OPENS.with(|c| c.get());
        assert_eq!(opens, 3, "expected 1 open per field × 3 fields");
    }

    /// The cache belongs to one `SearchResults`: a second search starts with
    /// an empty cache and opens the column again.
    #[test]
    fn cache_drops_with_results() {
        let dir = test_dir("drop");
        let index = Index::create_with_mapping(&dir.path, cache_schema()).unwrap();
        let docs: Vec<_> = (0..50)
            .map(|i| json!({"title": format!("doc {i}"), "category": "c", "rank": i}))
            .collect();
        index.bulk(docs).unwrap();
        {
            let r1 = index.search(&match_all_size(5)).unwrap();
            COLUMN_OPENS.with(|c| c.set(0));
            for hit in r1.iter() {
                assert!(hit.id().is_some());
            }
            assert_eq!(COLUMN_OPENS.with(|c| c.get()), 1);
        }
        let r2 = index.search(&match_all_size(5)).unwrap();
        COLUMN_OPENS.with(|c| c.set(0));
        for hit in r2.iter() {
            assert!(hit.id().is_some());
        }
        assert_eq!(
            COLUMN_OPENS.with(|c| c.get()),
            1,
            "fresh SearchResults must open the column again"
        );
    }

    /// Pins the known reentrancy limit of `with_column`: a nested call that
    /// misses while the outer call is on the hit path panics on `borrow_mut`.
    /// The scratch directory is removed by the `TestDir` guard during unwinding.
    #[test]
    #[should_panic(expected = "already borrowed")]
    fn nested_with_column_panics_on_outer_hit_inner_miss() {
        let dir = test_dir("nested_panic");
        let index = Index::create_with_mapping(&dir.path, cache_schema()).unwrap();
        let docs: Vec<_> = (0..10)
            .map(|i| json!({"title": format!("doc {i}"), "category": "c", "rank": i}))
            .collect();
        index.bulk(docs).unwrap();
        let results = index.search(&match_all_size(1)).unwrap();
        let store = results.store();
        let segment = store.segments().iter().next().unwrap();
        let id_field = segment
            .header()
            .fields
            .iter()
            .find(|f| f.field_name == "_id")
            .unwrap()
            .field_id;
        let cat_field = segment
            .header()
            .fields
            .iter()
            .find(|f| f.field_name == "category")
            .unwrap()
            .field_id;
        let cache = super::FieldReaderCache::default();
        // Warm the cache for `_id` so the next outer call takes the hit path.
        let _ = cache.with_column(segment, id_field, |_| ());
        // Outer hit holds the shared borrow; the inner miss needs `borrow_mut`.
        let _ = cache.with_column(segment, id_field, |_outer| {
            cache.with_column(segment, cat_field, |_inner| {});
        });
    }
}