hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses FxHashMap (hashbrown-based, faster than BTreeMap)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
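///
/// # Example
///
/// A minimal usage sketch; construction of the `Schema`, `IndexConfig`, and
/// `Document` values is assumed to happen elsewhere in the crate:
///
/// ```ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
/// writer.add_document(doc)?;   // non-blocking enqueue to a worker
/// writer.commit().await?;      // flush builders and register segments
/// ```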
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs and doc counts
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
    /// Limits concurrent segment builds to prevent OOM from unbounded build parallelism.
    /// Workers block at acquire, providing natural backpressure.
    build_semaphore: Arc<tokio::sync::Semaphore>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        // With N workers, allow at most ceil(N/2) concurrent builds.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
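        // e.g. 8 workers -> 4 build permits; 1 worker -> 1 permit.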
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        // Create per-worker channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel.
    /// The returned `DocId` is a placeholder (always 0); final doc IDs are
    /// assigned when a worker builds its segment.
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
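    ///
    /// A minimal sketch (document construction assumed):
    ///
    /// ```ignore
    /// let n = docs.len();
    /// assert_eq!(writer.add_documents(docs)?, n); // returns the count queued
    /// ```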
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut _doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    // Periodically recalibrate memory estimate using capacity-based
                    // calculation. The incremental tracker undercounts by ~33% because
                    // Vec::push doubles capacity but we only track element sizes.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Check memory after every document - O(1) with incremental tracking.
                    // Always reserve 2x headroom for build-phase memory amplification:
                    // when commit flushes all workers simultaneously, their builders stay
                    // in memory during serialization, effectively doubling peak usage.
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
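                    // e.g. a 512 MiB budget with 4 workers and 1 in-flight build:
                    // effective_slots = 4*2 + 1*2 = 10 -> ~51 MiB per builder.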
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    _doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }

    /// Spawn a background task that builds a full `SegmentBuilder` into an
    /// on-disk segment, reporting the result on the segment-ID channel.
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // Acquire semaphore permit before spawning - blocks if too many builds in flight.
        // This provides backpressure: workers pause indexing until a build slot opens.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let _permit = permit; // held for build duration, released on drop
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Signal failure with num_docs=0 so waiters don't block
                    (segment_hex, 0)
                }
            };
            // Always send to channel and decrement - even on failure
            // This ensures flush()/commit() doesn't hang waiting for messages
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers
    /// This allows readers to acquire snapshots that prevent segment deletion
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading
    /// The snapshot holds references - segments won't be deleted while snapshot exists
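    ///
    /// A sketch of the lifetime contract:
    ///
    /// ```ignore
    /// let snapshot = writer.acquire_snapshot().await;
    /// // ... read from the snapshot's segments ...
    /// drop(snapshot); // segments become eligible for deletion again
    /// ```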
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
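    ///
    /// A sketch of the recovery flow after a crash:
    ///
    /// ```ignore
    /// let writer = IndexWriter::open(directory, config).await?;
    /// let removed = writer.cleanup_orphan_segments().await?;
    /// ```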
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then waits for ALL pending background builds to complete.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
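    ///
    /// A sketch of the flush/commit split:
    ///
    /// ```ignore
    /// writer.flush().await?;  // segments on disk, not yet visible to readers
    /// writer.commit().await?; // registers flushed segments in metadata
    /// ```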
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Wait for ALL pending builds to complete and collect results
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break; // Channel closed
            }
        }

        // Drain any remaining messages (builds that completed between checks)
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }

    /// Commit all pending segments to metadata and wait for completion
    ///
    /// Calls `flush()` to serialize all in-memory data to disk, then
    /// registers flushed segments in metadata. This provides transactional
    /// semantics: on crash before commit, orphan files are cleaned up by
    /// `cleanup_orphan_segments()`.
    ///
    /// **Auto-triggers vector index build** when the threshold is crossed for any field.
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers and wait for builds to complete
        self.flush().await?;

        // Register all flushed segments in metadata
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Calculate total doc count for the merged segment
        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await
            .map(|_| ())?;

        // Atomically update segments and delete old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}