Skip to main content

hermes_core/index/
writer.rs

1//! IndexWriter - async document indexing with parallel segment building
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use rustc_hash::FxHashMap;
10use tokio::sync::Mutex as AsyncMutex;
11use tokio::sync::{mpsc, oneshot};
12use tokio::task::JoinHandle;
13
14use crate::DocId;
15use crate::directories::DirectoryWriter;
16use crate::dsl::{Document, Field, Schema};
17use crate::error::{Error, Result};
18use crate::segment::{
19    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
20};
21use crate::tokenizer::BoxedTokenizer;
22
23use super::IndexConfig;
24
/// Message sent to a worker task over its private unbounded channel.
enum WorkerMessage {
    /// A document to index in the worker's current segment builder.
    Document(Document),
    /// Signal to flush the current builder; the worker acknowledges
    /// completion by sending `()` on the provided oneshot channel.
    Flush(oneshot::Sender<()>),
}
32
/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Queue-based parallel indexing with worker tasks
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// **Architecture:**
/// - `add_document()` sends to per-worker unbounded channels (non-blocking)
/// - Round-robin distribution across workers - no mutex contention
/// - Each worker owns a SegmentBuilder and flushes when memory threshold is reached
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)] // Used for creating new builders in worker_state
    builder_config: SegmentBuilderConfig,
    /// Tokenizers registered via `set_tokenizer`.
    /// NOTE(review): workers read `WorkerState::tokenizers`, which is built as
    /// an empty map at construction time - entries added here are not visible
    /// to workers. Confirm whether this is intended.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-worker channel senders - round-robin distribution
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter for worker selection
    next_worker: AtomicUsize,
    /// Worker task handles - kept alive to prevent premature shutdown
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared state for workers
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel receiver for completed segment IDs (drained by flush/commit)
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
    /// Global memory usage across all builders (bytes)
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}
76
/// Shared state for worker tasks
///
/// One instance is shared (via `Arc`) by every worker loop and by the
/// background build tasks they spawn.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Config used whenever a worker creates a fresh `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers cloned into each new builder.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Completed segment IDs are reported here for `commit()` to await.
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Count of in-flight background segment builds.
    pending_builds: Arc<AtomicUsize>,
}
88
89impl<D: DirectoryWriter + 'static> IndexWriter<D> {
90    /// Create a new index in the directory
91    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
92        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
93    }
94
    /// Create a new index with custom builder config
    ///
    /// Initialization order: write `schema.json` and an empty `segments.json`,
    /// create the `SegmentManager` (sole owner of metadata.json) and persist
    /// initial metadata, then spawn one indexing worker per configured thread.
    ///
    /// # Errors
    /// Fails if schema/segment-list serialization or any directory write fails.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Write schema
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Write empty segments list
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize empty metadata for new index
        let metadata = super::IndexMetadata::new();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            // NOTE(review): snapshotted empty here; later `set_tokenizer`
            // calls only update the writer's own map, not this one - confirm
            // workers are meant to run without custom tokenizers.
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        // (at least one worker even if the config says zero threads)
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
184
185    /// Open an existing index for writing
186    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
187        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
188    }
189
    /// Open an existing index with custom builder config
    ///
    /// Reads `schema.json` and the unified metadata from the directory, then
    /// sets up the same worker/segment-manager machinery as
    /// `create_with_config` (without rewriting any on-disk files).
    ///
    /// # Errors
    /// Fails if `schema.json` is missing/unreadable, the schema does not
    /// deserialize, or the metadata cannot be loaded.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Load unified metadata
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // Create shared worker state
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            // NOTE(review): snapshotted empty here; later `set_tokenizer`
            // calls only update the writer's own map, not this one - confirm
            // workers are meant to run without custom tokenizers.
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Create per-worker unbounded channels and spawn workers
        // (at least one worker even if the config says zero threads)
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
267
268    /// Get the schema
269    pub fn schema(&self) -> &Schema {
270        &self.schema
271    }
272
273    /// Set tokenizer for a field
274    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
275        self.tokenizers.insert(field, Box::new(tokenizer));
276    }
277
278    /// Add a document to the indexing queue
279    ///
280    /// Documents are sent to per-worker unbounded channels.
281    /// This is O(1) and never blocks - returns immediately.
282    /// Workers handle the actual indexing in parallel.
283    pub fn add_document(&self, doc: Document) -> Result<DocId> {
284        // Round-robin select worker
285        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
286        self.worker_senders[idx]
287            .send(WorkerMessage::Document(doc))
288            .map_err(|_| Error::Internal("Document channel closed".into()))?;
289        Ok(0)
290    }
291
292    /// Add multiple documents to the indexing queue
293    ///
294    /// Documents are distributed round-robin to workers.
295    /// Returns immediately - never blocks.
296    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
297        let num_workers = self.worker_senders.len();
298        let count = documents.len();
299        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
300        for (i, doc) in documents.into_iter().enumerate() {
301            let idx = (base + i) % num_workers;
302            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
303        }
304        Ok(count)
305    }
306
    /// Worker loop - polls messages from its own channel and indexes documents
    ///
    /// Each worker lazily creates a `SegmentBuilder` on the first document,
    /// hands the builder off to a background build task when its estimated
    /// memory crosses the per-worker budget or on an explicit flush, and
    /// exits (building any remaining docs) when its channel closes.
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        // Builder is created lazily and taken (reset to None) on flush/full.
        let mut builder: Option<SegmentBuilder> = None;
        // Documents indexed into the current builder; drives the periodic
        // memory check below and resets whenever the builder is handed off.
        let mut doc_count = 0u32;

        loop {
            // Receive from own channel - no mutex contention
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed - flush remaining docs and exit
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // NOTE(review): the document is dropped after
                                // logging - no retry or dead-letter path.
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    // Index the document
                    // (unwrap is safe: builder was just initialized above)
                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        // NOTE(review): failed documents are logged and dropped.
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    doc_count += 1;

                    // Check memory periodically
                    // Use smaller interval for small memory limits (for testing)
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
                    let check_interval = if per_worker_limit < 1024 * 1024 {
                        1
                    } else {
                        100
                    };

                    if doc_count.is_multiple_of(check_interval) {
                        let builder_memory = b.stats().estimated_memory_bytes;

                        // Over budget: hand the full builder to a background
                        // build task and start fresh on the next document.
                        if builder_memory >= per_worker_limit {
                            let full_builder = builder.take().unwrap();
                            Self::spawn_segment_build(&state, full_builder);
                            doc_count = 0;
                        }
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Flush current builder if it has documents
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        Self::spawn_segment_build(&state, b);
                    }
                    doc_count = 0;
                    // Signal that flush is complete for this worker
                    let _ = respond.send(());
                }
            }
        }
    }
393    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
394        let directory = Arc::clone(&state.directory);
395        let segment_id = SegmentId::new();
396        let segment_hex = segment_id.to_hex();
397        let sender = state.segment_id_sender.clone();
398        let segment_manager = Arc::clone(&state.segment_manager);
399        let pending_builds = Arc::clone(&state.pending_builds);
400
401        pending_builds.fetch_add(1, Ordering::SeqCst);
402
403        tokio::spawn(async move {
404            match builder.build(directory.as_ref(), segment_id).await {
405                Ok(_) => {
406                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
407                }
408                Err(e) => {
409                    eprintln!("Background segment build failed: {:?}", e);
410                }
411            }
412            // Always send to channel and decrement - even on failure
413            // This ensures commit() doesn't hang waiting for messages
414            let _ = sender.send(segment_hex);
415            pending_builds.fetch_sub(1, Ordering::SeqCst);
416        });
417    }
418
419    /// Collect any completed segment IDs from the channel (non-blocking)
420    async fn collect_completed_segments(&self) {
421        let mut receiver = self.segment_id_receiver.lock().await;
422        while receiver.try_recv().is_ok() {
423            // Segment already registered by spawn_segment_build
424        }
425    }
426
    /// Get the number of pending background builds
    ///
    /// The counter is incremented when a segment build task is spawned and
    /// decremented when it finishes (successfully or not).
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }
431
    /// Get the number of pending background merges
    ///
    /// Delegates to the `SegmentManager`, which owns merge scheduling.
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }
436
    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
    /// Delegates entirely to the `SegmentManager`; returns once the check
    /// (not the merges themselves) is done.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }
444
    /// Wait for all pending merges to complete
    ///
    /// Suspends (asynchronously) until the `SegmentManager` reports no
    /// in-flight merges.
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }
449
    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in segments.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
    ///
    /// # Errors
    /// Propagates any error from the underlying `SegmentManager` cleanup.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
460
461    /// Flush all workers - signals them to build their current segments
462    ///
463    /// Sends flush signals to all workers and waits for them to acknowledge.
464    /// Workers continue running and can accept new documents after flush.
465    pub async fn flush(&self) -> Result<()> {
466        // Send flush signal to each worker's channel
467        let mut responses = Vec::with_capacity(self.worker_senders.len());
468
469        for sender in &self.worker_senders {
470            let (tx, rx) = oneshot::channel();
471            if sender.send(WorkerMessage::Flush(tx)).is_err() {
472                // Channel closed, worker may have exited
473                continue;
474            }
475            responses.push(rx);
476        }
477
478        // Wait for all workers to acknowledge flush
479        for rx in responses {
480            let _ = rx.await;
481        }
482
483        // Collect any completed segments
484        self.collect_completed_segments().await;
485
486        Ok(())
487    }
488
    /// Commit all pending segments to disk and wait for completion
    ///
    /// This flushes workers and waits for ALL background builds to complete.
    /// Provides durability guarantees - all data is persisted.
    ///
    /// **Auto-triggers vector index build** when threshold is crossed for any field.
    pub async fn commit(&self) -> Result<()> {
        // Flush all workers first
        self.flush().await?;

        // Wait for all pending builds to complete: every background build
        // reports exactly one message on this channel when it finishes
        // (success or failure), so we alternate between checking the counter
        // and consuming one completion message.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if receiver.recv().await.is_none() {
                break; // Channel closed
            }
        }
        // Release the receiver lock before the (potentially long) vector build.
        drop(receiver);

        // Auto-trigger vector index build if threshold crossed
        // (implemented in vector_builder.rs)
        self.maybe_build_vector_index().await?;

        Ok(())
    }
513
514    // Vector index building methods are in vector_builder.rs
515
516    /// Merge all segments into one (called explicitly via force_merge)
517    async fn do_merge(&self) -> Result<()> {
518        let segment_ids = self.segment_manager.get_segment_ids().await;
519
520        if segment_ids.len() < 2 {
521            return Ok(());
522        }
523
524        let ids_to_merge: Vec<String> = segment_ids.clone();
525        drop(segment_ids);
526
527        // Load segment readers
528        let mut readers = Vec::new();
529        let mut doc_offset = 0u32;
530
531        for id_str in &ids_to_merge {
532            let segment_id = SegmentId::from_hex(id_str)
533                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
534            let reader = SegmentReader::open(
535                self.directory.as_ref(),
536                segment_id,
537                Arc::clone(&self.schema),
538                doc_offset,
539                self.config.term_cache_blocks,
540            )
541            .await?;
542            doc_offset += reader.meta().num_docs;
543            readers.push(reader);
544        }
545
546        // Merge into new segment
547        let merger = SegmentMerger::new(Arc::clone(&self.schema));
548        let new_segment_id = SegmentId::new();
549        merger
550            .merge(self.directory.as_ref(), &readers, new_segment_id)
551            .await?;
552
553        // Atomically update segments and delete old ones via SegmentManager
554        self.segment_manager
555            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
556            .await?;
557
558        Ok(())
559    }
560
561    /// Force merge all segments into one
562    pub async fn force_merge(&self) -> Result<()> {
563        // First commit all pending documents (waits for completion)
564        self.commit().await?;
565        // Then merge all segments
566        self.do_merge().await
567    }
568
569    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
570    // are implemented in vector_builder.rs
571}