Skip to main content

hermes_core/index/
writer.rs

1//! IndexWriter - async document indexing with parallel segment building
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8use rustc_hash::FxHashMap;
9use tokio::sync::Mutex as AsyncMutex;
10use tokio::sync::{mpsc, oneshot};
11use tokio::task::JoinHandle;
12
13use crate::DocId;
14use crate::directories::DirectoryWriter;
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{
18    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
19};
20use crate::tokenizer::BoxedTokenizer;
21
22use super::IndexConfig;
23
24/// Message sent to worker tasks
/// Message sent to worker tasks over their per-worker channels.
enum WorkerMessage {
    /// A document to be indexed by the receiving worker.
    Document(Document),
    /// Signal to flush the current builder; the worker acknowledges on the
    /// oneshot sender once the flush has been handed off.
    Flush(oneshot::Sender<()>),
}
31
/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    /// Directory segments are written to; shared with workers and readers.
    pub(super) directory: Arc<D>,
    /// Index schema; shared with workers and the segment manager.
    pub(super) schema: Arc<Schema>,
    /// Writer configuration (thread count, memory budget, merge policy, ...).
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers registered via `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs and doc counts
    /// (`num_docs == 0` signals a failed build; see `spawn_segment_build`).
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Segments flushed to disk but not yet registered in metadata;
    /// drained by `commit()`.
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}
74
/// Shared state for worker tasks
///
/// One instance is shared (via `Arc`) by all worker loops and the background
/// build tasks they spawn.
struct WorkerState<D: DirectoryWriter + 'static> {
    /// Directory to build segments into.
    directory: Arc<D>,
    /// Index schema used when creating new `SegmentBuilder`s.
    schema: Arc<Schema>,
    /// Writer configuration (memory budget and thread count are read per-doc).
    config: IndexConfig,
    /// Template configuration for each new `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers copied into each new builder.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Reports `(segment_hex, num_docs)` for every finished build attempt.
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    /// Count of builds currently in flight (incremented before spawn).
    pending_builds: Arc<AtomicUsize>,
    /// Limits concurrent segment builds to prevent OOM from unbounded build parallelism.
    /// Workers block at acquire, providing natural backpressure.
    build_semaphore: Arc<tokio::sync::Semaphore>,
}
88
89impl<D: DirectoryWriter + 'static> IndexWriter<D> {
90    /// Create a new index in the directory
91    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
92        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
93    }
94
95    /// Create a new index with custom builder config
96    pub async fn create_with_config(
97        directory: D,
98        schema: Schema,
99        config: IndexConfig,
100        builder_config: SegmentBuilderConfig,
101    ) -> Result<Self> {
102        let directory = Arc::new(directory);
103        let schema = Arc::new(schema);
104
105        // Create channel for background builds to report completed segment IDs
106        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
107
108        // Initialize metadata with schema
109        let metadata = super::IndexMetadata::new((*schema).clone());
110
111        // Create segment manager - owns metadata.json
112        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
113            Arc::clone(&directory),
114            Arc::clone(&schema),
115            metadata,
116            config.merge_policy.clone_box(),
117            config.term_cache_blocks,
118        ));
119
120        // Save initial metadata
121        segment_manager.update_metadata(|_| {}).await?;
122
123        let pending_builds = Arc::new(AtomicUsize::new(0));
124
125        // Limit concurrent segment builds to prevent OOM.
126        // With N workers, allow at most ceil(N/2) concurrent builds.
127        let num_workers = config.num_indexing_threads.max(1);
128        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
129        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));
130
131        // Create shared worker state
132        let worker_state = Arc::new(WorkerState {
133            directory: Arc::clone(&directory),
134            schema: Arc::clone(&schema),
135            config: config.clone(),
136            builder_config: builder_config.clone(),
137            tokenizers: FxHashMap::default(),
138            segment_id_sender,
139            pending_builds: Arc::clone(&pending_builds),
140            build_semaphore,
141        });
142
143        // Create per-worker unbounded channels and spawn workers
144        let mut worker_senders = Vec::with_capacity(num_workers);
145        let mut workers = Vec::with_capacity(num_workers);
146
147        for _ in 0..num_workers {
148            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
149            worker_senders.push(tx);
150
151            let state = Arc::clone(&worker_state);
152            let handle = tokio::spawn(async move {
153                Self::worker_loop(state, rx).await;
154            });
155            workers.push(handle);
156        }
157
158        Ok(Self {
159            directory,
160            schema,
161            config,
162            builder_config,
163            tokenizers: FxHashMap::default(),
164            worker_senders,
165            next_worker: AtomicUsize::new(0),
166            workers,
167            worker_state,
168            segment_manager,
169            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
170            pending_builds,
171            flushed_segments: AsyncMutex::new(Vec::new()),
172        })
173    }
174
175    /// Open an existing index for writing
176    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
177        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
178    }
179
180    /// Open an existing index with custom builder config
181    pub async fn open_with_config(
182        directory: D,
183        config: IndexConfig,
184        builder_config: SegmentBuilderConfig,
185    ) -> Result<Self> {
186        let directory = Arc::new(directory);
187
188        // Load unified metadata (includes schema)
189        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
190        let schema = Arc::new(metadata.schema.clone());
191
192        // Create channel for background builds to report completed segment IDs
193        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
194
195        // Create segment manager - owns metadata.json
196        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
197            Arc::clone(&directory),
198            Arc::clone(&schema),
199            metadata,
200            config.merge_policy.clone_box(),
201            config.term_cache_blocks,
202        ));
203
204        let pending_builds = Arc::new(AtomicUsize::new(0));
205
206        // Limit concurrent segment builds to prevent OOM.
207        let num_workers = config.num_indexing_threads.max(1);
208        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
209        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));
210
211        // Create shared worker state
212        let worker_state = Arc::new(WorkerState {
213            directory: Arc::clone(&directory),
214            schema: Arc::clone(&schema),
215            config: config.clone(),
216            builder_config: builder_config.clone(),
217            tokenizers: FxHashMap::default(),
218            segment_id_sender,
219            pending_builds: Arc::clone(&pending_builds),
220            build_semaphore,
221        });
222
223        // Create per-worker unbounded channels and spawn workers
224        let mut worker_senders = Vec::with_capacity(num_workers);
225        let mut workers = Vec::with_capacity(num_workers);
226
227        for _ in 0..num_workers {
228            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
229            worker_senders.push(tx);
230
231            let state = Arc::clone(&worker_state);
232            let handle = tokio::spawn(async move {
233                Self::worker_loop(state, rx).await;
234            });
235            workers.push(handle);
236        }
237
238        Ok(Self {
239            directory,
240            schema,
241            config,
242            builder_config,
243            tokenizers: FxHashMap::default(),
244            worker_senders,
245            next_worker: AtomicUsize::new(0),
246            workers,
247            worker_state,
248            segment_manager,
249            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
250            pending_builds,
251            flushed_segments: AsyncMutex::new(Vec::new()),
252        })
253    }
254
255    /// Create an IndexWriter from an existing Index
256    ///
257    /// This shares the SegmentManager with the Index, ensuring consistent
258    /// segment lifecycle management.
259    pub fn from_index(index: &super::Index<D>) -> Self {
260        let segment_manager = Arc::clone(&index.segment_manager);
261        let directory = Arc::clone(&index.directory);
262        let schema = Arc::clone(&index.schema);
263        let config = index.config.clone();
264        let builder_config = crate::segment::SegmentBuilderConfig::default();
265
266        // Create channel for background builds
267        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();
268
269        let pending_builds = Arc::new(AtomicUsize::new(0));
270
271        // Limit concurrent segment builds to prevent OOM.
272        let num_workers = config.num_indexing_threads.max(1);
273        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
274        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));
275
276        // Create shared worker state
277        let worker_state = Arc::new(WorkerState {
278            directory: Arc::clone(&directory),
279            schema: Arc::clone(&schema),
280            config: config.clone(),
281            builder_config: builder_config.clone(),
282            tokenizers: FxHashMap::default(),
283            segment_id_sender,
284            pending_builds: Arc::clone(&pending_builds),
285            build_semaphore,
286        });
287
288        // Create per-worker channels and spawn workers
289        let mut worker_senders = Vec::with_capacity(num_workers);
290        let mut workers = Vec::with_capacity(num_workers);
291
292        for _ in 0..num_workers {
293            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
294            worker_senders.push(tx);
295
296            let state = Arc::clone(&worker_state);
297            let handle = tokio::spawn(async move {
298                Self::worker_loop(state, rx).await;
299            });
300            workers.push(handle);
301        }
302
303        Self {
304            directory,
305            schema,
306            config,
307            builder_config,
308            tokenizers: FxHashMap::default(),
309            worker_senders,
310            next_worker: AtomicUsize::new(0),
311            workers,
312            worker_state,
313            segment_manager,
314            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
315            pending_builds,
316            flushed_segments: AsyncMutex::new(Vec::new()),
317        }
318    }
319
320    /// Get the schema
321    pub fn schema(&self) -> &Schema {
322        &self.schema
323    }
324
325    /// Set tokenizer for a field
326    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
327        self.tokenizers.insert(field, Box::new(tokenizer));
328    }
329
330    /// Add a document to the indexing queue
331    ///
332    /// Documents are sent to per-worker unbounded channels.
333    /// This is O(1) and never blocks - returns immediately.
334    /// Workers handle the actual indexing in parallel.
335    pub fn add_document(&self, doc: Document) -> Result<DocId> {
336        // Round-robin select worker
337        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
338        self.worker_senders[idx]
339            .send(WorkerMessage::Document(doc))
340            .map_err(|_| Error::Internal("Document channel closed".into()))?;
341        Ok(0)
342    }
343
344    /// Add multiple documents to the indexing queue
345    ///
346    /// Documents are distributed round-robin to workers.
347    /// Returns immediately - never blocks.
348    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
349        let num_workers = self.worker_senders.len();
350        let count = documents.len();
351        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
352        for (i, doc) in documents.into_iter().enumerate() {
353            let idx = (base + i) % num_workers;
354            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
355        }
356        Ok(count)
357    }
358
    /// Worker loop - polls messages from its own channel and indexes documents
    ///
    /// Owns at most one `SegmentBuilder` at a time, created lazily on the
    /// first document. The builder is handed off to a background build task
    /// when the per-worker memory budget is exceeded, when a `Flush` message
    /// arrives, or when the channel closes (writer dropped).
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        // Builder for the segment currently being accumulated (None until
        // the first document arrives or after a hand-off).
        let mut builder: Option<SegmentBuilder> = None;
        // Docs indexed since the last hand-off; used only for periodic logging.
        let mut _doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                // Copy the shared per-field tokenizers into
                                // the fresh builder.
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // Builder creation failed: drop this document
                                // and keep the worker alive for later ones.
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    // Periodically recalibrate memory estimate using capacity-based
                    // calculation. The incremental tracker undercounts by ~33% because
                    // Vec::push doubles capacity but we only track element sizes.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Check memory after every document - O(1) with incremental tracking.
                    // Shrink per-worker budget when builds are in-flight to account for
                    // build-phase memory amplification (posting duplication, sparse serialization).
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let effective_slots = state.config.num_indexing_threads.max(1) + in_flight * 4;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        // Hand the full builder to a background build; this
                        // awaits a semaphore permit, pausing intake (backpressure).
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    _doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
    /// Acquire a build slot and spawn a background task that builds `builder`
    /// into an on-disk segment.
    ///
    /// Awaits a semaphore permit before spawning, so the calling worker pauses
    /// when too many builds are in flight (backpressure). The build outcome is
    /// always reported on `segment_id_sender` as `(segment_hex, num_docs)`,
    /// with `num_docs == 0` on failure, and `pending_builds` is incremented
    /// before the spawn and decremented when the task finishes - `flush()`
    /// relies on both to avoid hanging.
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // Acquire semaphore permit before spawning - blocks if too many builds in flight.
        // This provides backpressure: workers pause indexing until a build slot opens.
        // The semaphore is never closed, so acquire_owned() cannot fail here.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        // Incremented before the spawn so flush() never observes zero while
        // a build it should wait for is starting up.
        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let _permit = permit; // held for build duration, released on drop
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Signal failure with num_docs=0 so waiters don't block
                    (segment_hex, 0)
                }
            };
            // Always send to channel and decrement - even on failure
            // This ensures flush()/commit() doesn't hang waiting for messages
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }
549
550    /// Get the number of pending background builds
551    pub fn pending_build_count(&self) -> usize {
552        self.pending_builds.load(Ordering::SeqCst)
553    }
554
555    /// Get the number of pending background merges
556    pub fn pending_merge_count(&self) -> usize {
557        self.segment_manager.pending_merge_count()
558    }
559
560    /// Check merge policy and spawn background merges if needed
561    ///
562    /// This is called automatically after segment builds complete via SegmentManager.
563    /// Can also be called manually to trigger merge checking.
564    pub async fn maybe_merge(&self) {
565        self.segment_manager.maybe_merge().await;
566    }
567
568    /// Wait for all pending merges to complete
569    pub async fn wait_for_merges(&self) {
570        self.segment_manager.wait_for_merges().await;
571    }
572
573    /// Get the segment tracker for sharing with readers
574    /// This allows readers to acquire snapshots that prevent segment deletion
575    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
576        self.segment_manager.tracker()
577    }
578
579    /// Acquire a snapshot of current segments for reading
580    /// The snapshot holds references - segments won't be deleted while snapshot exists
581    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
582        self.segment_manager.acquire_snapshot().await
583    }
584
585    /// Clean up orphan segment files that are not registered
586    ///
587    /// This can happen if the process halts after segment files are written
588    /// but before they are registered in segments.json. Call this after opening
589    /// an index to reclaim disk space from incomplete operations.
590    ///
591    /// Returns the number of orphan segments deleted.
592    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
593        self.segment_manager.cleanup_orphan_segments().await
594    }
595
    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then waits for ALL pending background builds to complete.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Wait for ALL pending builds to complete and collect results.
        // pending_builds is incremented before each build task is spawned and
        // every build sends exactly one message (even on failure), so this
        // loop cannot hang. num_docs == 0 marks a failed build and is skipped.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break; // Channel closed
            }
        }

        // Drain any remaining messages (builds that completed between checks)
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }
649
650    /// Commit all pending segments to metadata and wait for completion
651    ///
652    /// Calls `flush()` to serialize all in-memory data to disk, then
653    /// registers flushed segments in metadata. This provides transactional
654    /// semantics: on crash before commit, orphan files are cleaned up by
655    /// `cleanup_orphan_segments()`.
656    ///
657    /// **Auto-triggers vector index build** when threshold is crossed for any field.
658    pub async fn commit(&self) -> Result<()> {
659        // Flush all workers and wait for builds to complete
660        self.flush().await?;
661
662        // Register all flushed segments in metadata
663        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
664        for (segment_hex, num_docs) in segments {
665            self.segment_manager
666                .register_segment(segment_hex, num_docs)
667                .await?;
668        }
669
670        // Auto-trigger vector index build if threshold crossed
671        self.maybe_build_vector_index().await?;
672
673        Ok(())
674    }
675
676    // Vector index building methods are in vector_builder.rs
677
678    /// Merge all segments into one (called explicitly via force_merge)
679    async fn do_merge(&self) -> Result<()> {
680        let segment_ids = self.segment_manager.get_segment_ids().await;
681
682        if segment_ids.len() < 2 {
683            return Ok(());
684        }
685
686        let ids_to_merge: Vec<String> = segment_ids;
687
688        // Load segment readers
689        let mut readers = Vec::new();
690        let mut doc_offset = 0u32;
691
692        for id_str in &ids_to_merge {
693            let segment_id = SegmentId::from_hex(id_str)
694                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
695            let reader = SegmentReader::open(
696                self.directory.as_ref(),
697                segment_id,
698                Arc::clone(&self.schema),
699                doc_offset,
700                self.config.term_cache_blocks,
701            )
702            .await?;
703            doc_offset += reader.meta().num_docs;
704            readers.push(reader);
705        }
706
707        // Calculate total doc count for the merged segment
708        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();
709
710        // Merge into new segment
711        let merger = SegmentMerger::new(Arc::clone(&self.schema));
712        let new_segment_id = SegmentId::new();
713        merger
714            .merge(self.directory.as_ref(), &readers, new_segment_id)
715            .await?;
716
717        // Atomically update segments and delete old ones via SegmentManager
718        self.segment_manager
719            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
720            .await?;
721
722        Ok(())
723    }
724
725    /// Force merge all segments into one
726    pub async fn force_merge(&self) -> Result<()> {
727        // First commit all pending documents (waits for completion)
728        self.commit().await?;
729        // Wait for any background merges to complete (avoid race with segment deletion)
730        self.wait_for_merges().await;
731        // Then merge all segments
732        self.do_merge().await
733    }
734
735    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
736    // are implemented in vector_builder.rs
737}