
hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses `FxHashMap` for term lookups (faster than `BTreeMap`)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
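///
/// # Example
///
/// A minimal usage sketch (not compiled; assumes a `DirectoryWriter`
/// implementation plus a populated `Schema` and `IndexConfig` are in scope):
///
/// ```ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
/// writer.add_document(doc)?;  // non-blocking enqueue
/// writer.commit().await?;     // flush workers + register segments
/// ```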
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs and doc counts
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
    /// Global memory usage across all builders (bytes)
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
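    ///
    /// A sketch with a hypothetical `LowercaseTokenizer` (any
    /// `crate::tokenizer::Tokenizer` implementation works):
    ///
    /// ```ignore
    /// writer.set_tokenizer(body_field, LowercaseTokenizer::default());
    /// ```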
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel.
    /// The returned `DocId` is currently a placeholder (always `0`).
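    ///
    /// A sketch of the typical enqueue loop (`docs` and `writer` assumed in scope):
    ///
    /// ```ignore
    /// for doc in docs {
    ///     writer.add_document(doc)?; // O(1) hand-off to a worker
    /// }
    /// writer.commit().await?;
    /// ```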
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0) // placeholder DocId; see doc comment
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut docs_since_flush = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    docs_since_flush += 1;

                    // Check memory after every document - O(1) with incremental tracking
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
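                    // e.g., a 512 MiB max_indexing_memory_bytes budget across
                    // 8 indexing threads gives each worker ~64 MiB before it flushes.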
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if docs_since_flush.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder);
                        docs_since_flush = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b);
                    }
                    docs_since_flush = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }

    /// Spawn a background task that builds a full builder into an on-disk
    /// segment, reporting the result over `segment_id_sender`
    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Signal failure with num_docs=0 so waiters don't block
                    (segment_hex, 0)
                }
            };
            // Always send to channel and decrement - even on failure
            // This ensures flush()/commit() doesn't hang waiting for messages
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers
    /// This allows readers to acquire snapshots that prevent segment deletion
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading
    /// The snapshot holds references - segments won't be deleted while snapshot exists
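    ///
    /// A sketch of snapshot-guarded reading (`writer` assumed in scope):
    ///
    /// ```ignore
    /// let snapshot = writer.acquire_snapshot().await;
    /// // ... read from the snapshot's segments ...
    /// drop(snapshot); // segments become eligible for deletion again
    /// ```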
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
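    ///
    /// A sketch of crash recovery on startup (assumes `directory` and `config`
    /// are in scope):
    ///
    /// ```ignore
    /// let writer = IndexWriter::open(directory, config).await?;
    /// let deleted = writer.cleanup_orphan_segments().await?;
    /// ```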
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then waits for ALL pending background builds to complete.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
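    ///
    /// A sketch of the flush/commit split (`writer` assumed in scope):
    ///
    /// ```ignore
    /// writer.flush().await?;  // segment files on disk, not yet registered
    /// writer.commit().await?; // registers the flushed segments in metadata
    /// ```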
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Wait for ALL pending builds to complete and collect results
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break; // Channel closed
            }
        }

        // Drain any remaining messages (builds that completed between checks)
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }

    /// Commit all pending segments to metadata and wait for completion
    ///
    /// Calls `flush()` to serialize all in-memory data to disk, then
    /// registers flushed segments in metadata. This provides transactional
    /// semantics: on crash before commit, orphan files are cleaned up by
    /// `cleanup_orphan_segments()`.
    ///
    /// **Auto-triggers vector index build** when threshold is crossed for any field.
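    ///
    /// A sketch of a typical write cycle (`batch` and `writer` assumed in scope):
    ///
    /// ```ignore
    /// for doc in batch {
    ///     writer.add_document(doc)?;
    /// }
    /// writer.commit().await?; // segments durable and registered from here
    /// ```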
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers and wait for builds to complete
        self.flush().await?;

        // Register all flushed segments in metadata
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Calculate total doc count for the merged segment
        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Atomically update segments and delete old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
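    ///
    /// A sketch (`writer` assumed in scope); useful e.g. after a bulk load:
    ///
    /// ```ignore
    /// writer.force_merge().await?;
    /// ```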
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}