1use std::collections::HashMap;
18
19use crate::analysis::AnalyzerRegistry;
20use crate::core::{DocId, FieldId, LuciError, SegmentId};
21use crate::mapping::Mapping;
22
23use crate::deletion::DeletionMap;
24use crate::segment::reader::SegmentReader;
25
26pub struct MergeOutput {
34 pub bytes: Vec<u8>,
35 pub ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)>,
36}
37
38pub fn merge_segments(
49 new_segment_id: SegmentId,
50 readers: &[&SegmentReader],
51 deletions: &DeletionMap,
52 schema: &Mapping,
53 analyzers: &AnalyzerRegistry,
54) -> Result<MergeOutput, LuciError> {
55 use crate::segment::builder::SegmentBuilder;
56
57 let mut ord_maps: Vec<Vec<Option<u32>>> = Vec::with_capacity(readers.len());
62 let mut total_live_count: u32 = 0;
63 for reader in readers {
64 let seg_id = reader.segment_id();
65 let dc = reader.doc_count() as usize;
66 let mut map = vec![None; dc];
67 for doc_idx in 0..reader.doc_count() {
68 if deletions.is_deleted(seg_id, DocId::new(doc_idx)) {
69 continue;
70 }
71 map[doc_idx as usize] = Some(total_live_count);
72 total_live_count += 1;
73 }
74 ord_maps.push(map);
75 }
76
77 let mut builder = SegmentBuilder::new(new_segment_id, schema);
78
79 for reader in readers.iter() {
83 let seg_id = reader.segment_id();
84 let doc_store = reader.doc_store();
85
86 for doc_idx in 0..reader.doc_count() {
87 let doc_id = DocId::new(doc_idx);
88 if deletions.is_deleted(seg_id, doc_id) {
89 continue;
90 }
91 let source_bytes = match doc_store.get(doc_idx) {
92 Some(bytes) => bytes,
93 None => continue,
94 };
95 let doc: serde_json::Value = match serde_json::from_slice(&source_bytes) {
96 Ok(v) => v,
97 Err(_) => continue,
98 };
99
100 index_document(&doc, &source_bytes, schema, analyzers, &mut builder).map_err(|e| {
101 match e {
102 LuciError::InvalidValue(msg) => LuciError::InvalidValue(format!(
103 "segment {seg_id:?} document {doc_idx}: {msg}"
104 )),
105 other => other,
106 }
107 })?;
108 }
109 }
110
111 let mut ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)> = HashMap::new();
114 for (reader_idx, reader) in readers.iter().enumerate() {
115 let seg_id = reader.segment_id();
116 for (src_doc_idx, opt) in ord_maps[reader_idx].iter().enumerate() {
117 if let Some(merged_ord) = opt {
118 ord_remap.insert((seg_id, src_doc_idx as u32), (new_segment_id, *merged_ord));
119 }
120 }
121 }
122
123 Ok(MergeOutput {
124 bytes: builder.build(),
125 ord_remap,
126 })
127}
128
129fn index_document(
135 doc: &serde_json::Value,
136 source_bytes: &[u8],
137 schema: &Mapping,
138 analyzers: &AnalyzerRegistry,
139 builder: &mut crate::segment::builder::SegmentBuilder,
140) -> Result<(), LuciError> {
141 use crate::analysis::Token;
142 use crate::columnar::writer::ColumnValue;
143 use crate::mapping::FieldType;
144 use crate::spatial::geo::GeoPoint;
145
146 let obj = match doc.as_object() {
147 Some(o) => o,
148 None => return Ok(()),
149 };
150
151 let mut analyzed_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
152 let mut column_values: Vec<(FieldId, ColumnValue)> = Vec::new();
153 let mut geo_points: Vec<(FieldId, GeoPoint)> = Vec::new();
154 let mut geo_shapes: Vec<(FieldId, ::geo::Geometry<f64>)> = Vec::new();
155
156 for (field_name, value) in obj {
157 let field_id = match schema.field_id(field_name) {
158 Some(id) => id,
159 None => continue,
160 };
161
162 let mapping = schema.field(field_id);
163
164 let tokens = match &mapping.field_type {
166 FieldType::Text => {
167 let text = value.as_str().unwrap_or_default();
168 let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
169 let analyzer = analyzers.get(analyzer_name);
170 analyzer.analyze(text)
171 }
172 FieldType::Keyword => {
173 let text = match value {
174 serde_json::Value::String(s) => s.clone(),
175 other => other.to_string(),
176 };
177 vec![Token::new(text, 0, 0, 0)]
178 }
179 FieldType::Ip => {
180 let text = value.as_str().unwrap_or_default();
181 let normalized = crate::ip::normalize_ip(text);
182 if normalized.is_empty() {
183 Vec::new()
184 } else {
185 vec![Token::new(normalized, 0, 0, 0)]
186 }
187 }
188 _ => Vec::new(),
189 };
190
191 if !tokens.is_empty() && mapping.indexed {
192 analyzed_fields.push((field_id, tokens));
193 }
194
195 if matches!(mapping.field_type, FieldType::GeoPoint) {
197 if let Some(point) = GeoPoint::from_json(value) {
198 geo_points.push((field_id, point));
199 }
200 }
201
202 if matches!(mapping.field_type, FieldType::GeoShape) {
204 if let Some(geom) = crate::spatial::shape::parse_geojson(value) {
205 geo_shapes.push((field_id, geom));
206 }
207 }
208
209 if mapping.doc_values {
211 let col_val = match &mapping.field_type {
212 FieldType::Keyword => match value {
213 serde_json::Value::String(s) => ColumnValue::keyword(s.clone())?,
214 serde_json::Value::Null => ColumnValue::Null,
215 other => ColumnValue::keyword(other.to_string())?,
216 },
217 FieldType::Integer | FieldType::Long => match value {
218 serde_json::Value::Number(n) => ColumnValue::I64(n.as_i64().unwrap_or(0)),
219 _ => ColumnValue::Null,
220 },
221 FieldType::Float | FieldType::Double => match value {
222 serde_json::Value::Number(n) => ColumnValue::F64(n.as_f64().unwrap_or(0.0)),
223 _ => ColumnValue::Null,
224 },
225 FieldType::Boolean => match value {
226 serde_json::Value::Bool(b) => ColumnValue::Bool(*b),
227 _ => ColumnValue::Null,
228 },
229 FieldType::TokenCount => {
230 let text = value.as_str().unwrap_or_default();
231 let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
232 let analyzer = analyzers.get(analyzer_name);
233 ColumnValue::I64(analyzer.analyze(text).len() as i64)
234 }
235 FieldType::Ip => {
236 let text = value.as_str().unwrap_or_default();
237 match crate::ip::ip_to_i64(text) {
238 Some(v) => ColumnValue::I64(v),
239 None => ColumnValue::Null,
240 }
241 }
242 _ => ColumnValue::Null,
243 };
244 column_values.push((field_id, col_val));
245 }
246 }
247
248 let has_nested = schema
250 .fields()
251 .iter()
252 .any(|f| matches!(f.field_type, FieldType::Nested));
253
254 builder.add_document(&analyzed_fields, source_bytes);
255
256 if has_nested {
257 builder.mark_parent();
258 }
259
260 for (field_id, col_val) in column_values {
261 builder.add_column_value(field_id, col_val);
262 }
263
264 for (field_id, point) in geo_points {
265 builder.add_geo_point(field_id, point);
266 }
267
268 for (field_id, geom) in &geo_shapes {
269 builder.add_geo_shape(*field_id, geom);
270 }
271
272 for mapping in schema.fields() {
274 if !matches!(mapping.field_type, FieldType::Nested) {
275 continue;
276 }
277 let field_name = &mapping.name;
278 if let Some(serde_json::Value::Array(nested_arr)) = obj.get(field_name) {
279 for nested_obj in nested_arr {
280 if let Some(nested_map) = nested_obj.as_object() {
281 let mut nested_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
282 for (nested_key, nested_val) in nested_map {
283 let prefixed = format!("{field_name}.{nested_key}");
284 if let Some(fid) = schema.field_id(&prefixed) {
285 let m = schema.field(fid);
286 let tokens = match &m.field_type {
287 FieldType::Text => {
288 let text = nested_val.as_str().unwrap_or_default();
289 let analyzer =
290 analyzers.get(m.analyzer.as_deref().unwrap_or("standard"));
291 analyzer.analyze(text)
292 }
293 FieldType::Keyword => {
294 let text = match nested_val {
295 serde_json::Value::String(s) => s.clone(),
296 other => other.to_string(),
297 };
298 vec![Token::new(text, 0, 0, 0)]
299 }
300 _ => continue,
301 };
302 if !tokens.is_empty() {
303 nested_fields.push((fid, tokens));
304 }
305 }
306 }
307 builder.add_document(&nested_fields, b"{}");
308 builder.mark_nested();
309 }
310 }
311 }
312 }
313 Ok(())
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::writer::ColumnType;
    use crate::query::term::TermQuery;
    use crate::segment::builder::SegmentBuilder;
    use crate::segment::reader::SegmentReader;

    use crate::core::DocId;
    use crate::mapping::FieldType;

    /// Schema shared by every test: an analyzed `body` text field and a `tag`
    /// keyword field.
    fn test_schema() -> Mapping {
        Mapping::builder()
            .field("body", FieldType::Text)
            .field("tag", FieldType::Keyword)
            .build()
    }

    /// Builds a serialized segment with id `id` holding `docs`, indexed through
    /// the same `index_document` path that merging uses.
    fn build_segment(id: u64, docs: &[serde_json::Value]) -> Vec<u8> {
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut builder = SegmentBuilder::new(SegmentId::new(id), &schema);
        for doc in docs {
            index_document(
                doc,
                &serde_json::to_vec(doc).unwrap(),
                &schema,
                &analyzers,
                &mut builder,
            )
            .unwrap();
        }
        builder.build()
    }

    /// Merging two segments keeps every document and they remain searchable.
    #[test]
    fn merges_two_segments() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "hello world", "tag": "a"}),
                serde_json::json!({"body": "goodbye world", "tag": "a"}),
            ],
        );
        let s2 = build_segment(
            2,
            &[
                serde_json::json!({"body": "hello luci", "tag": "b"}),
                serde_json::json!({"body": "luci search engine", "tag": "b"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let r2 = SegmentReader::open(s2).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1, &r2];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let deletions = DeletionMap::new();
        let new_id = SegmentId::new(3);
        let out = merge_segments(new_id, &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        use crate::search::searcher::Searcher;
        let store = crate::search::segment_store::SegmentStore::new(
            vec![merged],
            AnalyzerRegistry::new(),
            None,
            None,
        );
        let searcher = Searcher::new(&store);
        let res = searcher
            .search_query(
                &TermQuery {
                    field: "body".into(),
                    value: "hello".into(),
                },
                10,
                0,
            )
            .unwrap();
        assert_eq!(res.total_hits.value, 2);
    }

    /// The merged keyword column is written in the blocked layout, with one
    /// dictionary entry per distinct tag across both inputs.
    #[test]
    fn merge_produces_blocked() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "one", "tag": "alpha"}),
                serde_json::json!({"body": "two", "tag": "beta"}),
            ],
        );
        let s2 = build_segment(
            2,
            &[
                serde_json::json!({"body": "three", "tag": "alpha"}),
                serde_json::json!({"body": "four", "tag": "gamma"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let r2 = SegmentReader::open(s2).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1, &r2];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let deletions = DeletionMap::new();
        let out =
            merge_segments(SegmentId::new(3), &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        let tag_fid = schema.field_id("tag").unwrap();
        let col = merged.column(tag_fid).expect("merged tag column present");
        assert_eq!(col.col_type(), ColumnType::KeywordBlocked);
        assert_eq!(col.dict_size(), 3);

        let tags: Vec<Option<&str>> = (0..4).map(|d| col.keyword_value(d)).collect();
        assert!(tags.contains(&Some("alpha")));
        assert!(tags.contains(&Some("beta")));
        assert!(tags.contains(&Some("gamma")));
        assert!(tags.iter().all(|t| t.is_some()));
    }

    /// Deleted documents are dropped, and the surviving ordinals are remapped
    /// densely.
    #[test]
    fn applies_deletions() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "alpha"}),
                serde_json::json!({"body": "beta"}),
                serde_json::json!({"body": "gamma"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut deletions = DeletionMap::new();
        deletions.mark_deleted(SegmentId::new(1), DocId::new(1));
        let new_id = SegmentId::new(2);
        let out = merge_segments(new_id, &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 2);

        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(1), 0)),
            Some(&(new_id, 0))
        );
        assert!(!out.ord_remap.contains_key(&(SegmentId::new(1), 1)));
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(1), 2)),
            Some(&(new_id, 1))
        );
    }
}