// hermes_core/index/writer.rs

//! IndexWriter - async document indexing with parallel segment building
//!
//! This module is only compiled with the "native" feature.
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::mpsc;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;
23
/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Parallel indexing with multiple segment builders
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    /// Backing storage for segment files, `schema.json` and `segments.json`
    directory: Arc<D>,
    /// Index schema, shared with builders, readers and the segment manager
    schema: Arc<Schema>,
    /// Writer-level configuration (builder count, max docs per segment, merge policy, ...)
    config: IndexConfig,
    /// Configuration handed to each `SegmentBuilder` created lazily in `add_document`
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers, copied into every new segment builder
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Multiple segment builders for parallel indexing
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    /// Segment manager - handles segment registration and background merging
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Channel sender for completed segment IDs from background builds
    segment_id_sender: mpsc::UnboundedSender<String>,
    /// Channel receiver for completed segment IDs
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Count of in-flight background builds
    pending_builds: Arc<AtomicUsize>,
}
48
49impl<D: DirectoryWriter + 'static> IndexWriter<D> {
50    /// Create a new index in the directory
51    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
52        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
53    }
54
55    /// Create a new index with custom builder config
56    pub async fn create_with_config(
57        directory: D,
58        schema: Schema,
59        config: IndexConfig,
60        builder_config: SegmentBuilderConfig,
61    ) -> Result<Self> {
62        let directory = Arc::new(directory);
63        let schema = Arc::new(schema);
64
65        // Write schema
66        let schema_bytes =
67            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
68        directory
69            .write(Path::new("schema.json"), &schema_bytes)
70            .await?;
71
72        // Write empty segments list
73        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
74            .map_err(|e| Error::Serialization(e.to_string()))?;
75        directory
76            .write(Path::new("segments.json"), &segments_bytes)
77            .await?;
78
79        // Create multiple builders for parallel indexing
80        let num_builders = config.num_indexing_threads.max(1);
81        let mut builders = Vec::with_capacity(num_builders);
82        for _ in 0..num_builders {
83            builders.push(AsyncMutex::new(None));
84        }
85
86        // Create channel for background builds to report completed segment IDs
87        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
88
89        // Create segment manager with the configured merge policy
90        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
91            Arc::clone(&directory),
92            Arc::clone(&schema),
93            Vec::new(),
94            config.merge_policy.clone_box(),
95            config.term_cache_blocks,
96        ));
97
98        Ok(Self {
99            directory,
100            schema,
101            config,
102            builder_config,
103            tokenizers: FxHashMap::default(),
104            builders,
105            segment_manager,
106            segment_id_sender,
107            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
108            pending_builds: Arc::new(AtomicUsize::new(0)),
109        })
110    }
111
112    /// Open an existing index for writing
113    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
114        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
115    }
116
117    /// Open an existing index with custom builder config
118    pub async fn open_with_config(
119        directory: D,
120        config: IndexConfig,
121        builder_config: SegmentBuilderConfig,
122    ) -> Result<Self> {
123        let directory = Arc::new(directory);
124
125        // Read schema
126        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
127        let schema_bytes = schema_slice.read_bytes().await?;
128        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
129            .map_err(|e| Error::Serialization(e.to_string()))?;
130        let schema = Arc::new(schema);
131
132        // Read existing segment IDs (hex strings)
133        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
134        let segments_bytes = segments_slice.read_bytes().await?;
135        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
136            .map_err(|e| Error::Serialization(e.to_string()))?;
137
138        // Create multiple builders for parallel indexing
139        let num_builders = config.num_indexing_threads.max(1);
140        let mut builders = Vec::with_capacity(num_builders);
141        for _ in 0..num_builders {
142            builders.push(AsyncMutex::new(None));
143        }
144
145        // Create channel for background builds to report completed segment IDs
146        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
147
148        // Create segment manager with the configured merge policy
149        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
150            Arc::clone(&directory),
151            Arc::clone(&schema),
152            segment_ids,
153            config.merge_policy.clone_box(),
154            config.term_cache_blocks,
155        ));
156
157        Ok(Self {
158            directory,
159            schema,
160            config,
161            builder_config,
162            tokenizers: FxHashMap::default(),
163            builders,
164            segment_manager,
165            segment_id_sender,
166            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
167            pending_builds: Arc::new(AtomicUsize::new(0)),
168        })
169    }
170
171    /// Get the schema
172    pub fn schema(&self) -> &Schema {
173        &self.schema
174    }
175
176    /// Set tokenizer for a field
177    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
178        self.tokenizers.insert(field, Box::new(tokenizer));
179    }
180
181    /// Add a document
182    ///
183    /// Documents are distributed randomly across multiple builders for parallel indexing.
184    /// Random distribution avoids atomic contention and provides better load balancing.
185    /// When a builder reaches `max_docs_per_segment`, it is committed and a new one starts.
186    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
187        use rand::Rng;
188
189        // Random selection of builder - avoids atomic contention
190        let builder_idx = rand::rng().random_range(0..self.builders.len());
191
192        let mut builder_guard = self.builders[builder_idx].lock().await;
193
194        // Initialize builder if needed
195        if builder_guard.is_none() {
196            let mut builder =
197                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
198            for (field, tokenizer) in &self.tokenizers {
199                builder.set_tokenizer(*field, tokenizer.clone_box());
200            }
201            *builder_guard = Some(builder);
202        }
203
204        let builder = builder_guard.as_mut().unwrap();
205        let doc_id = builder.add_document(doc)?;
206
207        // Check if we need to commit
208        if builder.num_docs() >= self.config.max_docs_per_segment {
209            let full_builder = builder_guard.take().unwrap();
210            drop(builder_guard); // Release lock before spawning background task
211            self.spawn_background_build(full_builder);
212        }
213
214        Ok(doc_id)
215    }
216
217    /// Spawn a background task to build a segment without blocking document ingestion
218    ///
219    /// The background task will send its segment ID through the channel when complete,
220    /// allowing indexing to continue immediately.
221    fn spawn_background_build(&self, builder: SegmentBuilder) {
222        let directory = Arc::clone(&self.directory);
223        let segment_id = SegmentId::new();
224        let segment_hex = segment_id.to_hex();
225        let sender = self.segment_id_sender.clone();
226        let segment_manager = Arc::clone(&self.segment_manager);
227
228        self.pending_builds.fetch_add(1, Ordering::SeqCst);
229
230        // Spawn a fully independent task that registers its own segment ID
231        tokio::spawn(async move {
232            match builder.build(directory.as_ref(), segment_id).await {
233                Ok(_) => {
234                    // Register segment via SegmentManager (also triggers merge check)
235                    segment_manager.register_segment(segment_hex.clone()).await;
236                    // Also send through channel for flush() to know when all are done
237                    let _ = sender.send(segment_hex);
238                }
239                Err(e) => {
240                    // Log error but don't crash - segment just won't be registered
241                    eprintln!("Background segment build failed: {:?}", e);
242                }
243            }
244        });
245    }
246
247    /// Collect any completed segment IDs from the channel (non-blocking)
248    ///
249    /// Merge checking is now handled by SegmentManager.register_segment().
250    async fn collect_completed_segments(&self) {
251        let mut receiver = self.segment_id_receiver.lock().await;
252        while let Ok(_segment_hex) = receiver.try_recv() {
253            // Segment ID already registered by the background task via SegmentManager
254            self.pending_builds.fetch_sub(1, Ordering::SeqCst);
255        }
256    }
257
258    /// Get the number of pending background builds
259    pub fn pending_build_count(&self) -> usize {
260        self.pending_builds.load(Ordering::SeqCst)
261    }
262
263    /// Get the number of pending background merges
264    pub fn pending_merge_count(&self) -> usize {
265        self.segment_manager.pending_merge_count()
266    }
267
268    /// Check merge policy and spawn background merges if needed
269    ///
270    /// This is called automatically after segment builds complete via SegmentManager.
271    /// Can also be called manually to trigger merge checking.
272    pub async fn maybe_merge(&self) {
273        self.segment_manager.maybe_merge().await;
274    }
275
276    /// Wait for all pending merges to complete
277    pub async fn wait_for_merges(&self) {
278        self.segment_manager.wait_for_merges().await;
279    }
280
281    /// Clean up orphan segment files that are not registered
282    ///
283    /// This can happen if the process halts after segment files are written
284    /// but before they are registered in segments.json. Call this after opening
285    /// an index to reclaim disk space from incomplete operations.
286    ///
287    /// Returns the number of orphan segments deleted.
288    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
289        self.segment_manager.cleanup_orphan_segments().await
290    }
291
292    /// Get current builder statistics for debugging (aggregated from all builders)
293    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
294        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;
295
296        for builder_mutex in &self.builders {
297            let guard = builder_mutex.lock().await;
298            if let Some(builder) = guard.as_ref() {
299                let stats = builder.stats();
300                if let Some(ref mut total) = total_stats {
301                    total.num_docs += stats.num_docs;
302                    total.unique_terms += stats.unique_terms;
303                    total.postings_in_memory += stats.postings_in_memory;
304                    total.interned_strings += stats.interned_strings;
305                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
306                } else {
307                    total_stats = Some(stats);
308                }
309            }
310        }
311
312        total_stats
313    }
314
315    /// Flush current builders to background processing (non-blocking)
316    ///
317    /// This takes all current builders with documents and spawns background tasks
318    /// to build them. Returns immediately - use `commit()` for durability.
319    /// New documents can continue to be added while segments are being built.
320    pub async fn flush(&self) -> Result<()> {
321        // Collect any already-completed segments
322        self.collect_completed_segments().await;
323
324        // Take all builders that have documents and spawn background builds
325        for builder_mutex in &self.builders {
326            let mut guard = builder_mutex.lock().await;
327            if let Some(builder) = guard.take()
328                && builder.num_docs() > 0
329            {
330                self.spawn_background_build(builder);
331            }
332        }
333
334        Ok(())
335    }
336
337    /// Commit all pending segments to disk and wait for completion
338    ///
339    /// This flushes any current builders and waits for ALL background builds
340    /// to complete. Provides durability guarantees - all data is persisted.
341    pub async fn commit(&self) -> Result<()> {
342        // First flush any current builders
343        self.flush().await?;
344
345        // Wait for all pending builds to complete
346        let mut receiver = self.segment_id_receiver.lock().await;
347        while self.pending_builds.load(Ordering::SeqCst) > 0 {
348            match receiver.recv().await {
349                Some(_segment_hex) => {
350                    self.pending_builds.fetch_sub(1, Ordering::SeqCst);
351                }
352                None => break, // Channel closed
353            }
354        }
355        drop(receiver);
356
357        // Write segments.json to persist the segment list
358        let segment_ids = self.segment_manager.get_segment_ids().await;
359        let segments_bytes =
360            serde_json::to_vec(&segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
361        self.directory
362            .write(Path::new("segments.json"), &segments_bytes)
363            .await?;
364
365        Ok(())
366    }
367
368    /// Merge all segments into one (called explicitly via force_merge)
369    async fn do_merge(&self) -> Result<()> {
370        let segment_ids = self.segment_manager.get_segment_ids().await;
371
372        if segment_ids.len() < 2 {
373            return Ok(());
374        }
375
376        let ids_to_merge: Vec<String> = segment_ids.clone();
377        drop(segment_ids);
378
379        // Load segment readers
380        let mut readers = Vec::new();
381        let mut doc_offset = 0u32;
382
383        for id_str in &ids_to_merge {
384            let segment_id = SegmentId::from_hex(id_str)
385                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
386            let reader = SegmentReader::open(
387                self.directory.as_ref(),
388                segment_id,
389                Arc::clone(&self.schema),
390                doc_offset,
391                self.config.term_cache_blocks,
392            )
393            .await?;
394            doc_offset += reader.meta().num_docs;
395            readers.push(reader);
396        }
397
398        // Merge into new segment
399        let merger = SegmentMerger::new(Arc::clone(&self.schema));
400        let new_segment_id = SegmentId::new();
401        merger
402            .merge(self.directory.as_ref(), &readers, new_segment_id)
403            .await?;
404
405        // Update segment list via segment manager
406        {
407            let segment_ids_arc = self.segment_manager.segment_ids();
408            let mut segment_ids = segment_ids_arc.lock().await;
409            segment_ids.clear();
410            segment_ids.push(new_segment_id.to_hex());
411        }
412
413        let segment_ids = self.segment_manager.get_segment_ids().await;
414        let segments_bytes =
415            serde_json::to_vec(&segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
416        self.directory
417            .write(Path::new("segments.json"), &segments_bytes)
418            .await?;
419
420        // Delete old segments
421        for id_str in ids_to_merge {
422            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
423                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
424            }
425        }
426
427        Ok(())
428    }
429
430    /// Force merge all segments into one
431    pub async fn force_merge(&self) -> Result<()> {
432        // First commit all pending documents (waits for completion)
433        self.commit().await?;
434        // Then merge all segments
435        self.do_merge().await
436    }
437}