Skip to main content

luci/
writer.rs

1//! IndexWriter: buffer documents, analyze text, flush segments, commit.
2//!
3//! The public write API for Luci. Documents are validated against the
4//! schema, text fields are analyzed, and results are accumulated in an
5//! in-memory segment buffer. On commit (or memory budget exceeded), the
6//! buffer is flushed to a segment and written to storage.
7//!
8//! See [[architecture-indexing-pipeline]] and [[architecture-overview#Step 9]].
9
10use crate::analysis::{AnalyzerRegistry, Token};
11use crate::core::{FieldId, LuciError, Result, SegmentId};
12use crate::mapping::{DynamicMode, FieldType, Mapping};
13use crate::storage::Storage;
14
15use crate::columnar::writer::ColumnValue;
16use crate::segment::builder::SegmentBuilder;
17use crate::spatial::geo::GeoPoint;
18use crate::vector::global::GlobalHnsw;
19use crate::vector::hnsw::BuildThreads;
20
21/// Default memory budget before auto-flush (64 MB).
22const DEFAULT_MEMORY_BUDGET: usize = 64 * 1024 * 1024;
23
24/// Writes documents to a Luci index.
25///
26/// Usage:
27/// 1. Create with `IndexWriter::new(storage, schema, analyzers)`
28/// 2. Call `put(doc)` for each document
29/// 3. Call `commit()` to make documents searchable
30pub struct IndexWriter {
31    storage: Box<dyn Storage>,
32    schema: Mapping,
33    analyzers: AnalyzerRegistry,
34    builder: SegmentBuilder,
35    next_segment_id: u64,
36    memory_budget: usize,
37    /// Approximate bytes consumed by the current buffer.
38    buffer_size: usize,
39    /// Merge policy configuration.
40    merge_policy: crate::merge_policy::MergePolicy,
41    /// Pending deletions (applied during search, cleaned up on merge).
42    pending_deletions: crate::deletion::DeletionMap,
43    /// Analysis settings JSON to persist alongside the mapping.
44    /// See [[feature-analysis-pipeline]].
45    analysis_json: Option<serde_json::Value>,
46    /// Single global HNSW per dense_vector field, decoupled from the
47    /// segment model per [[global-vector-indices]]
48    /// Alternative B. Vectors flow only here — segments no longer
49    /// carry their own per-field HNSWs. Persisted via the storage
50    /// layer's per-field `write_vector_index` API on commit.
51    global_hnsw: GlobalHnsw,
52    /// Thread budget for the parallel HNSW connect phase run at commit.
53    /// `Ambient` (the production default) uses rayon's global pool;
54    /// deterministic test/profile harnesses force `Fixed(1)` for a
55    /// bit-identical graph. See [[optimization-concurrent-hnsw-insert]].
56    build_threads: BuildThreads,
57}
58
59impl IndexWriter {
60    /// Create a new IndexWriter.
61    pub fn new(
62        storage: impl Storage + 'static,
63        schema: Mapping,
64        analyzers: AnalyzerRegistry,
65    ) -> Self {
66        let next_id = storage.generation() + 1;
67        let builder = SegmentBuilder::new(SegmentId::new(next_id), &schema);
68        let global_hnsw = GlobalHnsw::new(&schema);
69        Self {
70            storage: Box::new(storage),
71            schema,
72            analyzers,
73            builder,
74            next_segment_id: next_id,
75            memory_budget: DEFAULT_MEMORY_BUDGET,
76            buffer_size: 0,
77            merge_policy: crate::merge_policy::MergePolicy::default(),
78            pending_deletions: crate::deletion::DeletionMap::new(),
79            analysis_json: None,
80            global_hnsw,
81            build_threads: BuildThreads::Ambient,
82        }
83    }
84
85    /// Override the thread budget for the commit-time HNSW connect phase.
86    /// Production leaves the `Ambient` default (rayon global pool);
87    /// deterministic Rust tests/profile harnesses set `Fixed(1)` to get a
88    /// bit-identical graph independent of the ambient pool size. Not
89    /// exposed to the Python API. See [[optimization-concurrent-hnsw-insert]].
90    pub fn set_build_threads(&mut self, threads: BuildThreads) {
91        self.build_threads = threads;
92    }
93
94    /// Set the analysis settings JSON to persist alongside the mapping.
95    pub fn set_analysis_json(&mut self, json: Option<serde_json::Value>) {
96        self.analysis_json = json;
97    }
98
99    /// Load persisted deletions (called on Index::open).
100    pub fn load_deletions(&mut self, deletions: crate::deletion::DeletionMap) {
101        self.pending_deletions = deletions;
102    }
103
104    /// Replace the in-memory global HNSW with one deserialized from
105    /// persisted bytes (called on `Index::open`).
106    pub fn load_global_hnsw(&mut self, global_hnsw: GlobalHnsw) {
107        self.global_hnsw = global_hnsw;
108    }
109
110    /// Access the global HNSW for serialization on commit and for
111    /// taking a search-side snapshot via `to_bytes`.
112    pub fn global_hnsw(&self) -> &GlobalHnsw {
113        &self.global_hnsw
114    }
115
116    /// Mark a document as deleted. Takes effect on next search (no commit needed).
117    pub fn mark_deleted(&mut self, segment_id: SegmentId, doc_id: crate::core::DocId) {
118        self.pending_deletions.mark_deleted(segment_id, doc_id);
119    }
120
121    /// Get the current deletion map (for search-time filtering).
122    pub fn deletions(&self) -> &crate::deletion::DeletionMap {
123        &self.pending_deletions
124    }
125
126    /// Set the memory budget for auto-flush.
127    pub fn set_memory_budget(&mut self, budget: usize) {
128        self.memory_budget = budget;
129    }
130
131    /// Set the timeout for acquiring the cross-process write lock.
132    ///
133    /// Default: 5 seconds. If another process holds the write lock,
134    /// retries until the timeout, then returns `WriterLocked`.
135    pub fn set_write_timeout(&mut self, timeout: std::time::Duration) {
136        self.storage.set_write_timeout(timeout);
137    }
138
139    /// Add a JSON document to the index.
140    ///
141    /// The document is validated against the schema, text fields are analyzed,
142    /// and the result is buffered. Call `commit()` to make it searchable.
143    pub fn add(&mut self, doc: serde_json::Value) -> Result<()> {
144        let obj = doc
145            .as_object()
146            .ok_or_else(|| LuciError::InvalidQuery("document must be a JSON object".into()))?;
147
148        let source = serde_json::to_vec(&doc)
149            .map_err(|e| LuciError::InvalidQuery(format!("JSON serialization failed: {e}")))?;
150
151        let mut analyzed_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
152        let mut column_values: Vec<(FieldId, ColumnValue)> = Vec::new();
153
154        // Inject _id: use user-provided or auto-generate via UUID v4.
155        // Random v4 (not v7) — our inverted index is FST-based, not a B-tree,
156        // so we don't need time-sortable IDs for insert locality. v4 avoids
157        // leaking the index time via the ID.
158        let doc_id_str = match obj.get("_id").and_then(|v| v.as_str()) {
159            Some(id) => id.to_string(),
160            None => uuid::Uuid::new_v4().to_string(),
161        };
162        if let Some(id_field_id) = self.schema.field_id("_id") {
163            analyzed_fields.push((id_field_id, vec![Token::new(doc_id_str.clone(), 0, 0, 0)]));
164            column_values.push((id_field_id, ColumnValue::keyword(doc_id_str)?));
165        }
166        let mut vector_fields: Vec<(FieldId, Vec<f32>)> = Vec::new();
167        let mut geo_points: Vec<(FieldId, GeoPoint)> = Vec::new();
168        let mut geo_shapes: Vec<(FieldId, ::geo::Geometry<f64>)> = Vec::new();
169        let mut copy_to_pairs: Vec<(String, String)> = Vec::new(); // (target_field, source_text)
170
171        for (field_name, value) in obj {
172            // _id is handled above (explicit injection + columnar push).
173            // Without this skip, the field loop would push _id to the
174            // columnar store a second time, doubling the per-doc ordinal
175            // stride and breaking `SearchHit.id` column reads.
176            if field_name == "_id" {
177                continue;
178            }
179            // Look up or dynamically map the field
180            let field_id = match self.schema.field_id(field_name) {
181                Some(id) => id,
182                None => match self.schema.dynamic_mode() {
183                    DynamicMode::False => continue, // store in _source but don't index
184                    DynamicMode::True => {
185                        // TODO: infer field type and add to mapping dynamically.
186                        // For now, skip unknown fields (stored in _source only).
187                        continue;
188                    }
189                },
190            };
191
192            let mapping = self.schema.field(field_id);
193
194            // Build tokens for the inverted index (skipped if index: false)
195            let tokens = match &mapping.field_type {
196                FieldType::Text => {
197                    let text = value.as_str().unwrap_or_default();
198                    let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
199                    let analyzer = self.analyzers.get(analyzer_name);
200                    analyzer.analyze(text)
201                }
202                FieldType::Keyword => {
203                    let text = match value {
204                        serde_json::Value::String(s) => s.clone(),
205                        other => other.to_string(),
206                    };
207                    vec![Token::new(text.clone(), 0, 0, 0)]
208                }
209                FieldType::Ip => {
210                    // Index normalized IP string as keyword token
211                    let text = value.as_str().unwrap_or_default();
212                    let normalized = crate::ip::normalize_ip(text);
213                    if normalized.is_empty() {
214                        Vec::new()
215                    } else {
216                        vec![Token::new(normalized, 0, 0, 0)]
217                    }
218                }
219                _ => {
220                    // Numeric/boolean/date fields: only columnar, no inverted index
221                    Vec::new()
222                }
223            };
224
225            if !tokens.is_empty() && mapping.indexed {
226                analyzed_fields.push((field_id, tokens));
227            }
228
229            // Store geo shape for geo_shape fields
230            if matches!(mapping.field_type, FieldType::GeoShape) {
231                if let Some(geom) = crate::spatial::shape::parse_geojson(value) {
232                    geo_shapes.push((field_id, geom));
233                }
234            }
235
236            // Store geo point for geo_point fields
237            if matches!(mapping.field_type, FieldType::GeoPoint) {
238                if let Some(point) = GeoPoint::from_json(value) {
239                    geo_points.push((field_id, point));
240                }
241            }
242
243            // Store vector for dense_vector fields
244            if mapping.field_type.is_dense_vector() {
245                if let serde_json::Value::Array(arr) = value {
246                    let vec: Vec<f32> = arr
247                        .iter()
248                        .filter_map(|v| v.as_f64().map(|f| f as f32))
249                        .collect();
250                    if !vec.is_empty() {
251                        vector_fields.push((field_id, vec));
252                    }
253                }
254            }
255
256            // Store column value for doc_values fields
257            if mapping.doc_values {
258                let col_val = match &mapping.field_type {
259                    FieldType::Keyword => match value {
260                        serde_json::Value::String(s) => ColumnValue::keyword(s.clone())?,
261                        serde_json::Value::Null => ColumnValue::Null,
262                        other => ColumnValue::keyword(other.to_string())?,
263                    },
264                    FieldType::Integer | FieldType::Long => match value {
265                        serde_json::Value::Number(n) => ColumnValue::I64(n.as_i64().unwrap_or(0)),
266                        _ => ColumnValue::Null,
267                    },
268                    FieldType::Float | FieldType::Double => match value {
269                        serde_json::Value::Number(n) => ColumnValue::F64(n.as_f64().unwrap_or(0.0)),
270                        _ => ColumnValue::Null,
271                    },
272                    FieldType::Boolean => match value {
273                        serde_json::Value::Bool(b) => ColumnValue::Bool(*b),
274                        _ => ColumnValue::Null,
275                    },
276                    FieldType::TokenCount => {
277                        let text = value.as_str().unwrap_or_default();
278                        let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
279                        let analyzer = self.analyzers.get(analyzer_name);
280                        ColumnValue::I64(analyzer.analyze(text).len() as i64)
281                    }
282                    FieldType::Ip => {
283                        let text = value.as_str().unwrap_or_default();
284                        match crate::ip::ip_to_i64(text) {
285                            Some(v) => ColumnValue::I64(v),
286                            None => ColumnValue::Null,
287                        }
288                    }
289                    _ => ColumnValue::Null, // Text fields don't get doc_values
290                };
291                column_values.push((field_id, col_val));
292            }
293
294            // Collect copy_to pairs for post-loop processing
295            if !mapping.copy_to.is_empty() {
296                let source_text = match value {
297                    serde_json::Value::String(s) => s.clone(),
298                    other => other.to_string(),
299                };
300                for target in &mapping.copy_to {
301                    copy_to_pairs.push((target.clone(), source_text.clone()));
302                }
303            }
304        }
305
306        // Process copy_to: analyze source text with target field's analyzer.
307        // See [[feature-mapping-copy-to]]. Targets are validated at index
308        // creation time via `Mapping::validate`, so an unknown target here
309        // is an upstream invariant violation, not user input — panic loudly
310        // rather than silently dropping the copy. See [[code-must-not-lie]].
311        for (target_name, source_text) in &copy_to_pairs {
312            let target_id = self.schema.field_id(target_name).unwrap_or_else(|| {
313                panic!(
314                    "copy_to target \"{target_name}\" missing from schema; \
315                     Mapping::validate should have rejected this at index \
316                     creation. This is an internal wiring bug, not user input."
317                );
318            });
319            let target_mapping = self.schema.field(target_id);
320            if !target_mapping.indexed {
321                continue;
322            }
323
324            let tokens = match &target_mapping.field_type {
325                FieldType::Text => {
326                    let analyzer_name = target_mapping.analyzer.as_deref().unwrap_or("standard");
327                    let analyzer = self.analyzers.get(analyzer_name);
328                    analyzer.analyze(source_text)
329                }
330                FieldType::Keyword => {
331                    vec![Token::new(source_text.clone(), 0, 0, 0)]
332                }
333                _ => continue,
334            };
335            if !tokens.is_empty() {
336                analyzed_fields.push((target_id, tokens));
337            }
338        }
339
340        // Process multi-field sub-fields: route parent's source value
341        // through the sub-field's analysis chain.
342        // See [[feature-mapping-multi-fields]].
343        for mapping in self.schema.fields() {
344            if let Some(ref parent_name) = mapping.parent_field {
345                // Get the parent's value from the document
346                let parent_value = match obj.get(parent_name) {
347                    Some(v) => v,
348                    None => continue,
349                };
350                let field_id = match self.schema.field_id(&mapping.name) {
351                    Some(id) => id,
352                    None => continue,
353                };
354
355                if mapping.indexed {
356                    let tokens = match &mapping.field_type {
357                        FieldType::Text => {
358                            let text = parent_value.as_str().unwrap_or_default();
359                            let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
360                            let analyzer = self.analyzers.get(analyzer_name);
361                            analyzer.analyze(text)
362                        }
363                        FieldType::Keyword => {
364                            let text = match parent_value {
365                                serde_json::Value::String(s) => s.clone(),
366                                other => other.to_string(),
367                            };
368                            vec![Token::new(text, 0, 0, 0)]
369                        }
370                        _ => continue,
371                    };
372                    analyzed_fields.push((field_id, tokens));
373                }
374
375                if mapping.doc_values {
376                    let col_val = match &mapping.field_type {
377                        FieldType::Keyword => {
378                            let text = match parent_value {
379                                serde_json::Value::String(s) => s.clone(),
380                                other => other.to_string(),
381                            };
382                            ColumnValue::keyword(text)?
383                        }
384                        _ => continue,
385                    };
386                    column_values.push((field_id, col_val));
387                }
388            }
389        }
390
391        self.buffer_size += source.len();
392
393        // Mark this as a parent doc if we have nested fields
394        let has_nested = self
395            .schema
396            .fields()
397            .iter()
398            .any(|f| matches!(f.field_type, FieldType::Nested));
399
400        // Snapshot the segment+doc id this document will land at
401        // *before* `add_document` increments `doc_count`. The global
402        // HNSW resolver uses this pair to route kNN hits back to the
403        // owning segment.
404        let segment_id = self.builder.segment_id();
405        let local_doc_id = self.builder.doc_count();
406
407        self.builder.add_document(&analyzed_fields, &source);
408
409        if has_nested {
410            self.builder.mark_parent();
411        }
412
413        // Add column values for doc_values fields
414        for (field_id, col_val) in column_values {
415            self.builder.add_column_value(field_id, col_val);
416        }
417
418        // Vectors flow only to the index-wide global HNSW per
419        // [[global-vector-indices]] Alternative B. `store_vector` defers
420        // graph linkage to the commit-time `connect_pending` (parallel),
421        // but still normalizes and rejects zero/non-finite cosine inputs
422        // up front — propagate so the bulk caller aborts the batch instead
423        // of silently embedding a degenerate vector. See
424        // [[optimize-cosine-norm-precompute]] §"Zero-vector policy" and
425        // [[optimization-concurrent-hnsw-insert]] §Write model.
426        for (field_id, vec) in vector_fields {
427            self.global_hnsw
428                .store_vector(field_id, segment_id, local_doc_id, vec)?;
429        }
430
431        // Add geo points
432        for (field_id, point) in geo_points {
433            self.builder.add_geo_point(field_id, point);
434        }
435
436        // Add geo shapes
437        for (field_id, geom) in &geo_shapes {
438            self.builder.add_geo_shape(*field_id, geom);
439        }
440
441        // Index nested objects as hidden documents
442        for mapping in self.schema.fields() {
443            if !matches!(mapping.field_type, FieldType::Nested) {
444                continue;
445            }
446            let field_name = &mapping.name;
447            if let Some(serde_json::Value::Array(nested_arr)) = obj.get(field_name) {
448                for nested_obj in nested_arr {
449                    if let Some(nested_map) = nested_obj.as_object() {
450                        // Index each nested object as a hidden document
451                        let mut nested_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
452                        for (nested_key, nested_val) in nested_map {
453                            // Path-prefixed field name: "offers.seller"
454                            let prefixed = format!("{field_name}.{nested_key}");
455                            if let Some(fid) = self.schema.field_id(&prefixed) {
456                                let m = self.schema.field(fid);
457                                let tokens = match &m.field_type {
458                                    FieldType::Text => {
459                                        let text = nested_val.as_str().unwrap_or_default();
460                                        let analyzer = self
461                                            .analyzers
462                                            .get(m.analyzer.as_deref().unwrap_or("standard"));
463                                        analyzer.analyze(text)
464                                    }
465                                    FieldType::Keyword => {
466                                        let text = match nested_val {
467                                            serde_json::Value::String(s) => s.clone(),
468                                            other => other.to_string(),
469                                        };
470                                        vec![Token::new(text, 0, 0, 0)]
471                                    }
472                                    _ => continue,
473                                };
474                                if !tokens.is_empty() {
475                                    nested_fields.push((fid, tokens));
476                                }
477                            }
478                        }
479                        // Add as hidden doc (empty source)
480                        self.builder.add_document(&nested_fields, b"{}");
481                        self.builder.mark_nested();
482                    }
483                }
484            }
485        }
486
487        // Auto-flush if memory budget exceeded
488        if self.buffer_size >= self.memory_budget {
489            self.flush()?;
490        }
491
492        Ok(())
493    }
494
495    /// Flush the current buffer to a segment (without committing).
496    fn flush(&mut self) -> Result<()> {
497        if self.builder.is_empty() {
498            return Ok(());
499        }
500
501        let segment_id = SegmentId::new(self.next_segment_id);
502        self.next_segment_id += 1;
503
504        // Take the current builder and replace with a new one
505        let builder = std::mem::replace(
506            &mut self.builder,
507            SegmentBuilder::new(SegmentId::new(self.next_segment_id), &self.schema),
508        );
509
510        let segment_data = builder.build();
511        self.storage.write_segment(segment_id, &segment_data)?;
512        self.buffer_size = 0;
513
514        Ok(())
515    }
516
517    /// Flush the buffer and commit all pending segments to storage.
518    ///
519    /// Persists the current field mappings alongside segment metadata.
520    /// After commit returns, all documents added via `add()` are searchable.
521    /// If the merge policy triggers, a synchronous merge is executed before
522    /// returning.
523    pub fn commit(&mut self) -> Result<()> {
524        self.flush()?;
525        // Persist mapping (and analysis settings) as user metadata
526        let mut mapping_json = self.schema.to_json();
527        if let Some(ref analysis) = self.analysis_json {
528            if let Some(obj) = mapping_json.as_object_mut() {
529                let mut settings = serde_json::Map::new();
530                settings.insert("analysis".to_string(), analysis.clone());
531                obj.insert("settings".to_string(), serde_json::Value::Object(settings));
532            }
533        }
534        let mapping_bytes = serde_json::to_vec(&mapping_json).map_err(|e| {
535            LuciError::Io(std::io::Error::new(
536                std::io::ErrorKind::Other,
537                format!("failed to serialize mapping: {e}"),
538            ))
539        })?;
540
541        // user_metadata stays small: mapping JSON + deletion bitmap,
542        // length-prefixed. The global HNSW does NOT live here — it's
543        // stored as a per-field vector index via the storage layer's
544        // `write_vector_index` API. See [[global-vector-indices]].
545        let deletion_bytes = self.pending_deletions.to_bytes();
546        let mut metadata = Vec::with_capacity(4 + mapping_bytes.len() + 4 + deletion_bytes.len());
547        metadata.extend_from_slice(&(mapping_bytes.len() as u32).to_le_bytes());
548        metadata.extend_from_slice(&mapping_bytes);
549        metadata.extend_from_slice(&(deletion_bytes.len() as u32).to_le_bytes());
550        metadata.extend_from_slice(&deletion_bytes);
551        self.storage.set_user_metadata(metadata);
552
553        // Link every pending vector tail BEFORE persisting. This is the
554        // load-bearing ordering invariant: `connect_pending` runs here,
555        // ahead of both `field_to_bytes` below and the `maybe_merge`
556        // re-persist further down, so no write-side persist can observe an
557        // unlinked tail. A disconnected graph (every neighbor list empty)
558        // would silently drop every kNN hit past the entry point. See
559        // [[optimization-concurrent-hnsw-insert]] §Write model.
560        for field_id in self.global_hnsw.non_empty_field_ids() {
561            self.global_hnsw
562                .connect_pending(field_id, self.build_threads);
563        }
564
565        // Persist each non-empty vector index as its own extent.
566        // `write_vector_index` replaces the prior committed bytes for
567        // the same FieldId, freeing the old extent during commit.
568        for field_id in self.global_hnsw.non_empty_field_ids() {
569            if let Some(bytes) = self.global_hnsw.field_to_bytes(field_id) {
570                self.storage.write_vector_index(field_id, &bytes)?;
571            }
572        }
573
574        self.storage.commit()?;
575        self.maybe_merge()?;
576
577        Ok(())
578    }
579
580    /// Check the merge policy and execute a merge if needed.
581    fn maybe_merge(&mut self) -> Result<()> {
582        use crate::merge_policy::{SegmentInfo, find_merge};
583
584        let infos: Vec<SegmentInfo> = self
585            .storage
586            .segments()
587            .iter()
588            .map(|e| {
589                SegmentInfo {
590                    segment_id: e.segment_id,
591                    size_bytes: e.data_len,
592                    doc_count: 0, // TODO: store doc_count in SegmentEntry
593                    deletion_count: 0,
594                }
595            })
596            .collect();
597
598        let candidate = match find_merge(&self.merge_policy, &infos) {
599            Some(c) => c,
600            None => return Ok(()),
601        };
602
603        self.execute_merge(&candidate.segment_ids)
604    }
605
606    /// Execute a merge: read source segments, merge, write result, replace.
607    fn execute_merge(&mut self, source_ids: &[SegmentId]) -> Result<()> {
608        use crate::deletion::DeletionMap;
609        use crate::segment::reader::SegmentReader;
610
611        // Open source segment readers
612        let mut readers = Vec::new();
613        let mut segment_data = Vec::new();
614        for &seg_id in source_ids {
615            let data = self.storage.read_segment(seg_id)?;
616            segment_data.push(data);
617        }
618        for data in &segment_data {
619            readers.push(SegmentReader::open(data.clone())?);
620        }
621        let reader_refs: Vec<&SegmentReader> = readers.iter().collect();
622
623        // Merge into a new segment
624        let new_id = SegmentId::new(self.next_segment_id);
625        self.next_segment_id += 1;
626
627        let deletions = DeletionMap::new();
628        let merge_output = crate::merge::merge_segments(
629            new_id,
630            &reader_refs,
631            &deletions,
632            &self.schema,
633            &self.analyzers,
634        )?;
635
636        // Write the merged segment
637        self.storage.write_segment(new_id, &merge_output.bytes)?;
638
639        // Remove source segments
640        self.storage.remove_segments(source_ids);
641
642        // Rewrite the global HNSW resolver so existing vector ordinals
643        // point at the merged segment with the new local doc ids.
644        // Without this step the resolver would carry dangling entries
645        // for the merged-away segment ids.
646        self.global_hnsw
647            .rewrite_after_merge(&merge_output.ord_remap);
648
649        // Re-persist every vector index with the rewritten resolver.
650        // `commit()` persisted the vector index *before* calling
651        // `maybe_merge`, so the on-disk copy still routes the merged
652        // docs to the now-removed source segments. The reader loads the
653        // persisted copy (not the writer's in-memory resolver), so
654        // without this re-persist it carries dangling entries and
655        // silently drops every kNN hit that resolves to a merged-away
656        // segment — a [[code-must-not-lie]] silent-drop that cost ~0.09
657        // recall at 600k once the segment count crossed the merge
658        // threshold. See [[vector-recall-investigation-audit]] H6.
659        for field_id in self.global_hnsw.non_empty_field_ids() {
660            if let Some(bytes) = self.global_hnsw.field_to_bytes(field_id) {
661                self.storage.write_vector_index(field_id, &bytes)?;
662            }
663        }
664
665        // Commit the replacement
666        self.storage.commit()?;
667
668        Ok(())
669    }
670
671    /// Force-merge all segments down to at most `max_segments`.
672    ///
673    /// Repeatedly merges until the segment count is at or below the target.
674    /// This is expensive and should only be called after bulk indexing is
675    /// complete, not during normal operation.
676    pub fn force_merge(&mut self, max_segments: usize) -> Result<()> {
677        loop {
678            let segments = self.storage.segments();
679            if segments.len() <= max_segments {
680                break;
681            }
682
683            // Merge all segments into one batch (up to max_merge_at_once)
684            let ids: Vec<SegmentId> = segments
685                .iter()
686                .take(self.merge_policy.max_merge_at_once)
687                .map(|e| e.segment_id)
688                .collect();
689
690            if ids.len() < 2 {
691                break;
692            }
693
694            self.execute_merge(&ids)?;
695        }
696        Ok(())
697    }
698
699    /// Number of documents in the current (unflushed) buffer.
700    pub fn buffered_doc_count(&self) -> u32 {
701        self.builder.doc_count()
702    }
703
704    /// Discard the in-memory segment buffer without flushing to storage.
705    ///
706    /// Used for transaction rollback. Resets the builder to an empty state.
707    pub fn discard_buffer(&mut self) {
708        let seg_id = SegmentId::new(self.next_segment_id);
709        self.builder = SegmentBuilder::new(seg_id, &self.schema);
710        self.buffer_size = 0;
711    }
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use crate::mapping::FieldType;
718    use crate::storage::SingleFileDirectory;
719    use std::path::PathBuf;
720
721    fn test_dir(name: &str) -> PathBuf {
722        let dir =
723            std::env::temp_dir().join(format!("luci_writer_test_{}_{name}", std::process::id()));
724        let _ = std::fs::remove_dir_all(&dir);
725        dir
726    }
727
728    fn cleanup(path: &std::path::Path) {
729        let _ = std::fs::remove_dir_all(path);
730    }
731
732    fn basic_setup(name: &str) -> (PathBuf, IndexWriter) {
733        let path = test_dir(name);
734        let storage = SingleFileDirectory::create(&path).unwrap();
735        let schema = Mapping::builder()
736            .field("title", FieldType::Text)
737            .field("status", FieldType::Keyword)
738            .build();
739        let analyzers = AnalyzerRegistry::new();
740        let writer = IndexWriter::new(storage, schema, analyzers);
741        (path, writer)
742    }
743
744    #[test]
745    fn put_single_doc_and_commit() {
746        let (path, mut writer) = basic_setup("single");
747        writer
748            .add(serde_json::json!({
749                "title": "hello world",
750                "status": "active"
751            }))
752            .unwrap();
753        writer.commit().unwrap();
754
755        // Verify segment was written
756        let storage = SingleFileDirectory::open(&path).unwrap();
757        assert_eq!(storage.segments().len(), 1);
758
759        cleanup(&path);
760    }
761
762    #[test]
763    fn put_multiple_docs_and_commit() {
764        let (path, mut writer) = basic_setup("multi");
765        for i in 0..10 {
766            writer
767                .add(serde_json::json!({
768                    "title": format!("document {i}"),
769                    "status": "published"
770                }))
771                .unwrap();
772        }
773        writer.commit().unwrap();
774
775        let storage = SingleFileDirectory::open(&path).unwrap();
776        assert_eq!(storage.segments().len(), 1);
777
778        cleanup(&path);
779    }
780
781    #[test]
782    fn text_fields_analyzed() {
783        let (path, mut writer) = basic_setup("analyzed");
784        writer
785            .add(serde_json::json!({
786                "title": "The Quick Brown Fox",
787                "status": "active"
788            }))
789            .unwrap();
790        writer.commit().unwrap();
791
792        // Read the segment and verify lowercase terms are in the index
793        let storage = SingleFileDirectory::open(&path).unwrap();
794        let seg_id = storage.segments()[0].segment_id;
795        let data = storage.read_segment(seg_id).unwrap();
796
797        use crate::segment::reader::SegmentReader;
798        let reader = SegmentReader::open(data).unwrap();
799
800        // Standard analyzer should lowercase
801        assert!(reader.postings(FieldId::new(0), "the").is_some());
802        assert!(reader.postings(FieldId::new(0), "quick").is_some());
803        assert!(reader.postings(FieldId::new(0), "brown").is_some());
804        assert!(reader.postings(FieldId::new(0), "fox").is_some());
805
806        // Original casing should not be found
807        assert!(reader.postings(FieldId::new(0), "The").is_none());
808        assert!(reader.postings(FieldId::new(0), "Quick").is_none());
809
810        cleanup(&path);
811    }
812
813    #[test]
814    fn keyword_fields_exact() {
815        let (path, mut writer) = basic_setup("keyword");
816        writer
817            .add(serde_json::json!({
818                "title": "test",
819                "status": "Active"
820            }))
821            .unwrap();
822        writer.commit().unwrap();
823
824        let storage = SingleFileDirectory::open(&path).unwrap();
825        let data = storage
826            .read_segment(storage.segments()[0].segment_id)
827            .unwrap();
828
829        use crate::segment::reader::SegmentReader;
830        let reader = SegmentReader::open(data).unwrap();
831
832        // Keyword field should preserve case
833        assert!(reader.postings(FieldId::new(1), "Active").is_some());
834        assert!(reader.postings(FieldId::new(1), "active").is_none());
835
836        cleanup(&path);
837    }
838
839    #[test]
840    fn commit_with_no_docs_is_noop() {
841        let (path, mut writer) = basic_setup("empty_commit");
842        writer.commit().unwrap();
843
844        let storage = SingleFileDirectory::open(&path).unwrap();
845        assert!(storage.segments().is_empty());
846
847        cleanup(&path);
848    }
849
850    #[test]
851    fn auto_flush_on_memory_budget() {
852        let (path, mut writer) = basic_setup("autoflush");
853        writer.set_memory_budget(100); // Very small budget
854
855        for i in 0..5 {
856            writer
857                .add(serde_json::json!({
858                    "title": format!("document number {i} with some extra text to exceed the budget"),
859                    "status": "active"
860                }))
861                .unwrap();
862        }
863        writer.commit().unwrap();
864
865        // Multiple segments should have been created due to auto-flush
866        let storage = SingleFileDirectory::open(&path).unwrap();
867        assert!(
868            storage.segments().len() > 1,
869            "expected multiple segments from auto-flush, got {}",
870            storage.segments().len()
871        );
872
873        cleanup(&path);
874    }
875
876    #[test]
877    fn dynamic_false_ignores_unknown() {
878        let path = test_dir("dynamic_false");
879        let storage = SingleFileDirectory::create(&path).unwrap();
880        let schema = Mapping::builder()
881            .field("title", FieldType::Text)
882            .dynamic(DynamicMode::False)
883            .build();
884        let analyzers = AnalyzerRegistry::new();
885        let mut writer = IndexWriter::new(storage, schema, analyzers);
886
887        // Should succeed — unknown field is silently ignored
888        writer
889            .add(serde_json::json!({
890                "title": "hello",
891                "unknown_field": "value"
892            }))
893            .unwrap();
894        writer.commit().unwrap();
895
896        cleanup(&path);
897    }
898
899    #[test]
900    fn multiple_commits() {
901        let (path, mut writer) = basic_setup("multi_commit");
902        writer
903            .add(serde_json::json!({"title": "first", "status": "a"}))
904            .unwrap();
905        writer.commit().unwrap();
906
907        writer
908            .add(serde_json::json!({"title": "second", "status": "b"}))
909            .unwrap();
910        writer.commit().unwrap();
911
912        let storage = SingleFileDirectory::open(&path).unwrap();
913        assert_eq!(storage.segments().len(), 2);
914
915        cleanup(&path);
916    }
917
918    #[test]
919    fn source_stored_correctly() {
920        let (path, mut writer) = basic_setup("source");
921        let doc = serde_json::json!({"title": "hello world", "status": "active"});
922        writer.add(doc.clone()).unwrap();
923        writer.commit().unwrap();
924
925        let storage = SingleFileDirectory::open(&path).unwrap();
926        let data = storage
927            .read_segment(storage.segments()[0].segment_id)
928            .unwrap();
929
930        use crate::segment::reader::SegmentReader;
931        let reader = SegmentReader::open(data).unwrap();
932        let source = reader.doc_store().get(0).unwrap();
933        let stored: serde_json::Value = serde_json::from_slice(&source).unwrap();
934        assert_eq!(stored, doc);
935
936        cleanup(&path);
937    }
938}