// hermes_core/index/writer.rs
1//! IndexWriter - async document indexing with parallel segment building
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8use rustc_hash::FxHashMap;
9use tokio::sync::Mutex as AsyncMutex;
10use tokio::sync::{mpsc, oneshot};
11use tokio::task::JoinHandle;
12
13use crate::DocId;
14use crate::directories::DirectoryWriter;
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{
18    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
19};
20use crate::tokenizer::BoxedTokenizer;
21
22use super::IndexConfig;
23
/// Message sent to worker tasks over their per-worker unbounded channels
enum WorkerMessage {
    /// A document to index
    Document(Document),
    /// Signal to flush the current builder; the worker replies on the
    /// oneshot once the flush has been handed off
    Flush(oneshot::Sender<()>),
}
31
/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    /// Directory all segment files are written to
    pub(super) directory: Arc<D>,
    /// Schema shared with workers and the segment manager
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers registered on this writer.
    /// NOTE(review): `worker_state` carries its own (empty) tokenizer map;
    /// entries added here via `set_tokenizer` are not propagated to the
    /// already-spawned workers - confirm this is intended.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Global memory usage across all builders (bytes)
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}
75
/// Shared state for worker tasks
///
/// Placed behind an `Arc` and handed to every worker at spawn time; the
/// atomic counter is the only field workers mutate.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Config applied to each `SegmentBuilder` a worker creates
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers applied to every new builder
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Reports segment IDs of completed (or failed) background builds
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Incremented before each background build is spawned, decremented
    /// after its completion message is sent (see `spawn_segment_build`)
    pending_builds: Arc<AtomicUsize>,
}
87
88impl<D: DirectoryWriter + 'static> IndexWriter<D> {
89    /// Create a new index in the directory
90    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
91        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
92    }
93
    /// Create a new index with custom builder config
    ///
    /// Persists the initial `metadata.json` (containing the schema), then
    /// spawns `config.num_indexing_threads` worker tasks (minimum one), each
    /// with its own unbounded document channel.
    ///
    /// # Errors
    ///
    /// Fails if the initial metadata cannot be written to `directory`.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize metadata with schema
        let metadata = super::IndexMetadata::new((*schema).clone());

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata (identity closure: persist it unchanged)
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        // NOTE(review): workers capture this (empty) tokenizer map at spawn;
        // tokenizers registered later via `set_tokenizer` are not visible to
        // them - confirm intended.
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        // (at least one worker even if the config requests zero threads)
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
169
170    /// Open an existing index for writing
171    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
172        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
173    }
174
    /// Open an existing index with custom builder config
    ///
    /// Loads `metadata.json` (which embeds the schema) from `directory`,
    /// then spawns the same worker setup as `create_with_config`.
    ///
    /// # Errors
    ///
    /// Fails if the metadata file cannot be loaded.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load unified metadata (includes schema)
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        // (at least one worker even if the config requests zero threads)
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
246
    /// Create an IndexWriter from an existing Index
    ///
    /// This shares the SegmentManager with the Index, ensuring consistent
    /// segment lifecycle management.
    ///
    /// Unlike `create`/`open` this never touches metadata on disk and always
    /// uses the default `SegmentBuilderConfig`.
    pub fn from_index(index: &super::Index<D>) -> Self {
        // Reuse the Index's manager rather than creating a second owner of
        // metadata.json.
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Create channel for background builds
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker channels and spawn workers
        // (at least one worker even if the config requests zero threads)
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        }
    }
308
309    /// Get the schema
310    pub fn schema(&self) -> &Schema {
311        &self.schema
312    }
313
314    /// Set tokenizer for a field
315    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
316        self.tokenizers.insert(field, Box::new(tokenizer));
317    }
318
    /// Add a document to the indexing queue
    ///
    /// Documents are sent to per-worker unbounded channels.
    /// This is O(1) and never blocks - returns immediately.
    /// Workers handle the actual indexing in parallel.
    ///
    /// Always returns `Ok(0)` on success: the `DocId` is a placeholder and
    /// does not identify the enqueued document.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if the selected worker's channel is closed.
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin select worker
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }
332
333    /// Add multiple documents to the indexing queue
334    ///
335    /// Documents are distributed round-robin to workers.
336    /// Returns immediately - never blocks.
337    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
338        let num_workers = self.worker_senders.len();
339        let count = documents.len();
340        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
341        for (i, doc) in documents.into_iter().enumerate() {
342            let idx = (base + i) % num_workers;
343            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
344        }
345        Ok(count)
346    }
347
    /// Worker loop - polls messages from its own channel and indexes documents
    ///
    /// Lazily creates a `SegmentBuilder` on the first document, and hands it
    /// off to a background build when (a) the per-worker memory budget is
    /// exceeded, (b) an explicit `Flush` message arrives, or (c) the channel
    /// closes (writer dropped). Per-document errors are logged to stderr and
    /// the document is skipped; the loop itself never aborts on them.
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        // In-progress builder; None until the first document, and again
        // immediately after each flush/handoff.
        let mut builder: Option<SegmentBuilder> = None;
        // Documents added since the builder was (re)created; drives the
        // periodic memory check below.
        let mut doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                // Apply tokenizers captured in WorkerState.
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // Builder creation failed: drop this document
                                // but keep the worker alive for later ones.
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    doc_count += 1;

                    // Check memory periodically
                    // Use smaller interval for small memory limits (for testing)
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
                    let check_interval = if per_worker_limit < 1024 * 1024 {
                        1
                    } else {
                        100
                    };

                    if doc_count.is_multiple_of(check_interval) {
                        let builder_memory = b.stats().estimated_memory_bytes;

                        if builder_memory >= per_worker_limit {
                            // Budget exceeded: hand the builder to a
                            // background build and start fresh.
                            let full_builder = builder.take().unwrap();
                            Self::spawn_segment_build(&state, full_builder);
                            doc_count = 0;
                        }
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        Self::spawn_segment_build(&state, b);
                    }
                    doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
    /// Spawn a background task that builds `builder` into an on-disk segment.
    ///
    /// Ordering invariant relied on by `commit()`: `pending_builds` is
    /// incremented *before* spawning, and the completion message is sent on
    /// the channel *before* the counter is decremented - so while the counter
    /// is non-zero, a message is (or will be) available on the channel.
    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let segment_manager = Arc::clone(&state.segment_manager);
        let pending_builds = Arc::clone(&state.pending_builds);

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    // Registration makes the segment visible via the manager.
                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
                }
                Err(e) => {
                    eprintln!("Background segment build failed: {:?}", e);
                }
            }
            // Always send to channel and decrement - even on failure
            // This ensures commit() doesn't hang waiting for messages
            let _ = sender.send(segment_hex);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }
459
460    /// Collect any completed segment IDs from the channel (non-blocking)
461    async fn collect_completed_segments(&self) {
462        let mut receiver = self.segment_id_receiver.lock().await;
463        while receiver.try_recv().is_ok() {
464            // Segment already registered by spawn_segment_build
465        }
466    }
467
468    /// Get the number of pending background builds
469    pub fn pending_build_count(&self) -> usize {
470        self.pending_builds.load(Ordering::SeqCst)
471    }
472
473    /// Get the number of pending background merges
474    pub fn pending_merge_count(&self) -> usize {
475        self.segment_manager.pending_merge_count()
476    }
477
478    /// Check merge policy and spawn background merges if needed
479    ///
480    /// This is called automatically after segment builds complete via SegmentManager.
481    /// Can also be called manually to trigger merge checking.
482    pub async fn maybe_merge(&self) {
483        self.segment_manager.maybe_merge().await;
484    }
485
486    /// Wait for all pending merges to complete
487    pub async fn wait_for_merges(&self) {
488        self.segment_manager.wait_for_merges().await;
489    }
490
491    /// Get the segment tracker for sharing with readers
492    /// This allows readers to acquire snapshots that prevent segment deletion
493    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
494        self.segment_manager.tracker()
495    }
496
497    /// Acquire a snapshot of current segments for reading
498    /// The snapshot holds references - segments won't be deleted while snapshot exists
499    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
500        self.segment_manager.acquire_snapshot().await
501    }
502
503    /// Clean up orphan segment files that are not registered
504    ///
505    /// This can happen if the process halts after segment files are written
506    /// but before they are registered in segments.json. Call this after opening
507    /// an index to reclaim disk space from incomplete operations.
508    ///
509    /// Returns the number of orphan segments deleted.
510    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
511        self.segment_manager.cleanup_orphan_segments().await
512    }
513
514    /// Flush all workers - signals them to build their current segments
515    ///
516    /// Sends flush signals to all workers and waits for them to acknowledge.
517    /// Workers continue running and can accept new documents after flush.
518    pub async fn flush(&self) -> Result<()> {
519        // Send flush signal to each worker's channel
520        let mut responses = Vec::with_capacity(self.worker_senders.len());
521
522        for sender in &self.worker_senders {
523            let (tx, rx) = oneshot::channel();
524            if sender.send(WorkerMessage::Flush(tx)).is_err() {
525                // Channel closed, worker may have exited
526                continue;
527            }
528            responses.push(rx);
529        }
530
531        // Wait for all workers to acknowledge flush
532        for rx in responses {
533            let _ = rx.await;
534        }
535
536        // Collect any completed segments
537        self.collect_completed_segments().await;
538
539        Ok(())
540    }
541
    /// Commit all pending segments to disk and wait for completion
    ///
    /// This flushes workers and waits for ALL background builds to complete.
    /// Provides durability guarantees - all data is persisted.
    ///
    /// **Auto-triggers vector index build** when threshold is crossed for any field.
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers first
        self.flush().await?;

        // Wait for all pending builds to complete.
        // Each build sends its channel message *before* decrementing the
        // counter (see `spawn_segment_build`), so recv() cannot hang while
        // the counter is non-zero.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if receiver.recv().await.is_none() {
                break; // Channel closed
            }
        }
        drop(receiver);

        // Auto-trigger vector index build if threshold crossed
        // (implemented in vector_builder.rs)
        self.maybe_build_vector_index().await?;

        Ok(())
    }
566
567    // Vector index building methods are in vector_builder.rs
568
569    /// Merge all segments into one (called explicitly via force_merge)
570    async fn do_merge(&self) -> Result<()> {
571        let segment_ids = self.segment_manager.get_segment_ids().await;
572
573        if segment_ids.len() < 2 {
574            return Ok(());
575        }
576
577        let ids_to_merge: Vec<String> = segment_ids.clone();
578        drop(segment_ids);
579
580        // Load segment readers
581        let mut readers = Vec::new();
582        let mut doc_offset = 0u32;
583
584        for id_str in &ids_to_merge {
585            let segment_id = SegmentId::from_hex(id_str)
586                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
587            let reader = SegmentReader::open(
588                self.directory.as_ref(),
589                segment_id,
590                Arc::clone(&self.schema),
591                doc_offset,
592                self.config.term_cache_blocks,
593            )
594            .await?;
595            doc_offset += reader.meta().num_docs;
596            readers.push(reader);
597        }
598
599        // Merge into new segment
600        let merger = SegmentMerger::new(Arc::clone(&self.schema));
601        let new_segment_id = SegmentId::new();
602        merger
603            .merge(self.directory.as_ref(), &readers, new_segment_id)
604            .await?;
605
606        // Atomically update segments and delete old ones via SegmentManager
607        self.segment_manager
608            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
609            .await?;
610
611        Ok(())
612    }
613
    /// Force merge all segments into one
    ///
    /// The sequence matters: commit (flush + wait for builds), then wait for
    /// background merges, then run the full merge - otherwise the merge could
    /// race with background segment deletion.
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Wait for any background merges to complete (avoid race with segment deletion)
        self.wait_for_merges().await;
        // Then merge all segments
        self.do_merge().await
    }
623
624    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
625    // are implemented in vector_builder.rs
626}