hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Message sent to worker tasks
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush current builder and respond when done
    Flush(oneshot::Sender<()>),
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reducing allocations)
/// - Uses a hash map (`FxHashMap`) rather than a BTreeMap (faster lookups, no ordering overhead)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when its memory threshold is reached
///
/// **State management:**
/// - Building segments: managed here (`pending_builds`)
/// - Committed segments + metadata: managed by SegmentManager (sole owner of metadata.json)
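///
/// A minimal usage sketch (assumes `directory`, `schema`, `config`, and `doc`
/// are constructed elsewhere; only this type's own methods are shown):
///
/// ```rust,ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
/// writer.add_document(doc)?;  // queued; never blocks
/// writer.commit().await?;     // flush workers + wait for background builds
/// ```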
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Global memory usage across all builders (bytes)
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}

/// Shared state for worker tasks
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }

    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
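    ///
    /// A minimal sketch (assumes an `Index` opened elsewhere):
    ///
    /// ```rust,ignore
    /// let writer = IndexWriter::from_index(&index);
    /// writer.add_document(doc)?;
    /// writer.commit().await?;
    /// ```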
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker channels and spawn workers
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
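    ///
    /// Sketch (the tokenizer type and field name are hypothetical; any
    /// `Tokenizer` impl works):
    ///
    /// ```rust,ignore
    /// writer.set_tokenizer(title_field, MyLowercaseTokenizer::default());
    /// ```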
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - it returns immediately, and workers
    /// handle the actual indexing in parallel. The returned `DocId` is a
    /// placeholder (currently always 0); ids are assigned during segment building.
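    ///
    /// A minimal call-site sketch (`doc` is a `Document` built elsewhere):
    ///
    /// ```rust,ignore
    /// writer.add_document(doc)?; // queued for a worker; indexing happens in the background
    /// ```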
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        // DocIds are assigned later during segment building; return a placeholder.
        Ok(0)
    }

    /// Add multiple documents to the indexing queue
    ///
    /// Documents are distributed round-robin to workers.
    /// Returns immediately - never blocks.
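    ///
    /// Sketch (the return value is the number of documents queued):
    ///
    /// ```rust,ignore
    /// let queued = writer.add_documents(docs)?;
    /// assert_eq!(queued, expected_count);
    /// ```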
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

    /// Worker loop - polls messages from its own channel and indexes documents
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut docs_since_flush = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    docs_since_flush += 1;

                    // Check memory after every document - O(1) with incremental tracking
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
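                    // Illustrative budget arithmetic: a 1 GiB global limit with
                    // 8 workers gives each builder a 128 MiB flush threshold.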
                    let builder_memory = b.estimated_memory_bytes();

                    // Log memory usage periodically
                    if docs_since_flush.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require a minimum of 100 docs before flushing to avoid tiny segments
                    // (sparse vectors with many dims can hit the memory limit quickly)
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        // Get detailed stats for debugging memory issues
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder);
                        docs_since_flush = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        Self::spawn_segment_build(&state, b);
                    }
                    docs_since_flush = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
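
    /// Spawn a background task that builds `builder` into a new segment,
    /// registers it with the SegmentManager, and reports completion on the
    /// segment-id channel, tracking the build in `pending_builds` throughout.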
    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let segment_manager = Arc::clone(&state.segment_manager);
        let pending_builds = Arc::clone(&state.pending_builds);

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
                }
                Err(e) => {
                    log::error!("Background segment build failed: {:?}", e);
                }
            }
            // Always send to the channel and decrement - even on failure.
            // This ensures commit() doesn't hang waiting for messages.
            let _ = sender.send(segment_hex);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    /// Collect any completed segment IDs from the channel (non-blocking)
    async fn collect_completed_segments(&self) {
        let mut receiver = self.segment_id_receiver.lock().await;
        while receiver.try_recv().is_ok() {
            // Segments were already registered by spawn_segment_build
        }
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// It can also be called manually to trigger merge checking.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Get the segment tracker for sharing with readers.
    /// This allows readers to acquire snapshots that prevent segment deletion.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading.
    /// The snapshot holds references, so segments won't be deleted while it exists.
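    ///
    /// Sketch (the snapshot guards segment files until it is dropped):
    ///
    /// ```rust,ignore
    /// let snapshot = writer.acquire_snapshot().await;
    /// // ... read from the snapshot's segments ...
    /// drop(snapshot); // segments become eligible for cleanup again
    /// ```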
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// Orphans can appear if the process halts after segment files are written
    /// but before they are registered in the index metadata. Call this after
    /// opening an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
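    ///
    /// Sketch (typically run right after `open`):
    ///
    /// ```rust,ignore
    /// let writer = IndexWriter::open(directory, config).await?;
    /// let deleted = writer.cleanup_orphan_segments().await?;
    /// log::info!("removed {} orphan segments", deleted);
    /// ```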
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Flush all workers - signals them to build their current segments
    ///
    /// Sends flush signals to all workers and waits for them to acknowledge.
    /// Workers continue running and can accept new documents after the flush.
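    ///
    /// Sketch: unlike `commit`, `flush` only waits for the acknowledgement,
    /// not for the background segment builds themselves:
    ///
    /// ```rust,ignore
    /// writer.flush().await?;          // builders handed off to background tasks
    /// writer.add_document(next_doc)?; // indexing can continue immediately
    /// ```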
    pub async fn flush(&self) -> Result<()> {
        // Send flush signal to each worker's channel
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Channel closed, worker may have exited
                continue;
            }
            responses.push(rx);
        }

        // Wait for all workers to acknowledge flush
        for rx in responses {
            let _ = rx.await;
        }

        // Collect any completed segments
        self.collect_completed_segments().await;

        Ok(())
    }

    /// Commit all pending segments to disk and wait for completion
    ///
    /// This flushes workers and waits for ALL background builds to complete,
    /// providing a durability guarantee: all queued data is persisted.
    ///
    /// **Auto-triggers vector index build** when the threshold is crossed for any field.
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers first
        self.flush().await?;

        // Wait for all pending builds to complete
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if receiver.recv().await.is_none() {
                break; // Channel closed
            }
        }
        drop(receiver);

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Atomically swap in the merged segment and delete the old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
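    ///
    /// Sketch (commits first, then merges everything into a single segment):
    ///
    /// ```rust,ignore
    /// writer.force_merge().await?; // index now has at most one segment
    /// ```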
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}