
hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::mpsc;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Parallel indexing with multiple segment builders
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// **State management:**
/// - Building segments: Managed here (pending_builds)
/// - Committed segments + metadata: Managed by SegmentManager (sole owner of metadata.json)
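///
/// # Example
///
/// A minimal usage sketch. `directory`, `schema`, `config`, and the document
/// construction (`Document::new`, `add_text`, `body_field`) are assumed to come
/// from the caller and are not this file's confirmed API; only the `IndexWriter`
/// calls are defined in this module.
///
/// ```ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
///
/// // Hypothetical document construction - adapt to the real Document API.
/// let mut doc = Document::new();
/// doc.add_text(body_field, "hello world");
/// writer.add_document(doc).await?;
///
/// // Waits until every in-flight background segment build is persisted.
/// writer.commit().await?;
/// ```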
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Multiple segment builders for parallel indexing
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    /// Segment manager - owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel sender for completed segment IDs from background builds
    segment_id_sender: mpsc::UnboundedSender<String>,
    /// Channel receiver for completed segment IDs
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Write schema
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Write empty segments list
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Create multiple builders for parallel indexing
        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Initialize empty metadata for new index
        let metadata = super::IndexMetadata::new();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Load unified metadata
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;

        // Create multiple builders for parallel indexing
        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        // Create channel for background builds to report completed segment IDs
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        // Create segment manager - owns metadata.json
        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document
    ///
    /// Documents are distributed randomly across multiple builders for parallel indexing.
    /// Random distribution avoids contention on a shared counter and spreads load evenly.
    /// When a builder reaches `max_docs_per_segment`, it is handed off to a background
    /// segment build and a fresh builder takes its place.
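    ///
    /// # Example
    ///
    /// A minimal ingestion-loop sketch; `records` and `make_document` are
    /// placeholders for the caller's own data and document construction.
    ///
    /// ```ignore
    /// for record in records {
    ///     // Full builders are handed off to background segment builds
    ///     // automatically; no explicit flush is needed inside the loop.
    ///     writer.add_document(make_document(record)).await?;
    /// }
    /// writer.commit().await?;
    /// ```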
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        use rand::Rng;

        // Random selection of builder - avoids atomic contention
        let builder_idx = rand::rng().random_range(0..self.builders.len());

        let mut builder_guard = self.builders[builder_idx].lock().await;

        // Initialize builder if needed
        if builder_guard.is_none() {
            let mut builder =
                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
            for (field, tokenizer) in &self.tokenizers {
                builder.set_tokenizer(*field, tokenizer.clone_box());
            }
            *builder_guard = Some(builder);
        }

        let builder = builder_guard.as_mut().unwrap();
        let doc_id = builder.add_document(doc)?;

        // If the builder is full, hand it off to a background segment build
        if builder.num_docs() >= self.config.max_docs_per_segment {
            let full_builder = builder_guard.take().unwrap();
            drop(builder_guard); // Release lock before spawning background task
            self.spawn_background_build(full_builder);
        }

        Ok(doc_id)
    }

    /// Spawn a background task to build a segment without blocking document ingestion
    ///
    /// The background task will send its segment ID through the channel when complete,
    /// allowing indexing to continue immediately.
    fn spawn_background_build(&self, builder: SegmentBuilder) {
        let directory = Arc::clone(&self.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = self.segment_id_sender.clone();
        let segment_manager = Arc::clone(&self.segment_manager);

        self.pending_builds.fetch_add(1, Ordering::SeqCst);

        // Spawn a fully independent task that registers its own segment ID
        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    // Register segment via SegmentManager (also triggers merge check)
                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
                    // Also send through channel for flush() to know when all are done
                    let _ = sender.send(segment_hex);
                }
                Err(e) => {
                    // Log error but don't crash - the segment is simply not registered
                    eprintln!("Background segment build failed: {:?}", e);
                    // Still notify the channel so flush()/commit() can drain the
                    // in-flight build counter instead of waiting forever
                    let _ = sender.send(segment_hex);
                }
            }
        });
    }

    /// Collect any completed segment IDs from the channel (non-blocking)
    ///
    /// Merge checking is handled by SegmentManager::register_segment().
    async fn collect_completed_segments(&self) {
        let mut receiver = self.segment_id_receiver.lock().await;
        while let Ok(_segment_hex) = receiver.try_recv() {
            // Segment ID already registered by the background task via SegmentManager
            self.pending_builds.fetch_sub(1, Ordering::SeqCst);
        }
    }

    /// Get the number of pending background builds
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    /// Get the number of pending background merges
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

    /// Check merge policy and spawn background merges if needed
    ///
    /// This is called automatically after segment builds complete via SegmentManager.
    /// Can also be called manually to trigger merge checking.
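    ///
    /// # Example
    ///
    /// A small sketch of triggering a merge check by hand and then blocking
    /// until any merges it spawned have finished.
    ///
    /// ```ignore
    /// writer.maybe_merge().await;      // evaluate the merge policy now
    /// writer.wait_for_merges().await;  // block until background merges finish
    /// ```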
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for all pending merges to complete
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    /// Clean up orphan segment files that are not registered
    ///
    /// This can happen if the process halts after segment files are written
    /// but before they are registered in segments.json. Call this after opening
    /// an index to reclaim disk space from incomplete operations.
    ///
    /// Returns the number of orphan segments deleted.
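    ///
    /// # Example
    ///
    /// A sketch of reclaiming space right after opening an existing index;
    /// `directory` and `config` are assumed to come from the caller.
    ///
    /// ```ignore
    /// let writer = IndexWriter::open(directory, config).await?;
    /// let deleted = writer.cleanup_orphan_segments().await?;
    /// eprintln!("removed {deleted} orphan segment(s)");
    /// ```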
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Get current builder statistics for debugging (aggregated from all builders)
    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;

        for builder_mutex in &self.builders {
            let guard = builder_mutex.lock().await;
            if let Some(builder) = guard.as_ref() {
                let stats = builder.stats();
                if let Some(ref mut total) = total_stats {
                    total.num_docs += stats.num_docs;
                    total.unique_terms += stats.unique_terms;
                    total.postings_in_memory += stats.postings_in_memory;
                    total.interned_strings += stats.interned_strings;
                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
                } else {
                    total_stats = Some(stats);
                }
            }
        }

        total_stats
    }

    /// Flush current builders to background processing (non-blocking)
    ///
    /// This takes all current builders with documents and spawns background tasks
    /// to build them. Returns immediately - use `commit()` for durability.
    /// New documents can continue to be added while segments are being built.
    pub async fn flush(&self) -> Result<()> {
        // Collect any already-completed segments
        self.collect_completed_segments().await;

        // Take all builders that have documents and spawn background builds
        for builder_mutex in &self.builders {
            let mut guard = builder_mutex.lock().await;
            if let Some(builder) = guard.take()
                && builder.num_docs() > 0
            {
                self.spawn_background_build(builder);
            }
        }

        Ok(())
    }

    /// Commit all pending segments to disk and wait for completion
    ///
    /// This flushes any current builders and waits for all in-flight background
    /// builds to complete, so every added document is persisted when it returns.
    /// Background merges may still be running; use `wait_for_merges()` to block on them.
    ///
    /// **Auto-triggers vector index build** when the threshold is crossed for any field.
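    ///
    /// # Example
    ///
    /// A sketch contrasting the non-blocking `flush()` with the blocking
    /// `commit()`; `docs` stands in for the caller's document stream.
    ///
    /// ```ignore
    /// for doc in docs {
    ///     writer.add_document(doc).await?;
    /// }
    /// writer.flush().await?;   // hand current builders to background tasks
    /// writer.commit().await?;  // wait until every pending build is persisted
    /// ```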
    pub async fn commit(&self) -> Result<()> {
        // First flush any current builders
        self.flush().await?;

        // Wait for all pending builds to complete
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            match receiver.recv().await {
                Some(_segment_hex) => {
                    self.pending_builds.fetch_sub(1, Ordering::SeqCst);
                }
                None => break, // Channel closed
            }
        }
        drop(receiver);

        // Auto-trigger vector index build if threshold crossed
        self.maybe_build_vector_index().await?;

        Ok(())
    }

    // Vector index building methods are in vector_builder.rs

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Atomically update segments and delete old ones via SegmentManager
        self.segment_manager
            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
            .await?;

        Ok(())
    }

    /// Force merge all segments into one
    pub async fn force_merge(&self) -> Result<()> {
        // First commit all pending documents (waits for completion)
        self.commit().await?;
        // Then merge all segments
        self.do_merge().await
    }

    // Vector index methods (build_vector_index, rebuild_vector_index, etc.)
    // are implemented in vector_builder.rs
}