// hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}
31
32/// Async IndexWriter for adding documents and committing segments
33///
34/// Features:
35/// - Queue-based parallel indexing with worker tasks
36/// - Streams documents to disk immediately (no in-memory document storage)
37/// - Uses string interning for terms (reduced allocations)
38/// - Uses hashbrown HashMap (faster than BTreeMap)
39///
40/// **Architecture:**
41/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
42/// - Round-robin distribution across workers - no mutex contention
43/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
44///
45/// **State management:**
46/// - Building segments: Managed here (pending_builds)
47/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
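///
/// A minimal usage sketch; `directory`, `schema`, `config`, and `doc` are
/// assumed to be constructed elsewhere, so this is not a compiled doctest:
///
/// ```ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
/// writer.add_document(doc)?;  // queued to a worker; returns immediately
/// writer.commit().await?;     // flushes workers, registers segments
/// ```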
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs and doc counts
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
    /// Limits concurrent segment builds to prevent OOM from unbounded build parallelism.
    /// Workers block at acquire, providing natural backpressure.
    build_semaphore: Arc<tokio::sync::Semaphore>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        // With N workers, allow at most ceil(N/2) concurrent builds.
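        // e.g. 8 workers -> at most ceil(8/2) = 4 concurrent builds; 1 worker -> 1.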
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel.
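    ///
    /// Note: the returned `DocId` is a placeholder (currently always 0), since
    /// real doc ids are only assigned once a segment is built.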
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut _doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    // Check memory after every document - O(1) with incremental tracking.
                    // Shrink per-worker budget when builds are in-flight to account for
                    // build-phase memory amplification (posting duplication, sparse serialization).
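                    // Illustrative numbers: 4 indexing threads, 1 in-flight build,
                    // 1 GiB budget -> 1 GiB / (4 + 1*2) slots ≈ 170 MiB per worker.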
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let effective_slots = state.config.num_indexing_threads.max(1) + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    _doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
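
    /// Hand a full builder off to a background build task.
    ///
    /// A semaphore permit is acquired before spawning, so callers (the workers)
    /// pause when too many builds are in flight; the permit is released when
    /// the spawned build completes.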
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // Acquire semaphore permit before spawning - blocks if too many builds in flight.
        // This provides backpressure: workers pause indexing until a build slot opens.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let _permit = permit; // held for build duration, released on drop
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Signal failure with num_docs=0 so waiters don't block
                    (segment_hex, 0)
                }
            };
            // Always send to channel and decrement - even on failure
            // This ensures flush()/commit() doesn't hang waiting for messages
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers
    /// This allows readers to acquire snapshots that prevent segment deletion
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading
    /// The snapshot holds references - segments won't be deleted while snapshot exists
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }
    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
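    ///
    /// A recovery sketch (variable names are illustrative):
    ///
    /// ```ignore
    /// let writer = IndexWriter::open(directory, config).await?;
    /// let removed = writer.cleanup_orphan_segments().await?;
    /// log::info!("deleted {} orphan segments", removed);
    /// ```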
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then waits for ALL pending background builds to complete.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
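    ///
    /// Sketch of the flush/commit split:
    ///
    /// ```ignore
    /// writer.flush().await?;   // segment files on disk, not yet registered
    /// writer.commit().await?;  // registers the flushed segments in metadata
    /// ```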
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Wait for ALL pending builds to complete and collect results
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break; // Channel closed
            }
        }

        // Drain any remaining messages (builds that completed between checks)
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }

    /// Commit all pending segments to metadata and wait for completion
    ///
    /// Calls `flush()` to serialize all in-memory data to disk, then
    /// registers flushed segments in metadata. This provides transactional
    /// semantics: on crash before commit, orphan files are cleaned up by
    /// `cleanup_orphan_segments()`.
    ///
    /// **Auto-triggers vector index build** when threshold is crossed for any field.
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers and wait for builds to complete
        self.flush().await?;

        // Register all flushed segments in metadata
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Calculate total doc count for the merged segment
        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Atomically update segments and delete old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}