Skip to main content

luci/
merge.rs

1//! Segment merge: combine multiple segments into one, applying deletions.
2//!
3//! Re-reads source documents from each segment's doc store and re-indexes
4//! them through the full indexing pipeline (text analysis, columnar
5//! values, geo points, nested documents). This preserves all field types.
6//!
7//! Vectors live in the index-wide global HNSW per
8//! [[global-vector-indices]] Alternative B and are not touched
9//! by merge. Instead, this function returns an `ord_remap` table that
10//! the caller (the writer) uses to rewrite the global HNSW's
11//! `(segment_id, local_doc_id)` resolver entries so they point at the
12//! merged segment's new doc ids.
13//!
14//! Direct posting-list merge (merge-sort FSTs, re-delta-encode postings)
15//! is a future optimization — see [[architecture-segment-merge]].
16
17use std::collections::HashMap;
18
19use crate::analysis::AnalyzerRegistry;
20use crate::core::{DocId, FieldId, LuciError, SegmentId};
21use crate::mapping::Mapping;
22
23use crate::deletion::DeletionMap;
24use crate::segment::reader::SegmentReader;
25
26/// Output of [`merge_segments`].
27///
28/// `bytes` is the serialized merged segment. `ord_remap` maps every
29/// `(source_segment_id, source_local_doc_id)` that survived the merge
30/// to the merged segment's corresponding `(new_segment_id, new_local_doc_id)`.
31/// The writer feeds this to [`crate::vector::global::GlobalHnsw::rewrite_after_merge`]
32/// so the global graph's resolver tracks the renamed docs.
33pub struct MergeOutput {
34    pub bytes: Vec<u8>,
35    pub ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)>,
36}
37
38/// Merge multiple segments into one, applying deletions.
39///
40/// Re-reads source documents from each segment's doc store and runs them
41/// through the full `IndexWriter::add()` pipeline, preserving all field
42/// types: text, keyword, numeric, boolean, geo_point, nested. Vector
43/// data is not present on the per-segment payload anymore — see
44/// [[global-vector-indices]] — so this function only emits a
45/// remap table for the caller to update the global HNSW.
46///
47/// Documents marked as deleted in `deletions` are excluded.
48pub fn merge_segments(
49    new_segment_id: SegmentId,
50    readers: &[&SegmentReader],
51    deletions: &DeletionMap,
52    schema: &Mapping,
53    analyzers: &AnalyzerRegistry,
54) -> Result<MergeOutput, LuciError> {
55    use crate::segment::builder::SegmentBuilder;
56
57    // Pre-compute the (reader_idx, source_doc) → merged_ord mapping
58    // and the final merged-segment live doc count. Per-reader the
59    // mapping is a `Vec<Option<u32>>`: `None` for deleted docs,
60    // `Some(ord)` otherwise.
61    let mut ord_maps: Vec<Vec<Option<u32>>> = Vec::with_capacity(readers.len());
62    let mut total_live_count: u32 = 0;
63    for reader in readers {
64        let seg_id = reader.segment_id();
65        let dc = reader.doc_count() as usize;
66        let mut map = vec![None; dc];
67        for doc_idx in 0..reader.doc_count() {
68            if deletions.is_deleted(seg_id, DocId::new(doc_idx)) {
69                continue;
70            }
71            map[doc_idx as usize] = Some(total_live_count);
72            total_live_count += 1;
73        }
74        ord_maps.push(map);
75    }
76
77    let mut builder = SegmentBuilder::new(new_segment_id, schema);
78
79    // Re-read source documents from each segment's doc store and run
80    // them through the full indexing pipeline. Vectors are not touched
81    // here — they live in the global HNSW.
82    for reader in readers.iter() {
83        let seg_id = reader.segment_id();
84        let doc_store = reader.doc_store();
85
86        for doc_idx in 0..reader.doc_count() {
87            let doc_id = DocId::new(doc_idx);
88            if deletions.is_deleted(seg_id, doc_id) {
89                continue;
90            }
91            let source_bytes = match doc_store.get(doc_idx) {
92                Some(bytes) => bytes,
93                None => continue,
94            };
95            let doc: serde_json::Value = match serde_json::from_slice(&source_bytes) {
96                Ok(v) => v,
97                Err(_) => continue,
98            };
99
100            index_document(&doc, &source_bytes, schema, analyzers, &mut builder).map_err(|e| {
101                match e {
102                    LuciError::InvalidValue(msg) => LuciError::InvalidValue(format!(
103                        "segment {seg_id:?} document {doc_idx}: {msg}"
104                    )),
105                    other => other,
106                }
107            })?;
108        }
109    }
110
111    // Build the (old_seg, old_doc) → (new_seg, new_doc) remap by
112    // walking each reader's ord_map in order.
113    let mut ord_remap: HashMap<(SegmentId, u32), (SegmentId, u32)> = HashMap::new();
114    for (reader_idx, reader) in readers.iter().enumerate() {
115        let seg_id = reader.segment_id();
116        for (src_doc_idx, opt) in ord_maps[reader_idx].iter().enumerate() {
117            if let Some(merged_ord) = opt {
118                ord_remap.insert((seg_id, src_doc_idx as u32), (new_segment_id, *merged_ord));
119            }
120        }
121    }
122
123    Ok(MergeOutput {
124        bytes: builder.build(),
125        ord_remap,
126    })
127}
128
129/// Index a single document into a SegmentBuilder, handling all field
130/// types except dense_vector (which lives in the global HNSW).
131///
132/// This replicates the field processing logic from `IndexWriter::add()` so
133/// that merge preserves all data (text, columnar, geo, nested).
134fn index_document(
135    doc: &serde_json::Value,
136    source_bytes: &[u8],
137    schema: &Mapping,
138    analyzers: &AnalyzerRegistry,
139    builder: &mut crate::segment::builder::SegmentBuilder,
140) -> Result<(), LuciError> {
141    use crate::analysis::Token;
142    use crate::columnar::writer::ColumnValue;
143    use crate::mapping::FieldType;
144    use crate::spatial::geo::GeoPoint;
145
146    let obj = match doc.as_object() {
147        Some(o) => o,
148        None => return Ok(()),
149    };
150
151    let mut analyzed_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
152    let mut column_values: Vec<(FieldId, ColumnValue)> = Vec::new();
153    let mut geo_points: Vec<(FieldId, GeoPoint)> = Vec::new();
154    let mut geo_shapes: Vec<(FieldId, ::geo::Geometry<f64>)> = Vec::new();
155
156    for (field_name, value) in obj {
157        let field_id = match schema.field_id(field_name) {
158            Some(id) => id,
159            None => continue,
160        };
161
162        let mapping = schema.field(field_id);
163
164        // Build tokens for inverted index (skipped if index: false)
165        let tokens = match &mapping.field_type {
166            FieldType::Text => {
167                let text = value.as_str().unwrap_or_default();
168                let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
169                let analyzer = analyzers.get(analyzer_name);
170                analyzer.analyze(text)
171            }
172            FieldType::Keyword => {
173                let text = match value {
174                    serde_json::Value::String(s) => s.clone(),
175                    other => other.to_string(),
176                };
177                vec![Token::new(text, 0, 0, 0)]
178            }
179            FieldType::Ip => {
180                let text = value.as_str().unwrap_or_default();
181                let normalized = crate::ip::normalize_ip(text);
182                if normalized.is_empty() {
183                    Vec::new()
184                } else {
185                    vec![Token::new(normalized, 0, 0, 0)]
186                }
187            }
188            _ => Vec::new(),
189        };
190
191        if !tokens.is_empty() && mapping.indexed {
192            analyzed_fields.push((field_id, tokens));
193        }
194
195        // Geo points
196        if matches!(mapping.field_type, FieldType::GeoPoint) {
197            if let Some(point) = GeoPoint::from_json(value) {
198                geo_points.push((field_id, point));
199            }
200        }
201
202        // Geo shapes
203        if matches!(mapping.field_type, FieldType::GeoShape) {
204            if let Some(geom) = crate::spatial::shape::parse_geojson(value) {
205                geo_shapes.push((field_id, geom));
206            }
207        }
208
209        // Columnar values
210        if mapping.doc_values {
211            let col_val = match &mapping.field_type {
212                FieldType::Keyword => match value {
213                    serde_json::Value::String(s) => ColumnValue::keyword(s.clone())?,
214                    serde_json::Value::Null => ColumnValue::Null,
215                    other => ColumnValue::keyword(other.to_string())?,
216                },
217                FieldType::Integer | FieldType::Long => match value {
218                    serde_json::Value::Number(n) => ColumnValue::I64(n.as_i64().unwrap_or(0)),
219                    _ => ColumnValue::Null,
220                },
221                FieldType::Float | FieldType::Double => match value {
222                    serde_json::Value::Number(n) => ColumnValue::F64(n.as_f64().unwrap_or(0.0)),
223                    _ => ColumnValue::Null,
224                },
225                FieldType::Boolean => match value {
226                    serde_json::Value::Bool(b) => ColumnValue::Bool(*b),
227                    _ => ColumnValue::Null,
228                },
229                FieldType::TokenCount => {
230                    let text = value.as_str().unwrap_or_default();
231                    let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
232                    let analyzer = analyzers.get(analyzer_name);
233                    ColumnValue::I64(analyzer.analyze(text).len() as i64)
234                }
235                FieldType::Ip => {
236                    let text = value.as_str().unwrap_or_default();
237                    match crate::ip::ip_to_i64(text) {
238                        Some(v) => ColumnValue::I64(v),
239                        None => ColumnValue::Null,
240                    }
241                }
242                _ => ColumnValue::Null,
243            };
244            column_values.push((field_id, col_val));
245        }
246    }
247
248    // Add the document
249    let has_nested = schema
250        .fields()
251        .iter()
252        .any(|f| matches!(f.field_type, FieldType::Nested));
253
254    builder.add_document(&analyzed_fields, source_bytes);
255
256    if has_nested {
257        builder.mark_parent();
258    }
259
260    for (field_id, col_val) in column_values {
261        builder.add_column_value(field_id, col_val);
262    }
263
264    for (field_id, point) in geo_points {
265        builder.add_geo_point(field_id, point);
266    }
267
268    for (field_id, geom) in &geo_shapes {
269        builder.add_geo_shape(*field_id, geom);
270    }
271
272    // Nested documents
273    for mapping in schema.fields() {
274        if !matches!(mapping.field_type, FieldType::Nested) {
275            continue;
276        }
277        let field_name = &mapping.name;
278        if let Some(serde_json::Value::Array(nested_arr)) = obj.get(field_name) {
279            for nested_obj in nested_arr {
280                if let Some(nested_map) = nested_obj.as_object() {
281                    let mut nested_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
282                    for (nested_key, nested_val) in nested_map {
283                        let prefixed = format!("{field_name}.{nested_key}");
284                        if let Some(fid) = schema.field_id(&prefixed) {
285                            let m = schema.field(fid);
286                            let tokens = match &m.field_type {
287                                FieldType::Text => {
288                                    let text = nested_val.as_str().unwrap_or_default();
289                                    let analyzer =
290                                        analyzers.get(m.analyzer.as_deref().unwrap_or("standard"));
291                                    analyzer.analyze(text)
292                                }
293                                FieldType::Keyword => {
294                                    let text = match nested_val {
295                                        serde_json::Value::String(s) => s.clone(),
296                                        other => other.to_string(),
297                                    };
298                                    vec![Token::new(text, 0, 0, 0)]
299                                }
300                                _ => continue,
301                            };
302                            if !tokens.is_empty() {
303                                nested_fields.push((fid, tokens));
304                            }
305                        }
306                    }
307                    builder.add_document(&nested_fields, b"{}");
308                    builder.mark_nested();
309                }
310            }
311        }
312    }
313    Ok(())
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::writer::ColumnType;
    use crate::query::term::TermQuery;
    use crate::segment::builder::SegmentBuilder;
    use crate::segment::reader::SegmentReader;

    use crate::core::DocId;
    use crate::mapping::FieldType;

    fn test_schema() -> Mapping {
        Mapping::builder()
            .field("body", FieldType::Text)
            .field("tag", FieldType::Keyword)
            .build()
    }

    /// Build a serialized segment by indexing `docs` with `index_document`.
    fn build_segment(id: u64, docs: &[serde_json::Value]) -> Vec<u8> {
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut builder = SegmentBuilder::new(SegmentId::new(id), &schema);
        for doc in docs {
            index_document(
                doc,
                &serde_json::to_vec(doc).unwrap(),
                &schema,
                &analyzers,
                &mut builder,
            )
            .unwrap();
        }
        builder.build()
    }

    #[test]
    fn merges_two_segments() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "hello world", "tag": "a"}),
                serde_json::json!({"body": "goodbye world", "tag": "a"}),
            ],
        );
        let s2 = build_segment(
            2,
            &[
                serde_json::json!({"body": "hello luci", "tag": "b"}),
                serde_json::json!({"body": "luci search engine", "tag": "b"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let r2 = SegmentReader::open(s2).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1, &r2];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let deletions = DeletionMap::new();
        let new_id = SegmentId::new(3);
        let out = merge_segments(new_id, &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        // Search the merged segment
        use crate::search::searcher::Searcher;
        let store = crate::search::segment_store::SegmentStore::new(
            vec![merged],
            AnalyzerRegistry::new(),
            None,
            None,
        );
        let searcher = Searcher::new(&store);
        let res = searcher
            .search_query(
                &TermQuery {
                    field: "body".into(),
                    value: "hello".into(),
                },
                10,
                0,
            )
            .unwrap();
        assert_eq!(res.total_hits.value, 2);
    }

    /// Test 7: a merge re-indexes keyword values through the same
    /// `ColumnarWriter`, so the merged segment's keyword columns are written in
    /// the new blocked format and every value round-trips. Merge never decodes
    /// source columns by `col_type` (values come from the doc store), so a
    /// legacy/blocked source mix is irrelevant.
    #[test]
    fn merge_produces_blocked() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "one", "tag": "alpha"}),
                serde_json::json!({"body": "two", "tag": "beta"}),
            ],
        );
        let s2 = build_segment(
            2,
            &[
                serde_json::json!({"body": "three", "tag": "alpha"}),
                serde_json::json!({"body": "four", "tag": "gamma"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let r2 = SegmentReader::open(s2).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1, &r2];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let deletions = DeletionMap::new();
        let out =
            merge_segments(SegmentId::new(3), &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 4);

        let tag_fid = schema.field_id("tag").unwrap();
        let col = merged.column(tag_fid).expect("merged tag column present");
        assert_eq!(col.col_type(), ColumnType::KeywordBlocked);
        assert_eq!(col.dict_size(), 3); // alpha, beta, gamma
        let tags: Vec<Option<&str>> = (0..4).map(|d| col.keyword_value(d)).collect();
        assert!(tags.contains(&Some("alpha")));
        assert!(tags.contains(&Some("beta")));
        assert!(tags.contains(&Some("gamma")));
        assert!(tags.iter().all(|t| t.is_some()));
    }

    #[test]
    fn applies_deletions() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "alpha"}),
                serde_json::json!({"body": "beta"}),
                serde_json::json!({"body": "gamma"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut deletions = DeletionMap::new();
        deletions.mark_deleted(SegmentId::new(1), DocId::new(1));
        let new_id = SegmentId::new(2);
        let out = merge_segments(new_id, &readers, &deletions, &schema, &analyzers).unwrap();
        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 2);

        // The remap should include alive docs (0, 2) and skip the deleted one (1).
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(1), 0)),
            Some(&(new_id, 0))
        );
        assert!(!out.ord_remap.contains_key(&(SegmentId::new(1), 1)));
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(1), 2)),
            Some(&(new_id, 1))
        );
    }

    /// Merged ids follow builder order: every live doc of the first reader,
    /// then those of the second. Each remap target must hold the source of
    /// the doc it was mapped from.
    #[test]
    fn remaps_across_segments_in_order() {
        let s1 = build_segment(
            1,
            &[
                serde_json::json!({"body": "first", "tag": "a"}),
                serde_json::json!({"body": "second", "tag": "a"}),
            ],
        );
        let s2 = build_segment(
            2,
            &[
                serde_json::json!({"body": "third", "tag": "b"}),
                serde_json::json!({"body": "fourth", "tag": "b"}),
            ],
        );
        let r1 = SegmentReader::open(s1).unwrap();
        let r2 = SegmentReader::open(s2).unwrap();
        let readers: Vec<&SegmentReader> = vec![&r1, &r2];
        let schema = test_schema();
        let analyzers = AnalyzerRegistry::new();
        let mut deletions = DeletionMap::new();
        deletions.mark_deleted(SegmentId::new(1), DocId::new(0));
        let new_id = SegmentId::new(3);
        let out = merge_segments(new_id, &readers, &deletions, &schema, &analyzers).unwrap();

        assert_eq!(out.ord_remap.len(), 3);
        assert!(!out.ord_remap.contains_key(&(SegmentId::new(1), 0)));
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(1), 1)),
            Some(&(new_id, 0))
        );
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(2), 0)),
            Some(&(new_id, 1))
        );
        assert_eq!(
            out.ord_remap.get(&(SegmentId::new(2), 1)),
            Some(&(new_id, 2))
        );

        let merged = SegmentReader::open(out.bytes).unwrap();
        assert_eq!(merged.doc_count(), 3);
        let store = merged.doc_store();
        let source = store.get(1).expect("merged doc 1 has a stored source");
        let doc: serde_json::Value = serde_json::from_slice(&source).unwrap();
        assert_eq!(doc["body"], "third");
    }
}