
hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses FxHashMap (faster than BTreeMap)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
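///
/// # Example
///
/// A minimal write-path sketch (document construction elided; assumes a
/// `DirectoryWriter` impl `dir` plus a `schema` and `config` built elsewhere):
///
/// ```ignore
/// let writer = IndexWriter::create(dir, schema, config).await?;
/// writer.add_document(doc)?; // queued, returns immediately
/// writer.commit().await?;    // flush workers + register segments
/// ```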
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)]
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers (used by flush to collect build handles)
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// In-flight build count for memory-pressure heuristic (informational only).
    pending_builds: Arc<AtomicUsize>,
    /// Limits concurrent segment builds to prevent OOM from unbounded build parallelism.
    /// Workers block at acquire, providing natural backpressure.
    build_semaphore: Arc<tokio::sync::Semaphore>,
    /// JoinHandles for in-flight segment builds. flush() takes + awaits them.
    /// This replaces the old channel approach which had a TOCTOU race causing hangs.
    build_handles: AsyncMutex<Vec<JoinHandle<(String, u32)>>>,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Load previously trained structures so merges use ANN
        segment_manager.load_and_publish_trained().await;

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
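    ///
    /// # Example
    ///
    /// A sketch, assuming an already-opened `index`:
    ///
    /// ```ignore
    /// let writer = IndexWriter::from_index(&index);
    /// ```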
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    /// Shared worker setup — deduplicates create/open/from_index.
    #[allow(clippy::type_complexity)]
    fn spawn_workers(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
        builder_config: &SegmentBuilderConfig,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
    ) -> (
        Arc<WorkerState<D>>,
        Vec<tokio::sync::mpsc::UnboundedSender<WorkerMessage>>,
        Vec<JoinHandle<()>>,
    ) {
        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
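        // Roughly half the workers may build concurrently, e.g. 8 workers
        // give 4 build slots and 2 workers give 1; div_ceil/max keep a
        // single-worker setup at 1 slot.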

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(directory),
            schema: Arc::clone(schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            pending_builds: Arc::new(AtomicUsize::new(0)),
            build_semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds)),
            build_handles: AsyncMutex::new(Vec::new()),
            segment_manager: Arc::clone(segment_manager),
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        (worker_state, worker_senders, workers)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel.
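    ///
    /// # Example
    ///
    /// A sketch (building the `Document` is elided):
    ///
    /// ```ignore
    /// writer.add_document(doc)?; // round-robin enqueue to a worker
    /// ```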
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        // Final doc IDs are assigned per segment at build time; the value
        // returned here is only a placeholder.
        Ok(0)
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    doc_count += 1;

                    // Periodically recalibrate memory estimate using capacity-based
                    // calculation. The incremental tracker undercounts by ~33% because
                    // Vec::push doubles capacity but we only track element sizes.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Check memory after every document - O(1) with incremental tracking.
                    // Always reserve 2x headroom for build-phase memory amplification:
                    // when commit flushes all workers simultaneously, their builders stay
                    // in memory during serialization, effectively doubling peak usage.
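                    // Worked example (illustrative numbers): with 4 workers, 2
                    // in-flight builds, and a 1 GiB budget, effective_slots is
                    // 4*2 + 2*2 = 12, so each builder flushes at ~85 MiB.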
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let num_docs = b.num_docs();

                    if builder_memory >= per_worker_limit && num_docs >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        // Log detailed memory breakdown on flush
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }

    /// Spawn a background task that builds `builder` into an on-disk segment.
    /// The JoinHandle is recorded in `build_handles` so `flush()` can await it.
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // Acquire semaphore permit before spawning - blocks if too many builds in flight.
        // This provides backpressure: workers pause indexing until a build slot opens.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let pending_builds = Arc::clone(&state.pending_builds);

        // Snapshot trained structures from SegmentManager's ArcSwap (lock-free)
        let trained = state.segment_manager.trained();

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={} ann={}",
            segment_hex,
            doc_count,
            memory_bytes,
            trained.is_some()
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        let handle = tokio::spawn(async move {
            let _permit = permit; // held for build duration, released on drop
            let build_start = std::time::Instant::now();
            let result = match builder
                .build(directory.as_ref(), segment_id, trained.as_deref())
                .await
            {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={:?}",
                        segment_hex,
                        e
                    );
                    (segment_hex, 0)
                }
            };
            pending_builds.fetch_sub(1, Ordering::SeqCst);
            result
        });

        // Push JoinHandle — flush() takes + awaits these. No TOCTOU race.
        state.build_handles.lock().await.push(handle);
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.worker_state.pending_builds.load(Ordering::SeqCst)
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after commit via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers
    /// This allows readers to acquire snapshots that prevent segment deletion
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading
    /// The snapshot holds references - segments won't be deleted while snapshot exists
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in metadata.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
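    ///
    /// # Example
    ///
    /// A post-open housekeeping sketch:
    ///
    /// ```ignore
    /// let removed = writer.cleanup_orphan_segments().await?;
    /// log::info!("deleted {} orphan segments", removed);
    /// ```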
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - serializes in-memory data to segment files on disk
    ///
    /// Sends flush signals to all workers, waits for them to acknowledge,
    /// then awaits ALL in-flight build JoinHandles.
    /// Completed segments are accumulated in `flushed_segments` but NOT
    /// registered in metadata - only `commit()` does that.
    ///
    /// Workers continue running and can accept new documents after flush.
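    ///
    /// # Example
    ///
    /// A sketch of the flush/commit split:
    ///
    /// ```ignore
    /// writer.flush().await?;  // segments on disk, not yet visible to readers
    /// writer.commit().await?; // registers them in metadata
    /// ```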
    pub async fn flush(&self) -> Result<()> {
        // 1. Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                continue;
            }
            responses.push(rx);
        }

        // 2. Wait for all workers to acknowledge flush.
        //    After ack, all spawn_segment_build calls have completed
        //    (handles pushed to build_handles).
        for rx in responses {
            let _ = rx.await;
        }

        // 3. Take all build JoinHandles and await them.
        //    JoinHandle can't lose signals — no TOCTOU race.
        let handles: Vec<JoinHandle<(String, u32)>> =
            std::mem::take(&mut *self.worker_state.build_handles.lock().await);
        for handle in handles {
            match handle.await {
                Ok((segment_hex, num_docs)) if num_docs > 0 => {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
                Ok(_) => {} // build failed (0 docs)
                Err(e) => log::error!("[flush] build task panicked: {:?}", e),
            }
        }

        Ok(())
    }

    /// Commit all pending segments to metadata.
    ///
    /// Tantivy-style: flush → atomic commit → merge evaluation. Nothing else.
    /// Vector training is decoupled — call `build_vector_index()` manually.
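    ///
    /// # Example
    ///
    /// A typical ingest-loop sketch (document construction elided):
    ///
    /// ```ignore
    /// for doc in docs {
    ///     writer.add_document(doc)?;
    /// }
    /// writer.commit().await?;         // make segments visible
    /// writer.wait_for_merges().await; // optional: let background merges settle
    /// ```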
    pub async fn commit(&self) -> Result<()> {
        // 1. Flush workers (like tantivy's prepare_commit joining threads)
        self.flush().await?;

        // 2. Atomic commit through SegmentManager (like tantivy: SM.commit + save_metas)
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        self.segment_manager.commit(segments).await?;

        // 3. Merge evaluation (like tantivy: consider_merge_options)
        self.segment_manager.maybe_merge().await;

        Ok(())
    }

    /// Force merge all segments into one.
    /// Flushes + commits pending docs, then merges. Does NOT trigger background
    /// merges — force_merge handles everything itself.
    pub async fn force_merge(&self) -> Result<()> {
        // Flush + commit without triggering maybe_merge (would be wasteful & racy)
        self.flush().await?;
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        self.segment_manager.commit(segments).await?;
        self.segment_manager.force_merge().await
    }

    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
}