
hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses FxHashMap (faster than BTreeMap)
37///
38/// **Architecture:**
39/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
40/// - Round-robin distribution across workers - no mutex contention
41/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
42///
43/// **State management:**
44/// - Building segments: Managed here (pending_builds)
45/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
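///
/// A minimal usage sketch (marked `ignore`: schema, document, and directory
/// construction live outside this module, so `dir`, `schema`, `docs`, and
/// `IndexConfig::default()` below are assumptions):
///
/// ```rust,ignore
/// let writer = IndexWriter::create(dir, schema, IndexConfig::default()).await?;
/// for doc in docs {
///     writer.add_document(doc)?; // non-blocking enqueue to a round-robin worker
/// }
/// writer.commit().await?; // flush builders, then register segments in metadata
/// ```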
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Shared trained structures - same Arc as in WorkerState
    pub(super) trained_structures:
        Arc<std::sync::RwLock<Option<crate::segment::TrainedVectorStructures>>>,
    /// Channel receiver for completed segment IDs and doc counts
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
    /// Limits concurrent segment builds to prevent OOM from unbounded build parallelism.
    /// Workers block at acquire, providing natural backpressure.
    build_semaphore: Arc<tokio::sync::Semaphore>,
    /// Trained vector structures (centroids/codebooks) shared with workers.
    /// When present, new segments are built with ANN indexes inline.
    /// Updated after training completes; read by spawn_segment_build.
    trained_structures: Arc<std::sync::RwLock<Option<crate::segment::TrainedVectorStructures>>>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        // With N workers, allow at most ceil(N/2) concurrent builds.
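        // e.g. num_indexing_threads = 8 allows 4 concurrent builds; 1 allows 1.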
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state
        let trained_structures = Arc::new(std::sync::RwLock::new(None));
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Create shared worker state (trained structures loaded after construction)
        let trained_structures = Arc::new(std::sync::RwLock::new(None));
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        // Create per-worker unbounded channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        let writer = Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        };

        // Load any previously trained structures so new segments get ANN inline
        writer.publish_trained_structures().await;

        Ok(writer)
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));

        // Limit concurrent segment builds to prevent OOM.
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Seed trained structures from Index (which already loaded them)
        let initial_trained = if !index.trained_centroids.is_empty() {
            Some(crate::segment::TrainedVectorStructures {
                centroids: index.trained_centroids.clone(),
                codebooks: index.trained_codebooks.clone(),
            })
        } else {
            None
        };
        let trained_structures = Arc::new(std::sync::RwLock::new(initial_trained));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        // Create per-worker channels and spawn workers
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel, so the returned
    /// `DocId` is a placeholder (always 0); real ids are assigned when
    /// segments are built.
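    ///
    /// For example, with 4 workers successive calls go to workers
    /// 0, 1, 2, 3, 0, 1, ...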
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut docs_since_flush = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    docs_since_flush += 1;

                    // Periodically recalibrate memory estimate using capacity-based
                    // calculation. The incremental tracker undercounts by ~33% because
                    // Vec::push doubles capacity but we only track element sizes.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Check memory after every document - O(1) with incremental tracking.
                    // Always reserve 2x headroom for build-phase memory amplification:
                    // when commit flushes all workers simultaneously, their builders stay
                    // in memory during serialization, effectively doubling peak usage.
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
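                    // Example: 8 workers, 2 in-flight builds, 1 GiB budget:
                    // effective_slots = 8 * 2 + 2 * 2 = 20, so each builder may
                    // grow to ~51 MiB before it is flushed.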
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if docs_since_flush.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        docs_since_flush = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    docs_since_flush = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
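
    /// Serialize a full builder into a new segment on a background task.
    /// Acquires a build-semaphore permit first, so at most ceil(N/2) builds
    /// run concurrently; the permit is held until the build finishes.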
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // Acquire semaphore permit before spawning - blocks if too many builds in flight.
        // This provides backpressure: workers pause indexing until a build slot opens.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        // Snapshot trained structures for this build (cheap Arc clone)
        let trained = state
            .trained_structures
            .read()
            .ok()
            .and_then(|guard| guard.clone());

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={} ann={}",
            segment_hex,
            doc_count,
            memory_bytes,
            trained.is_some()
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let _permit = permit; // held for build duration, released on drop
            let build_start = std::time::Instant::now();
            let result = match builder
                .build(directory.as_ref(), segment_id, trained.as_ref())
                .await
            {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Signal failure with num_docs=0 so waiters don't block
                    (segment_hex, 0)
                }
            };
            // Always send to channel and decrement - even on failure
            // This ensures flush()/commit() doesn't hang waiting for messages
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers
    /// This allows readers to acquire snapshots that prevent segment deletion
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading
    /// The snapshot holds references - segments won't be deleted while snapshot exists
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then waits for ALL pending background builds to complete.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
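    ///
    /// A sketch of the flush/commit split (marked `ignore`; `writer` and `doc`
    /// are assumed to exist):
    ///
    /// ```rust,ignore
    /// writer.flush().await?;     // segments written to disk, not yet registered
    /// writer.add_document(doc)?; // workers keep accepting documents after flush
    /// writer.commit().await?;    // registers all flushed segments in metadata
    /// ```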
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Wait for ALL pending builds to complete and collect results
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break; // Channel closed
            }
        }

        // Drain any remaining messages (builds that completed between checks)
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }

    /// Commit all pending segments to metadata and wait for completion
    ///
    /// Calls `flush()` to serialize all in-memory data to disk, then
    /// registers flushed segments in metadata. This provides transactional
    /// semantics: on crash before commit, orphan files are cleaned up by
    /// `cleanup_orphan_segments()`.
    ///
    /// **Auto-triggers vector index build** when threshold is crossed for any field.
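    ///
    /// Crash-recovery sketch (marked `ignore`; `dir` and `config` are assumed):
    ///
    /// ```rust,ignore
    /// // A crash between flush and commit leaves unregistered segment files;
    /// // reclaim them when reopening the index.
    /// let writer = IndexWriter::open(dir, config).await?;
    /// let removed = writer.cleanup_orphan_segments().await?;
    /// ```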
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers and wait for builds to complete
        self.flush().await?;

        // Register all flushed segments in metadata
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge).
    /// Delegates to SegmentManager::do_merge to avoid duplication.
    async fn do_merge(&self) -> Result<()> {
        let ids_to_merge = self.segment_manager.get_segment_ids().await;

        if ids_to_merge.len() < 2 {
            return Ok(());
        }

        let metadata_arc = self.segment_manager.metadata();
        let (new_segment_id, total_docs) = crate::merge::SegmentManager::do_merge(
            self.directory.as_ref(),
            &self.schema,
            &ids_to_merge,
            self.config.term_cache_blocks,
            &metadata_arc,
        )
        .await?;

        // Atomically update segments and delete old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![(new_segment_id, total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}