use std::collections::HashMap;
use crate::analysis::AnalyzerRegistry;
use crate::core::{DocId, FieldId, LuciError, SegmentId};
use crate::mapping::Mapping;
use crate::deletion::DeletionMap;
use crate::segment::reader::SegmentReader;
/// Result of merging several source segments into one new segment.
#[derive(Debug)]
pub struct MergeOutput {
    /// Serialized bytes of the merged segment, as produced by the segment builder.
    pub bytes: Vec<u8>,
    /// Maps `(source segment, source document index)` to
    /// `(merged segment, merged ordinal)` for documents carried over into the
    /// merged segment. Deleted documents have no entry.
    pub ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)>,
}
/// Merges the live documents of `readers` into one new segment.
///
/// Readers are visited in slice order and documents in ascending index order.
/// Each document that is not marked deleted in `deletions` is read from the
/// reader's document store, parsed as JSON and re-indexed into a fresh
/// `SegmentBuilder` for `new_segment_id`.
///
/// Documents whose stored source is missing or is not valid JSON are skipped
/// (best effort). A skipped document consumes no merged ordinal and gets no
/// `ord_remap` entry, so the remap only ever refers to documents that were
/// actually written to the merged segment.
///
/// # Errors
///
/// Propagates any error from re-indexing a document. `LuciError::InvalidValue`
/// messages are prefixed with the source segment and document index. Also
/// returns `LuciError::InvalidValue` if the merged ordinal counter would
/// exceed `u32::MAX`.
pub fn merge_segments(
    new_segment_id: SegmentId,
    readers: &[&SegmentReader],
    deletions: &DeletionMap,
    schema: &Mapping,
    analyzers: &AnalyzerRegistry,
) -> Result<MergeOutput, LuciError> {
    use crate::segment::builder::SegmentBuilder;

    let mut builder = SegmentBuilder::new(new_segment_id, schema);
    let mut ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)> = HashMap::new();
    // Ordinal the next successfully indexed document receives in the merged segment.
    // NOTE(review): `index_document` may append extra child documents to the
    // builder for nested fields; this counter does not account for them.
    // Confirm how the builder numbers those before relying on the remap for
    // schemas with nested fields.
    let mut next_ord: u32 = 0;

    for reader in readers {
        let seg_id = reader.segment_id();
        let doc_store = reader.doc_store();
        for doc_idx in 0..reader.doc_count() {
            if deletions.is_deleted(seg_id, DocId::new(doc_idx)) {
                continue;
            }
            let source_bytes = match doc_store.get(doc_idx) {
                Some(bytes) => bytes,
                None => continue,
            };
            let doc: serde_json::Value = match serde_json::from_slice(&source_bytes) {
                Ok(value) => value,
                Err(_) => continue,
            };
            index_document(&doc, &source_bytes, schema, analyzers, &mut builder).map_err(|e| {
                match e {
                    LuciError::InvalidValue(msg) => LuciError::InvalidValue(format!(
                        "segment {seg_id:?} document {doc_idx}: {msg}"
                    )),
                    other => other,
                }
            })?;

            // Record the mapping only once the document is really in the
            // builder, so skipped documents cannot shift later ordinals.
            ord_remap.insert((seg_id, doc_idx), (new_segment_id, next_ord));
            next_ord = next_ord.checked_add(1).ok_or_else(|| {
                LuciError::InvalidValue(format!(
                    "merged segment {new_segment_id:?} exceeds u32::MAX documents"
                ))
            })?;
        }
    }

    Ok(MergeOutput {
        bytes: builder.build(),
        ord_remap,
    })
}
/// Re-indexes one JSON document into `builder`.
///
/// A `doc` that is not a JSON object is ignored and `Ok(())` is returned
/// without touching the builder. Otherwise, for every top-level key that the
/// schema knows:
///
/// * `Text` values are analyzed with the field's analyzer (`"standard"` when
///   none is configured); `Keyword` values become a single token; `Ip` values
///   become a single normalized token unless normalization yields an empty
///   string. Tokens are kept only when non-empty and the field is `indexed`.
/// * `GeoPoint` / `GeoShape` values that parse are collected for the builder.
/// * Fields with `doc_values` contribute one column value (possibly `Null`).
///
/// The document is then added to the builder, followed by its column values,
/// geo points and geo shapes. If the schema has any `Nested` field the
/// document is marked as a parent, and every JSON object inside an array
/// stored under a nested field name is added as a separate child document
/// with source `{}` and marked nested.
///
/// NOTE(review): a JSON `null` in a `Keyword` field is indexed as the term
/// `null` while its column value is `Null`, and nested sub-fields are indexed
/// regardless of their `indexed` flag. Both look inconsistent with the
/// top-level handling — confirm against the primary indexing path.
///
/// # Errors
///
/// Propagates the error from `ColumnValue::keyword`; this happens before
/// anything is written to the builder.
fn index_document(
    doc: &serde_json::Value,
    source_bytes: &[u8],
    schema: &Mapping,
    analyzers: &AnalyzerRegistry,
    builder: &mut crate::segment::builder::SegmentBuilder,
) -> Result<(), LuciError> {
    use crate::analysis::Token;
    use crate::columnar::writer::ColumnValue;
    use crate::mapping::FieldType;
    use crate::spatial::geo::GeoPoint;

    // Keyword text: strings verbatim, any other JSON value in serialized form.
    fn keyword_text(value: &serde_json::Value) -> String {
        match value {
            serde_json::Value::String(s) => s.clone(),
            other => other.to_string(),
        }
    }

    // Runs the configured analyzer over `text`, defaulting to "standard".
    let analyze = |configured: Option<&str>, text: &str| {
        analyzers.get(configured.unwrap_or("standard")).analyze(text)
    };

    let fields = if let Some(map) = doc.as_object() {
        map
    } else {
        return Ok(());
    };

    let mut analyzed_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
    let mut column_values: Vec<(FieldId, ColumnValue)> = Vec::new();
    let mut geo_points: Vec<(FieldId, GeoPoint)> = Vec::new();
    let mut geo_shapes: Vec<(FieldId, ::geo::Geometry<f64>)> = Vec::new();

    for (name, value) in fields {
        let field_id = if let Some(id) = schema.field_id(name) {
            id
        } else {
            // Keys the schema does not know are not indexed.
            continue;
        };
        let mapping = schema.field(field_id);
        // String view of the value; non-string JSON values read as "".
        let text = value.as_str().unwrap_or_default();

        let terms = match &mapping.field_type {
            FieldType::Text => analyze(mapping.analyzer.as_deref(), text),
            FieldType::Keyword => vec![Token::new(keyword_text(value), 0, 0, 0)],
            FieldType::Ip => {
                let normalized = crate::ip::normalize_ip(text);
                if normalized.is_empty() {
                    Vec::new()
                } else {
                    vec![Token::new(normalized, 0, 0, 0)]
                }
            }
            _ => Vec::new(),
        };
        if mapping.indexed && !terms.is_empty() {
            analyzed_fields.push((field_id, terms));
        }

        // Spatial values that fail to parse are dropped.
        match &mapping.field_type {
            FieldType::GeoPoint => {
                geo_points.extend(GeoPoint::from_json(value).map(|point| (field_id, point)));
            }
            FieldType::GeoShape => {
                geo_shapes.extend(
                    crate::spatial::shape::parse_geojson(value).map(|geom| (field_id, geom)),
                );
            }
            _ => {}
        }

        if !mapping.doc_values {
            continue;
        }
        let cell = match &mapping.field_type {
            FieldType::Keyword => match value {
                serde_json::Value::Null => ColumnValue::Null,
                other => ColumnValue::keyword(keyword_text(other))?,
            },
            FieldType::Integer | FieldType::Long => {
                if let serde_json::Value::Number(n) = value {
                    ColumnValue::I64(n.as_i64().unwrap_or(0))
                } else {
                    ColumnValue::Null
                }
            }
            FieldType::Float | FieldType::Double => {
                if let serde_json::Value::Number(n) = value {
                    ColumnValue::F64(n.as_f64().unwrap_or(0.0))
                } else {
                    ColumnValue::Null
                }
            }
            FieldType::Boolean => value.as_bool().map_or(ColumnValue::Null, ColumnValue::Bool),
            FieldType::TokenCount => {
                ColumnValue::I64(analyze(mapping.analyzer.as_deref(), text).len() as i64)
            }
            FieldType::Ip => {
                crate::ip::ip_to_i64(text).map_or(ColumnValue::Null, ColumnValue::I64)
            }
            _ => ColumnValue::Null,
        };
        column_values.push((field_id, cell));
    }

    // Schema fields of type `Nested`; their presence makes every document a parent.
    let nested_parents: Vec<_> = schema
        .fields()
        .iter()
        .filter(|f| matches!(f.field_type, FieldType::Nested))
        .collect();

    builder.add_document(&analyzed_fields, source_bytes);
    if !nested_parents.is_empty() {
        builder.mark_parent();
    }
    for (id, cell) in column_values {
        builder.add_column_value(id, cell);
    }
    for (id, point) in geo_points {
        builder.add_geo_point(id, point);
    }
    for (id, geom) in &geo_shapes {
        builder.add_geo_shape(*id, geom);
    }

    for parent in nested_parents {
        let parent_name = &parent.name;
        let children = match fields.get(parent_name) {
            Some(serde_json::Value::Array(items)) => items,
            _ => continue,
        };
        // Array elements that are not JSON objects produce no child document.
        for child in children.iter().filter_map(|item| item.as_object()) {
            let child_fields: Vec<(FieldId, Vec<Token>)> = child
                .iter()
                .filter_map(|(key, child_value)| {
                    // Nested sub-fields are looked up as "<parent>.<key>".
                    let fid = schema.field_id(&format!("{parent_name}.{key}"))?;
                    let child_mapping = schema.field(fid);
                    let terms = match &child_mapping.field_type {
                        FieldType::Text => analyze(
                            child_mapping.analyzer.as_deref(),
                            child_value.as_str().unwrap_or_default(),
                        ),
                        FieldType::Keyword => {
                            vec![Token::new(keyword_text(child_value), 0, 0, 0)]
                        }
                        _ => return None,
                    };
                    if terms.is_empty() {
                        None
                    } else {
                        Some((fid, terms))
                    }
                })
                .collect();
            builder.add_document(&child_fields, b"{}");
            builder.mark_nested();
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::writer::ColumnType;
    use crate::core::DocId;
    use crate::mapping::FieldType;
    use crate::query::term::TermQuery;
    use crate::segment::builder::SegmentBuilder;
    use crate::segment::reader::SegmentReader;

    /// Schema shared by all tests: a text `body` field and a keyword `tag` field.
    fn test_schema() -> Mapping {
        Mapping::builder()
            .field("body", FieldType::Text)
            .field("tag", FieldType::Keyword)
            .build()
    }

    /// Indexes `docs` into a fresh segment with the given id and returns its bytes.
    fn build_segment(id: u64, docs: &[serde_json::Value]) -> Vec<u8> {
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut builder = SegmentBuilder::new(SegmentId::new(id), &schema);
        for doc in docs {
            let source = serde_json::to_vec(doc).unwrap();
            index_document(doc, &source, &schema, &analyzers, &mut builder).unwrap();
        }
        builder.build()
    }

    /// Builds a segment from `docs` and opens a reader over the resulting bytes.
    fn open_segment(id: u64, docs: &[serde_json::Value]) -> SegmentReader {
        SegmentReader::open(build_segment(id, docs)).unwrap()
    }

    #[test]
    fn merges_two_segments() {
        use crate::search::searcher::Searcher;
        use crate::search::segment_store::SegmentStore;

        let first = open_segment(
            1,
            &[
                serde_json::json!({"body": "hello world", "tag": "a"}),
                serde_json::json!({"body": "goodbye world", "tag": "a"}),
            ],
        );
        let second = open_segment(
            2,
            &[
                serde_json::json!({"body": "hello luci", "tag": "b"}),
                serde_json::json!({"body": "luci search engine", "tag": "b"}),
            ],
        );
        let readers = [&first, &second];

        let out = merge_segments(
            SegmentId::new(3),
            &readers,
            &DeletionMap::new(),
            &test_schema(),
            &AnalyzerRegistry::new(),
        )
        .unwrap();

        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        // "hello" occurs in exactly one document of each source segment.
        let store = SegmentStore::new(vec![merged], AnalyzerRegistry::new(), None, None);
        let searcher = Searcher::new(&store);
        let query = TermQuery {
            field: "body".into(),
            value: "hello".into(),
        };
        let res = searcher.search_query(&query, 10, 0).unwrap();
        assert_eq!(res.total_hits.value, 2);
    }

    #[test]
    fn merge_produces_blocked() {
        let first = open_segment(
            1,
            &[
                serde_json::json!({"body": "one", "tag": "alpha"}),
                serde_json::json!({"body": "two", "tag": "beta"}),
            ],
        );
        let second = open_segment(
            2,
            &[
                serde_json::json!({"body": "three", "tag": "alpha"}),
                serde_json::json!({"body": "four", "tag": "gamma"}),
            ],
        );
        let readers = [&first, &second];
        let schema = test_schema();

        let out = merge_segments(
            SegmentId::new(3),
            &readers,
            &DeletionMap::new(),
            &schema,
            &AnalyzerRegistry::new(),
        )
        .unwrap();

        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        let tag_fid = schema.field_id("tag").unwrap();
        let col = merged.column(tag_fid).expect("merged tag column present");
        assert_eq!(col.col_type(), ColumnType::KeywordBlocked);
        // Three distinct tags across the four documents.
        assert_eq!(col.dict_size(), 3);

        let tags: Vec<Option<&str>> = (0..4).map(|d| col.keyword_value(d)).collect();
        for expected in ["alpha", "beta", "gamma"] {
            assert!(tags.contains(&Some(expected)));
        }
        assert!(tags.iter().all(|t| t.is_some()));
    }

    #[test]
    fn applies_deletions() {
        let source_id = SegmentId::new(1);
        let reader = open_segment(
            1,
            &[
                serde_json::json!({"body": "alpha"}),
                serde_json::json!({"body": "beta"}),
                serde_json::json!({"body": "gamma"}),
            ],
        );
        let readers = [&reader];

        // Delete the middle document only.
        let mut deletions = DeletionMap::new();
        deletions.mark_deleted(source_id, DocId::new(1));

        let new_id = SegmentId::new(2);
        let out = merge_segments(
            new_id,
            &readers,
            &deletions,
            &test_schema(),
            &AnalyzerRegistry::new(),
        )
        .unwrap();

        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 2);

        // Survivors are renumbered densely; the deleted document has no mapping.
        assert_eq!(out.ord_remap.get(&(source_id, 0)), Some(&(new_id, 0)));
        assert!(!out.ord_remap.contains_key(&(source_id, 1)));
        assert_eq!(out.ord_remap.get(&(source_id, 2)), Some(&(new_id, 1)));
    }
}